Source code for VeraGridEngine.Simulations.InvestmentsEvaluation.investments_evaluation_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0

import timeit
from math import comb

import numpy as np

from VeraGridEngine.Compilers.circuit_to_gslv import GSLV_AVAILABLE, gslv_investments_evaluation
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Utils.NumericalMethods.MVRSM_mo_pareto import MVRSM_mo_pareto
from VeraGridEngine.Simulations.InvestmentsEvaluation.investments_evaluation_results import InvestmentsEvaluationResults
from VeraGridEngine.Simulations.InvestmentsEvaluation.investments_evaluation_options import InvestmentsEvaluationOptions
from VeraGridEngine.Simulations.InvestmentsEvaluation.Methods.NSGA_3 import NSGA_3
from VeraGridEngine.Simulations.InvestmentsEvaluation.Methods.mixed_variable_NSGA_2 import NSGA_2
from VeraGridEngine.Simulations.InvestmentsEvaluation.Methods.random_eval import random_trial
from VeraGridEngine.Simulations.InvestmentsEvaluation.Methods.toot_pint_cba import (
    TOOT_PINT_CBA,
    get_toot_pint_seed_population,
)
from VeraGridEngine.Simulations.InvestmentsEvaluation.Problems.black_box_problem_template import BlackBoxProblemTemplate
from VeraGridEngine.enumerations import EngineType, InvestmentEvaluationMethod, InvestmentsEvaluationObjectives, SimulationTypes
from VeraGridEngine.basic_structures import IntVec, IntMat, Vec, Mat


