Source code for VeraGridEngine.Simulations.InvestmentsEvaluation.Problems.adequacy_problem

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations

import numpy as np
import numba as nb
from scipy.sparse import lil_matrix
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.basic_structures import Vec, IntVec, StrVec, IntMat
from VeraGridEngine.Simulations.InvestmentsEvaluation.Problems.black_box_problem_template import BlackBoxProblemTemplate
from VeraGridEngine.Simulations.Reliability.reliability import reliability_simulation
from VeraGridEngine.Simulations.OPF.simple_dispatch_ts import GreedyDispatchInputs, greedy_dispatch2


[docs] @nb.njit(cache=True) def correct_x(x, lb, ub): """ Correct x in place to the given boundaries :param x: :param lb: :param ub: :return: """ for i in range(len(x)): if x[i] < lb[i]: x[i] = lb[i] elif x[i] > ub[i]: x[i] = ub[i]
[docs] @nb.njit(cache=True) def apply_actives_mask(original_active: IntMat, mask_indices: IntVec, mask: IntVec, years_starts_indices: IntVec): """ :param original_active: :param mask_indices: :param mask: x applied to generators or batteries (goes from 0 to N-years + 1) :param years_starts_indices: array saying in which profile index starts each year :return: """ active = original_active.copy() for i in mask_indices: if mask[i] == 0: active[:, i] = 0 else: start = years_starts_indices[int(mask[i]) - 1] active[:start, i] = 0 # deactivate until start active[start:, i] = 1 # activate from start onwards return active
[docs] def determine_starting_index_of_every_year(index) -> IntVec: """ Find the index where each different year starts :param index: :return: """ indices = list() year_prev = index[0].year indices.append(0) for i, entry in enumerate(index): if entry.year != year_prev: year_prev = entry.year indices.append(i) return np.array(indices)
[docs] class AdequacyInvestmentProblem(BlackBoxProblemTemplate): def __init__(self, grid: MultiCircuit, n_monte_carlo_sim=10000, use_monte_carlo: bool = True, minimum_firm_share: float = 0.2, use_firm_capacity_penalty: bool = True, save_file: bool = True, time_indices: IntVec | None = None): """ :param grid: :param n_monte_carlo_sim: :param use_monte_carlo: :param minimum_firm_share: minimum share of firm capacity in p.u. :param use_firm_capacity_penalty: if to use the firm capacity penalty :param save_file: :param time_indices: array of time indices to use, if None all are used """ super().__init__(grid=grid, x_dim=len(grid.investments_groups), plot_x_idx=1, plot_y_idx=2) # options object self.n_monte_carlo_sim = n_monte_carlo_sim self.use_monte_carlo = use_monte_carlo self.minimum_firm_share: float = minimum_firm_share self.use_firm_capacity_penalty: bool = use_firm_capacity_penalty self.save_file = save_file self.time_indices = time_indices if self.save_file: self.output_f = open("adequacy_output.csv", "w") else: self.output_f = None # -------------------------------------------------------------------------------------------------------------- # gather problem structures # -------------------------------------------------------------------------------------------------------------- self.greedy_dispatch_inputs = GreedyDispatchInputs(grid=self.grid, time_indices=self.time_indices, logger=self.logger) self.years_starts_indices = determine_starting_index_of_every_year(index=self.grid.time_profile) years = len(self.years_starts_indices) self.x_max *= years # 0 is for not investing, any other number is for the year of entrance self.total_load = np.sum(self.greedy_dispatch_inputs.load_profile) self.inv_group_capex = self.grid.get_capex_by_investment_group() nc = compile_numerical_circuit_at(self.grid, t_idx=None) self.gen_mttf = nc.generator_data.mttf self.gen_mttr = nc.generator_data.mttr self.gen_capex = nc.generator_data.capex * nc.generator_data.snom # CAPEX in $ self.batt_capex = nc.battery_data.capex * nc.battery_data.snom self.dim = len(self.grid.investments_groups) if self.output_f is not None: # write header self.output_f.write("n_inv,LOLE(MWh),CAPEX(M$)," + ",".join(self.grid.get_investment_groups_names()) + "\n") self.dt = self.grid.get_time_deltas_in_hours() gen_dict = {idtag: idx for idx, idtag in enumerate(nc.generator_data.idtag)} batt_dict = {idtag: idx for idx, idtag in enumerate(nc.battery_data.idtag)} inv_group_dict = self.grid.get_investment_by_groups_index_dict() self.dim2gen = lil_matrix((nc.generator_data.nelm, self.dim)) self.dim2batt = lil_matrix((nc.battery_data.nelm, self.dim)) self.inv_gen_idx = list() self.inv_batt_idx = list() for inv_group_idx, invs in inv_group_dict.items(): for investment in invs: gen_idx = gen_dict.get(investment.device_idtag, None) if gen_idx is not None: self.dim2gen[gen_idx, inv_group_idx] = 1 self.inv_gen_idx.append(gen_idx) else: batt_idx = batt_dict.get(investment.device_idtag, None) if batt_idx is not None: self.dim2batt[batt_idx, inv_group_idx] = 1 self.inv_batt_idx.append(batt_idx) self.inv_gen_idx = np.array(self.inv_gen_idx) self.inv_batt_idx = np.array(self.inv_batt_idx) self.branches_cost = np.array([e.Cost for e in grid.get_branches(add_hvdc=False, add_vsc=False, add_switch=True)], dtype=float)
[docs] def n_objectives(self) -> int: """ Number of objectives (size of f) :return: """ if self.use_firm_capacity_penalty: return 5 else: return 4
[docs] def n_vars(self) -> int: """ Number of variables (size of x) :return: """ return self.x_dim
[docs] def get_objectives_names(self) -> StrVec: """ Get a list of names for the elements of f :return: """ if self.use_firm_capacity_penalty: return np.array(["LOLE", "CAPEX", "Unitary electricity cost", "Curtailment", "Firm capacity penalty"]) else: return np.array(["LOLE", "CAPEX", "Unitary electricity cost", "Curtailment"])
[docs] def get_vars_names(self) -> StrVec: """ Get a list of names for the elements of x :return: """ return np.array([e.name for e in self.grid.investments_groups])
[docs] def objective_function(self, x: Vec | IntVec) -> Vec: """ Evaluate x and return f(x) :param x: array of variable values :return: array of objectives """ # correct x correct_x(x=x, lb=self.x_min, ub=self.x_max) x_bin = x.astype(bool).astype(float) gen_mask = self.dim2gen @ x batt_mask = self.dim2batt @ x # compute the firm capacity if self.use_firm_capacity_penalty: # a generator is "firm" if it is dispatchable P_firm = np.sum(self.greedy_dispatch_inputs.gen_dispatchable * gen_mask * self.greedy_dispatch_inputs.gen_p_max) P_total = np.sum(gen_mask * self.greedy_dispatch_inputs.gen_p_max) if P_total > 0: firm_share = P_firm / P_total if firm_share < self.minimum_firm_share: # the penalty is the difference scaled by 1000 firm_capacity_penalty = (self.minimum_firm_share - firm_share) * 1000.0 else: firm_capacity_penalty = 0.0 else: firm_capacity_penalty = 0.0 else: firm_capacity_penalty = 0.0 # get the active arrays of generators and batteries gen_active = apply_actives_mask(original_active=self.greedy_dispatch_inputs.gen_active, mask_indices=self.inv_gen_idx, mask=gen_mask, years_starts_indices=self.years_starts_indices) batt_active = apply_actives_mask(original_active=self.greedy_dispatch_inputs.batt_active, mask_indices=self.inv_batt_idx, mask=batt_mask, years_starts_indices=self.years_starts_indices) # batt_pmax = self.greedy_dispatch_inputs.batt_p_max_charge.copy() # batt_pmax[:, self.inv_batt_idx] *= batt_mask[self.inv_batt_idx] # invested_batt_idx = np.where(batt_mask == 1)[0] # capex += np.sum(self.batt_capex[invested_batt_idx]) # compute the capex of the selected investment groups capex = np.sum(self.inv_group_capex * x_bin) if self.use_monte_carlo: lole_array, total_cost_arr, curtailment_arr = reliability_simulation( n_sim=self.n_monte_carlo_sim, load_profile=self.greedy_dispatch_inputs.load_profile, gen_profile=self.greedy_dispatch_inputs.gen_profile, gen_p_max=self.greedy_dispatch_inputs.gen_p_max, gen_p_min=self.greedy_dispatch_inputs.gen_p_min, gen_dispatchable=self.greedy_dispatch_inputs.gen_dispatchable, gen_active=gen_active, gen_cost=self.greedy_dispatch_inputs.gen_cost, gen_mttf=self.gen_mttf, gen_mttr=self.gen_mttr, batt_active=batt_active, batt_p_max_charge=self.greedy_dispatch_inputs.batt_p_max_charge, batt_p_max_discharge=self.greedy_dispatch_inputs.batt_p_max_discharge, batt_energy_max=self.greedy_dispatch_inputs.batt_energy_max, batt_eff_charge=self.greedy_dispatch_inputs.batt_eff_charge, batt_eff_discharge=self.greedy_dispatch_inputs.batt_eff_discharge, batt_cost=self.greedy_dispatch_inputs.batt_cost, batt_soc0=self.greedy_dispatch_inputs.batt_soc0, batt_soc_min=self.greedy_dispatch_inputs.batt_soc_min, dt=self.greedy_dispatch_inputs.dt, force_charge_if_low=True ) lole = np.cumsum(lole_array / (self.n_monte_carlo_sim - 1))[-1] total_cost = np.cumsum(total_cost_arr / (self.n_monte_carlo_sim - 1))[-1] ndg_curtailment = np.cumsum(curtailment_arr / (self.n_monte_carlo_sim - 1))[-1] else: (gen_dispatch, batt_dispatch, batt_energy, total_cost, load_not_supplied, load_shedding, ndg_surplus_after_batt, ndg_curtailment_per_gen) = greedy_dispatch2( load_profile=self.greedy_dispatch_inputs.load_profile, gen_profile=self.greedy_dispatch_inputs.gen_profile, gen_p_max=self.greedy_dispatch_inputs.gen_p_max, gen_p_min=self.greedy_dispatch_inputs.gen_p_min, gen_dispatchable=self.greedy_dispatch_inputs.gen_dispatchable, gen_active=gen_active, gen_cost=self.greedy_dispatch_inputs.gen_cost, batt_active=batt_active, batt_p_max_charge=self.greedy_dispatch_inputs.batt_p_max_charge, batt_p_max_discharge=self.greedy_dispatch_inputs.batt_p_max_discharge, batt_energy_max=self.greedy_dispatch_inputs.batt_energy_max, batt_eff_charge=self.greedy_dispatch_inputs.batt_eff_charge, batt_eff_discharge=self.greedy_dispatch_inputs.batt_eff_discharge, batt_cost=self.greedy_dispatch_inputs.batt_cost, batt_soc0=self.greedy_dispatch_inputs.batt_soc0, batt_soc_min=self.greedy_dispatch_inputs.batt_soc_min, dt=self.greedy_dispatch_inputs.dt, force_charge_if_low=True ) lole = np.sum(load_not_supplied) ndg_curtailment = np.sum(ndg_surplus_after_batt) if self.output_f is not None: # write header self.output_f.write(f"{sum(x)},{lole},{capex}" + ",".join([f"{xi}" for xi in x]) + "\n") unit_cost = total_cost / self.total_load print(f"n_inv: {sum(x)}, " f"lole: {lole}, " f"capex: {capex}, " f"e cost: {unit_cost}, " f"curtailment: {ndg_curtailment}, " f"firm penalty {firm_capacity_penalty}") if self.use_firm_capacity_penalty: return np.array([lole, capex, unit_cost, ndg_curtailment, firm_capacity_penalty]) else: return np.array([lole, capex, unit_cost, ndg_curtailment])