# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import numpy as np
from pymoo.core.problem import ElementwiseProblem
from pymoo.core.population import Population
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.repair.rounding import RoundingRepair
from pymoo.core.mixed import MixedVariableSampling
from pymoo.core.sampling import Sampling
from pymoo.operators.sampling.rnd import IntegerRandomSampling
from pymoo.core.termination import Termination
from pymoo.core.mutation import Mutation
from VeraGridEngine.basic_structures import Vec, IntVec, IntMat, Mat
[docs]
def finalize_seed_population(seed_population: IntMat,
seed_objectives: Mat | None,
lb: Vec | IntVec,
ub: Vec | IntVec) -> tuple[IntMat, Mat | None]:
"""
Clip, round and deduplicate a user-provided seed population.
:param seed_population: Candidate initial population.
:type seed_population: IntMat
:param seed_objectives: Optional objective matrix aligned with ``seed_population``.
:type seed_objectives: Mat | None
:param lb: Lower decision-vector bounds.
:type lb: Vec | IntVec
:param ub: Upper decision-vector bounds.
:type ub: Vec | IntVec
:return: Clean seed population and aligned objective matrix.
:rtype: tuple[IntMat, Mat | None]
"""
lb_arr: IntVec = np.asarray(lb, dtype=int)
ub_arr: IntVec = np.asarray(ub, dtype=int)
clipped_population: IntMat = np.asarray(np.rint(seed_population), dtype=int)
clipped_population = np.atleast_2d(clipped_population)
clipped_population = np.clip(clipped_population, lb_arr, ub_arr)
if clipped_population.shape[1] == len(lb_arr):
pass
else:
raise ValueError("The seed population does not match the problem dimension")
objective_matrix: Mat | None
if seed_objectives is None:
objective_matrix = None
else:
objective_matrix = np.asarray(seed_objectives, dtype=float)
objective_matrix = np.atleast_2d(objective_matrix)
if len(objective_matrix) == len(clipped_population):
pass
else:
raise ValueError("The seed population and objective arrays must have the same row count")
# Preserve the first occurrence of each point so the caller controls the seed priority order.
unique_rows: list[IntVec] = list()
unique_objective_rows: list[Vec] = list()
row_idx: int
for row_idx in range(len(clipped_population)):
current_row: IntVec = clipped_population[row_idx, :]
is_duplicate: bool = False
existing_row: IntVec
for existing_row in unique_rows:
if np.array_equal(existing_row, current_row):
is_duplicate = True
else:
pass
if is_duplicate:
pass
else:
unique_rows.append(current_row.copy())
if objective_matrix is None:
pass
else:
unique_objective_rows.append(objective_matrix[row_idx, :].copy())
if len(unique_rows) > 0:
unique_population: IntMat = np.asarray(unique_rows, dtype=int)
else:
unique_population = np.zeros((0, len(lb_arr)), dtype=int)
if objective_matrix is None:
unique_objectives: Mat | None = None
elif len(unique_objective_rows) > 0:
unique_objectives = np.asarray(unique_objective_rows, dtype=float)
else:
unique_objectives = np.zeros((0, 0), dtype=float)
return unique_population, unique_objectives
[docs]
def get_nsga3_initial_population(problem: ElementwiseProblem,
pop_size: int,
initial_population: IntMat,
initial_objectives: Mat | None) -> Population:
"""
Build an optional pymoo initial population from plain NumPy arrays.
:param problem: pymoo problem instance.
:type problem: ElementwiseProblem
:param pop_size: Requested NSGA3 population size.
:type pop_size: int
:param initial_population: Seed decision vectors.
:type initial_population: IntMat
:param initial_objectives: Optional objective vectors aligned with ``initial_population``.
:type initial_objectives: Mat | None
:return: pymoo population object with optional pre-evaluated individuals.
:rtype: Population
"""
fallback_sampling: Sampling = SkewedIntegerSamplingRange()
n_seed: int = min(len(initial_population), pop_size)
samples: IntMat = np.zeros((pop_size, int(problem.n_var)), dtype=int)
# Copy the externally-provided combinations first so NSGA3 starts from the intended warm-start population.
if n_seed > 0:
samples[:n_seed, :] = initial_population[:n_seed, :]
else:
pass
remaining_samples: int = pop_size - n_seed
# Fill the remaining rows with the original sampler so the warm-start stays optional and non-invasive.
if remaining_samples > 0:
fallback_population: IntMat = fallback_sampling._do(problem=problem, n_samples=remaining_samples)
samples[n_seed:, :] = np.asarray(fallback_population, dtype=int)
else:
pass
population: Population = Population.new(X=samples)
# Mark the provided objective values as already evaluated so pymoo does not call the objective function again.
if initial_objectives is None:
pass
else:
population[:n_seed].set("F", initial_objectives[:n_seed, :])
for row_idx in range(n_seed):
population[row_idx].evaluated.update({"F", "G", "H"})
return population
[docs]
class IntegerRandomSamplingVeraGrid(Sampling):
def _do(self, problem, n_samples, **kwargs):
xl = np.asarray(problem.xl, dtype=int)
xu = np.asarray(problem.xu, dtype=int)
n_var = len(xl)
X = np.zeros((n_samples, n_var), dtype=int)
for j in range(n_var):
values = np.arange(xl[j], xu[j] + 1)
for i in range(n_samples):
X[i, j] = np.random.choice(values)
return X
[docs]
class SkewedBinarySampling(Sampling):
"""
SkewedBinarySampling
"""
def _do(self, problem, n_samples, **kwargs):
max_ones = int(problem.n_var * 1)
num_ones = (np.linspace(0, 1, n_samples) ** 3 * max_ones).astype(int)
num_ones[-1] = max_ones
ones_into_array = np.zeros((n_samples, problem.n_var), dtype=int)
# Fill ones_into_array randomly
for i, num in enumerate(num_ones):
ones_into_array[i, :num] = 1
np.random.shuffle(ones_into_array[i])
# # Add rows with only one '1'
# additional_rows = 100
# for _ in range(additional_rows):
# row = np.zeros(problem.n_var, dtype=int)
# row[np.random.randint(0, problem.n_var)] = 1
# ones_into_array = np.vstack([ones_into_array, row])
return ones_into_array
[docs]
class SkewedIntegerSamplingRange(Sampling):
"""
SkewedIntegerSampling generates samples skewed toward the lower bounds
but spread across the full lbβub range. Works for integer variables.
"""
def _do(self, problem, n_samples, **kwargs):
xl = np.asarray(problem.xl, dtype=int)
xu = np.asarray(problem.xu, dtype=int)
n_var = len(xl)
X = np.zeros((n_samples, n_var), dtype=int)
# Generate skewed samples per variable
for j in range(n_var):
# Create skewed samples in [0, 1]
skewed = (np.linspace(0, 1, n_samples) ** 3)
# Scale to range [xl[j], xu[j]]
range_j = xu[j] - xl[j]
values = (skewed * range_j + xl[j]).astype(int)
# Shuffle for diversity
np.random.shuffle(values)
X[:, j] = values
return X
[docs]
class QuadBinarySampling(Sampling):
"""
QuadBinarySampling
"""
def _do(self, problem, n_samples, **kwargs):
max_ones = int(problem.n_var * 1)
half_samples = n_samples // 2
# Adjust the num_ones calculation to create a distribution that is quadratic in the first half
num_ones = (np.linspace(0, 1, half_samples) * max_ones).astype(int)
# Fill the rest of the array with 0s quadratically
num_zeros = (np.linspace(1, 0, n_samples - half_samples) ** 3 * max_ones).astype(int)
num_ones = np.concatenate([num_ones, num_zeros])
ones_into_array = np.zeros((n_samples, problem.n_var), dtype=int)
# Fill ones_into_array randomly
for i, num in enumerate(num_ones):
ones_into_array[i, :num] = 1
np.random.shuffle(ones_into_array[i])
return ones_into_array
[docs]
class BitflipMutation(Mutation):
"""
BitflipMutation
"""
def _do(self, problem, x, **kwargs):
mask = np.random.random(x.shape) < self.get_prob_var(problem)
x[mask] = 1 - x[mask]
return x
[docs]
class GridNsga(ElementwiseProblem):
"""
Problem formulation packaging to use the pymoo library
"""
def __init__(self, obj_func, n_var, n_obj, lb: Vec | IntVec, ub: Vec | IntVec):
"""
:param obj_func:
:param n_var:
:param n_obj:
"""
super().__init__(n_var=n_var,
n_obj=n_obj,
n_ieq_constr=0,
xl=lb,
xu=ub,
vtype=int)
self.obj_func = obj_func
def _evaluate(self, x, out, *args, **kwargs):
"""
:param x:
:param out:
:param args:
:param kwargs:
:return:
"""
out["F"] = self.obj_func(x)
[docs]
def NSGA_3(obj_func,
n_var: int, lb: Vec | IntVec, ub: Vec | IntVec,
n_obj: int,
n_partitions: int = 100,
max_evals: int = 30,
pop_size: int = 1,
crossover_prob: float = 0.05,
mutation_probability=0.5,
eta: float = 3.0,
initial_population: IntMat | None = None,
initial_objectives: Mat | None = None):
"""
NSGA3 designed for pareto investments
:param obj_func: Objective function pointer [f(x)]
:param n_partitions: Number of partitions
:param n_var: Number of variables
:param lb: Array of x lower boundaries
:param ub: Array of x upper boundaries
:param n_obj: Number of objectives
:param max_evals: Maximum number of evaluations
:param pop_size: Population size
:param crossover_prob: Crossover probability
:param mutation_probability: Mutation probability
:param eta: eta parameter for the SBX crossover
:param initial_population: Optional integer population used to seed the first NSGA3 generation.
:type initial_population: IntMat | None
:param initial_objectives: Optional objective vectors aligned with ``initial_population``.
:type initial_objectives: Mat | None
:return: X, f
"""
problem = GridNsga(obj_func, n_var, n_obj, lb=lb, ub=ub)
ref_dirs = get_reference_directions("das-dennis", n_obj, n_partitions=n_partitions) # ref_dirs = get_reference_directions("reduction", n_obj, n_partitions, seed=1)
sampling: Sampling | Population
# Keep warm-starting entirely optional. When nothing is passed, NSGA3 behaves exactly as before.
if initial_population is None:
sampling = SkewedIntegerSamplingRange()
else:
seed_population: IntMat
seed_objectives: Mat | None
seed_population, seed_objectives = finalize_seed_population(seed_population=initial_population,
seed_objectives=initial_objectives,
lb=lb,
ub=ub)
sampling = get_nsga3_initial_population(problem=problem,
pop_size=pop_size,
initial_population=seed_population,
initial_objectives=seed_objectives)
algorithm = NSGA3(pop_size=pop_size,
sampling=sampling,
# IntegerRandomSamplingVeraGrid(), # SkewedBinarySampling(),
crossover=SBX(prob=crossover_prob,
eta=eta,
vtype=float,
repair=RoundingRepair()),
mutation=BitflipMutation(prob=mutation_probability,
prob_var=0.4,
repair=RoundingRepair()),
eliminate_duplicates=True,
ref_dirs=ref_dirs)
# term = Termination()
res = minimize(problem=problem,
algorithm=algorithm,
termination=('n_eval', max_evals),
seed=1,
verbose=True,
save_history=False)
return res.X, res.F