[docs] class InvestmentsEvaluationDriver(DriverTemplate): __slots__ = ( "options", "problem", ) name = 'Investments evaluation' tpe = SimulationTypes.InvestmentsEvaluation_run def __init__(self, grid: MultiCircuit, options: InvestmentsEvaluationOptions, problem: BlackBoxProblemTemplate, engine: EngineType = EngineType.VeraGrid): """ InputsAnalysisDriver class constructor :param grid: MultiCircuit instance :param options: InvestmentsEvaluationOptions :param problem: BlackBoxProblemTemplate :param engine: EngineType """ super().__init__(grid=grid, engine=engine) # options object self.options = options # problem definition self.problem: BlackBoxProblemTemplate = problem # results object if problem is not None: self.results = InvestmentsEvaluationResults( f_names=self.problem.get_objectives_names(), x_names=self.problem.get_vars_names(), plot_x_idx=self.problem.plot_x_idx, plot_y_idx=self.problem.plot_y_idx, max_eval=self.options.max_eval ) else: self.results = InvestmentsEvaluationResults( f_names=np.array([]), x_names=np.array([]), plot_x_idx=0, plot_y_idx=0, max_eval=self.options.max_eval )
[docs] def initialize(self, max_iter: int): """ Initialize the results :param max_iter: Maximum iterations """ self.results = InvestmentsEvaluationResults( f_names=self.problem.get_objectives_names(), x_names=self.problem.get_vars_names(), plot_x_idx=self.problem.plot_x_idx, plot_y_idx=self.problem.plot_y_idx, max_eval=max_iter )
[docs] def get_steps(self): """ :return: """ return self.results.get_index()
[docs] def objective_function(self, x: IntVec, record_results: bool = True) -> Vec: """ Function to evaluate a combination of investments :param x: vector of investments (yes/no). Length = number of investment groups :param record_results: record the results or not :return: multi-objective function criteria values """ objectives = self.problem.objective_function(x) if record_results: self.results.add(x_vec=x, f_vec=objectives) self.report_progress2(self.results.current_evaluation, self.results.max_eval) else: pass return objectives
[docs] def objective_function_so(self, x: IntVec) -> float: """ Single objective version of the objective function :param x: vector of investments (yes/no). Length = number of investment groups :return: summation of the objectives """ res_vec = self.objective_function(x=x) return res_vec.sum()
[docs] def evaluate_individual_investments(self): """ Run a one-by-one investment evaluation without considering multiple evaluation groups at a time """ results_with_combinations = list() dim = len(self.grid.investments_groups) self.objective_function(x=np.zeros(self.problem.n_vars(), dtype=int)) baseline = self.objective_function(x=np.zeros(dim, dtype=int)) results_with_combinations.append((baseline, np.zeros(dim, dtype=int))) st = timeit.default_timer() for k in range(dim): self.report_text(f"Evaluating investment group {k}...") combination = np.zeros(dim, dtype=int) combination[k] = 1 results = self.objective_function(x=combination, record_results=False) results_with_combinations.append((results, combination)) et = timeit.default_timer() print(f"Time taken to evaluate individual investments: {et - st}") return results_with_combinations
[docs] def independent_evaluation(self) -> None: """ Evaluate projects with a CBA-like reference/PINT/TOOT ranking and then build a cumulative portfolio. """ self.report_text("Evaluating investments with a CBA-like independent sequence...") dim: int = self.problem.n_vars() n_cumulative_evaluations: int = dim - 1 if dim > 0 else 0 # The displayed metrics must come from the active problem, so the results object is initialized with the # problem objective names and every stored evaluation comes from self.objective_function(). self.initialize(max_iter=1 + dim + n_cumulative_evaluations) best_combination: IntVec = TOOT_PINT_CBA( obj_func=self.objective_function, n_var=dim, lb=self.problem.x_min, ub=self.problem.x_max, n_obj=len(self.problem.get_objectives_names()), objective_names=self.problem.get_objectives_names(), variable_names=self.problem.get_vars_names(), report_text=self.report_text, logger=self.logger, ) self.results.set_best_combination(combination=best_combination)
[docs] def optimized_evaluation_mvrsm_pareto(self) -> None: """ Run an optimized investment evaluation without considering multiple evaluation groups at a time """ self.report_text("Evaluating investments with multi-objective MVRSM...") # number of random evaluations at the beginning dim = self.problem.n_vars() rand_evals = round(dim) lb = self.problem.x_min ub = self.problem.x_max rand_search_active_prob = 0.5 x0 = np.random.binomial(1, rand_search_active_prob, dim) # compile the snapshot self.initialize(max_iter=self.options.max_eval * 2) # add baseline ret = self.objective_function(x=np.zeros(self.problem.n_vars(), dtype=int)) sorted_y_, sorted_x_, y_population_, x_population_ = MVRSM_mo_pareto( obj_func=self.objective_function, x0=x0, lb=lb, ub=ub, num_int=dim, max_evals=self.options.max_eval, n_objectives=len(ret), rand_evals=rand_evals, args=() ) self.results.set_best_combination(combination=sorted_x_[0, :])
[docs] def optimized_evaluation_nsga3(self) -> None: """ Run an optimized investment evaluation with NSGA3 """ self.report_text("Evaluating investments with NSGA3...") dim = self.problem.n_vars() pop_size = int(round(dim)) # for the ieee 118 bus grid make this * 3 # compile the snapshot (+1 for the baseline evaluation) self.initialize(max_iter=self.options.max_eval + 1) # add baseline ret = self.objective_function(x=np.zeros(self.problem.n_vars(), dtype=int)) # find the largest n_partitions such that ref_dirs <= pop_size # ref_dirs = C(n_partitions + n_obj - 1, n_obj - 1) with das-dennis n_obj = len(ret) n_partitions = 1 while comb(n_partitions + 1 + n_obj - 1, n_obj - 1) <= pop_size: n_partitions += 1 # optimize X, obj_values = NSGA_3( obj_func=self.objective_function, n_partitions=n_partitions, n_var=dim, lb=self.problem.x_min, ub=self.problem.x_max, n_obj=len(ret), max_evals=self.options.max_eval, # termination pop_size=pop_size, crossover_prob=0.8, mutation_probability=0.1, eta=30, ) self.results.set_best_combination(combination=X[:, 0])
[docs] def optimized_evaluation_pint_toot_nsga3(self) -> None: """ Run an NSGA3 search warm-started with the direct PINT and TOOT evaluations. """ self.report_text("Evaluating investments with PINT/TOOT-seeded NSGA3...") dim: int = self.problem.n_vars() pop_size: int = int(round(dim)) n_obj: int = len(self.problem.get_objectives_names()) # Record the evaluated warm-start points first and then leave the remaining budget to NSGA3 itself. self.initialize(max_iter=self.options.max_eval + 2 * dim + 2) seed_population: IntMat seed_objectives: Mat seed_population, seed_objectives = get_toot_pint_seed_population( obj_func=self.objective_function, n_var=dim, lb=self.problem.x_min, ub=self.problem.x_max, n_obj=n_obj, objective_names=self.problem.get_objectives_names(), variable_names=self.problem.get_vars_names(), pop_size=pop_size, report_text=self.report_text, logger=self.logger, record_results=True, ) # Find the largest number of reference partitions compatible with the selected population size. n_partitions: int = 1 while comb(n_partitions + 1 + n_obj - 1, n_obj - 1) <= pop_size: n_partitions += 1 X, obj_values = NSGA_3( obj_func=self.objective_function, n_partitions=n_partitions, n_var=dim, lb=self.problem.x_min, ub=self.problem.x_max, n_obj=n_obj, max_evals=self.options.max_eval, pop_size=pop_size, crossover_prob=0.8, mutation_probability=0.1, eta=30, initial_population=seed_population, initial_objectives=seed_objectives, ) self.results.set_best_combination(combination=X[:, 0])
[docs] def randomized_evaluation(self) -> None: """ Run purely random evaluations, without any optimization """ self.report_text("Randomly evaluating investments...") # compile the snapshot self.initialize(max_iter=self.options.max_eval) # add baseline ret = self.objective_function(x=np.zeros(self.problem.n_vars(), dtype=int)) # optimize dim = self.problem.n_vars() X, obj_values = random_trial( obj_func=self.objective_function, n_var=dim, lb=self.problem.x_min, ub=self.problem.x_max, n_obj=len(ret), max_evals=self.options.max_eval, ) self.results.set_best_combination(combination=X[:, 0])
[docs] def optimized_evaluation_mixed_nsga2(self) -> None: """ Run an optimized investment evaluation on mixed variables with NSGA2 """ self.report_text("Evaluating investments with NSGA2...") dim = self.problem.n_vars() pop_size = int(round(dim)) * 2 # compile the snapshot self.initialize(max_iter=self.options.max_eval * 2) # add baseline ret = self.objective_function(x=np.zeros(self.problem.n_vars(), dtype=int)) # optimize X, obj_values = NSGA_2( grid=self.grid, obj_func=self.objective_function, n_obj=len(ret), max_evals=self.options.max_eval, # termination pop_size=pop_size, # crossover_prob=0.8, # mutation_probability=0.1, # eta=30, ) res_x = [] for i, v in enumerate(X): if isinstance(v, dict): vall = list(v.values())[0] res_x.append(vall) else: res_x.append(v) self.results.set_best_combination(combination=np.array(res_x))
[docs] def run(self) -> None: """ run the QThread """ self.tic() self.logger.add_info(msg="Solver", value=f"{self.options.solver.value}") self.logger.add_info(msg="Max evaluations", value=f"{self.options.max_eval}") if self.engine == EngineType.GSLV and not GSLV_AVAILABLE: self.engine = EngineType.VeraGrid self.logger.add_warning('GSLV not available, falling back to VeraGrid') else: pass if self.engine == EngineType.GSLV: if self.options.objf_tpe not in ( InvestmentsEvaluationObjectives.PowerFlow, InvestmentsEvaluationObjectives.TimeSeriesPowerFlow, InvestmentsEvaluationObjectives.LinearOptimalPowerFlowTimeSeries): self.engine = EngineType.VeraGrid self.logger.add_warning( 'GSLV investments evaluation does not support this objective, falling back to VeraGrid' ) else: pass else: pass if self.engine == EngineType.GSLV and self.options.solver == InvestmentEvaluationMethod.FromPlugin: self.engine = EngineType.VeraGrid self.logger.add_warning( 'GSLV investments evaluation does not support VeraGrid plugin solvers, falling back to VeraGrid' ) else: pass if self.engine == EngineType.GSLV: self.report_text("Evaluating investments with GSLV...") self.results = gslv_investments_evaluation( circuit=self.grid, options=self.options, logger=self.logger, ) elif self.options.solver == InvestmentEvaluationMethod.CBA_PINT_TOOT: self.independent_evaluation() elif self.options.solver == InvestmentEvaluationMethod.MVRSM: self.optimized_evaluation_mvrsm_pareto() elif self.options.solver == InvestmentEvaluationMethod.NSGA3: self.optimized_evaluation_nsga3() elif self.options.solver == InvestmentEvaluationMethod.PINT_TOOT_NSGA3: self.optimized_evaluation_pint_toot_nsga3() elif self.options.solver == InvestmentEvaluationMethod.Random: self.randomized_evaluation() elif self.options.solver == InvestmentEvaluationMethod.MixedVariableGA: self.optimized_evaluation_mixed_nsga2() elif self.options.solver == InvestmentEvaluationMethod.FromPlugin: self.options.plugin_fcn_ptr(self) else: raise Exception('Unsupported method') # finalize self.report_text("Finalizing the results object...") if self.engine != EngineType.GSLV: self.results.finalize() else: pass # report the combination if self.problem is not None: inv_list = self.problem.get_investments_for_combination(x=self.results.f_best) else: inv_list = list() for inv in inv_list: self.logger.add_info(msg=f"Best combination", device=inv.idtag, value=inv.name) self.toc() self.report_done()
[docs] def cancel(self): self.__cancel__ = True self.report_done("Cancelled!")