# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import List, Dict, Union
from pymoo.core.mixed import MixedVariableGA
from pymoo.algorithms.moo.nsga2 import RankAndCrowding
# from pymoo.decomposition.asf import ASF
# import matplotlib.pyplot as plt # this is going to be in results, here for now to show we need to include plots
from pymoo.core.mixed import MixedVariableSampling
from pymoo.optimize import minimize
from pymoo.core.problem import ElementwiseProblem
from pymoo.core.variable import Real, Integer, Choice, Binary
from VeraGridEngine.Devices.Aggregation.investment import Investment
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Devices.Branches.transformer import Transformer2W
from VeraGridEngine.Devices.Branches.line import Line
from VeraGridEngine.Devices.types import BRANCH_TYPES, BRANCH_TEMPLATE_TYPES
from VeraGridEngine.enumerations import DeviceType
from VeraGridEngine.basic_structures import Logger
[docs]
class MixedVariableProblem(ElementwiseProblem):
    """
    Problem formulation packaging to use the pymoo library.

    Every single-investment group whose device is a Line or a Transformer2W and
    that has at least one candidate template becomes one integer decision
    variable, named after the device idtag:

    - 0 keeps the template the device had when the problem was created
    - k > 0 applies the k-th candidate template, i.e.
      ``self.device_template_dict[device][k - 1]``
    """

    def __init__(self, grid: MultiCircuit, obj_func, n_obj):
        """
        :param grid: MultiCircuit whose investment groups define the decision variables
        :param obj_func: callable evaluated as obj_func(x) once the templates have been
                         applied to the grid devices; its result is stored in out["F"]
        :param n_obj: number of objectives reported to pymoo
        """
        self.logger = Logger()
        self.grid = grid
        self.obj_func = obj_func

        all_dict, _ = self.grid.get_all_elements_dict()

        # device -> list of candidate templates that may be applied to it
        self.device_template_dict: Dict[BRANCH_TYPES, List[BRANCH_TEMPLATE_TYPES]] = dict()

        # gather the candidate templates of every eligible investment
        for investment_group, investments_list in self.grid.get_investments_by_groups():

            if len(investments_list) != 1:
                self.logger.add_error("Only single-investment groups can be considered",
                                      device=investment_group.name,
                                      device_class=investment_group.device_type.value)
                continue

            investment: Investment = investments_list[0]
            device = all_dict.get(investment.device_idtag, None)

            if device is None:
                self.logger.add_error("Investment device is none",
                                      device=investment.device_idtag)
                continue

            templates = self._candidate_templates(device)

            if templates is None:
                self.logger.add_error("Investment device not recognized",
                                      device=device.name,
                                      device_class=device.device_type)
                continue

            # devices without any candidate template do not get a decision variable
            if templates:
                self.device_template_dict.setdefault(device, list()).extend(templates)

        # convert the data to decision vars: the decision vars are integers where
        # 0 means "keep the default template" and k in 1..n means "apply the k-th
        # template of self.device_template_dict[device]"
        self.variables: Dict[str, Integer] = dict()
        self.devices = list()  # list of devices in sequential order to match the order of the vars
        self.default_template = list()  # list of templates that represent the devices in their initial state

        for elm, template_list in self.device_template_dict.items():

            # pymoo Integer bounds are inclusive on both ends, hence the upper
            # bound is exactly the number of templates
            self.variables[elm.idtag] = Integer(bounds=(0, len(template_list)))
            self.devices.append(elm)

            if isinstance(elm, Line):
                default_template = elm.get_line_type()
            elif isinstance(elm, Transformer2W):
                default_template = elm.get_transformer_type(Sbase=self.grid.Sbase)
            else:
                raise Exception('Device not recognized')

            self.default_template.append(default_template)

        # single parent initialisation: the number of variables is derived by
        # pymoo from `vars`, it is unrelated to the number of objectives
        super().__init__(n_obj=n_obj, vars=self.variables)

    @staticmethod
    def _candidate_templates(device) -> Union[List[BRANCH_TEMPLATE_TYPES], None]:
        """
        Collect the templates associated to a device
        :param device: device referenced by an investment
        :return: list of associated templates (may be empty),
                 None if the device is neither a Transformer2W nor a Line
        """
        if isinstance(device, Transformer2W):
            association_groups = [device.possible_transformer_types]
        elif isinstance(device, Line):
            association_groups = [device.possible_tower_types,
                                  device.possible_sequence_line_types,
                                  device.possible_underground_line_types]
        else:
            return None

        return [association.api_object
                for group in association_groups
                for association in group.data.values()]

    def _apply_template(self, device, template) -> None:
        """
        Apply a template to a device using the call signature of its device type
        :param device: Line or Transformer2W to modify
        :param template: template to apply
        """
        if isinstance(device, Line):
            device.apply_template(obj=template, Sbase=self.grid.Sbase,
                                  freq=self.grid.fBase, logger=self.logger)
        elif isinstance(device, Transformer2W):
            # the transformer call takes no frequency argument
            device.apply_template(template, Sbase=self.grid.Sbase, logger=self.logger)
        else:
            raise Exception('Device not recognized')

    def _evaluate(self, x, out, *args, **kwargs):
        """
        Apply the templates selected by x to the grid devices and evaluate the objectives
        :param x: decision values; a dict keyed by device idtag (what pymoo provides for
                  problems declared with `vars`) or a positional sequence ordered like
                  self.devices
        :param out: pymoo output dictionary; the objective values are stored in out["F"]
        :param args: unused
        :param kwargs: unused
        :return: None
        """
        x_is_dict = isinstance(x, dict)

        for i, device in enumerate(self.devices):

            xi = int(x[device.idtag]) if x_is_dict else int(x[i])

            if xi > 0:
                # 0 is reserved for the default, so the k-th template sits at position k - 1
                template = self.device_template_dict[device][xi - 1]
            else:
                template = self.default_template[i]

            self._apply_template(device, template)

        out["F"] = self.obj_func(x)
        print("Completed eval")
[docs]
def NSGA_2(grid: MultiCircuit,
           obj_func,
           n_obj: int = 2,
           max_evals: int = 30,
           pop_size: int = 1,
           ):
    """
    Run a mixed-variable genetic algorithm (rank and crowding survival) over the
    template choices of the grid investments.

    Besides returning the result, the objective values are written to the file
    'nsga.xlsx' in the current working directory.

    :param grid: MultiCircuit handed to MixedVariableProblem
    :param obj_func: objective function handed to MixedVariableProblem
    :param n_obj: number of objectives
    :param max_evals: number of evaluations after which the run terminates
    :param pop_size: population size of the genetic algorithm
    :return: (res.X, res.F) as produced by pymoo's minimize
    """
    optimisation_problem = MixedVariableProblem(grid, obj_func, n_obj)

    # The operator probabilities are configured deep inside MixedVariableGA,
    # only the sampling and the survival are set explicitly here
    initial_sampling = MixedVariableSampling()
    survival_operator = RankAndCrowding(crowding_func="pcd")
    genetic_algorithm = MixedVariableGA(pop_size=pop_size,
                                        sampling=initial_sampling,
                                        survival=survival_operator)

    stop_criterion = ('n_eval', max_evals)

    # fixed seed so that repeated runs use the same random sequence
    result = minimize(problem=optimisation_problem,
                      algorithm=genetic_algorithm,
                      termination=stop_criterion,
                      seed=1,
                      verbose=True,
                      save_history=False)

    # dump the objective values to a spreadsheet
    import pandas as pd
    pd.DataFrame(result.F).to_excel('nsga.xlsx')

    return result.X, result.F