Source code for VeraGridEngine.Simulations.Reliability.reliability_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0
import numba as nb
import numpy as np

from VeraGridEngine.enumerations import SimulationTypes, ReliabilityMode
from VeraGridEngine.basic_structures import Vec, BoolVec, Mat
from VeraGridEngine.Simulations.PowerFlow.power_flow_worker import PowerFlowOptions
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Simulations.OPF.simple_dispatch_ts import GreedyDispatchInputs
from VeraGridEngine.Simulations.Reliability.reliability import (reliability_simulation, generate_states_matrix,
                                                                find_time_blocks)
from VeraGridEngine.Simulations.Reliability.reliability_results import ReliabilityResults
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at


[docs] @nb.njit() def get_gen_pmax(nt: int, k: int, Snom: float, P_array: Vec, active_array: BoolVec, dispatchable_array: BoolVec): """ Get a generator array of Pmax given the active and dispatchable conditions :param nt: Number of time steps :param k: Generator index :param Snom: Nominal power :param P_array: Array of P :param active_array: array of active :param dispatchable_array: array of dispatchable :return: Array of possible Pmax """ assert len(P_array) == nt assert len(active_array) == nt assert len(dispatchable_array) == nt gen_pmax = np.zeros(nt) for t in range(nt): if dispatchable_array[t]: gen_pmax[t] = Snom * active_array[t] else: gen_pmax[t] = P_array[t] * active_array[t] return gen_pmax
[docs] @nb.njit(cache=True) def count_device_incidences(active_states: Mat) -> int: """ Count device incidence starts in an active-state matrix. An incidence is counted when one device changes from active to inactive. If one device is already inactive at ``t=0``, that also counts as one incidence. The matrix is organized as ``(time, device)``. :param active_states: Boolean matrix of shape (time, device) :return: Number of incidences. """ nt, n_devices = active_states.shape n_incidences = 0 for device_idx in range(n_devices): was_active = True for t in range(nt): is_active = active_states[t, device_idx] if (not is_active) and was_active: n_incidences += 1 was_active = is_active return n_incidences
[docs] class ReliabilityStudyDriver(DriverTemplate): __slots__ = ( "pf_options", "reliability_mode", "n_sim", "time_indices", "greedy_dispatch_inputs", "__cancel__", ) name = 'Reliability analysis' tpe = SimulationTypes.Reliability_run def __init__(self, grid: MultiCircuit, pf_options: PowerFlowOptions, reliability_mode: ReliabilityMode = ReliabilityMode.GenerationAdequacy, time_indices=None, n_sim: int = 1000000): """ ContinuationPowerFlowDriver constructor :param grid: NumericalCircuit instance :param pf_options: power flow options instance :param reliability_mode: ReliabilityMode :param time_indices: :param n_sim: Number of Monte-Carlo simulations """ DriverTemplate.__init__(self, grid=grid) # voltage stability options self.pf_options = pf_options self.reliability_mode = reliability_mode self.n_sim = n_sim if time_indices is None: self.time_indices = self.grid.get_all_time_indices() else: self.time_indices = time_indices self.results = ReliabilityResults(nsim=n_sim) self.greedy_dispatch_inputs = GreedyDispatchInputs(grid=self.grid, time_indices=self.time_indices, logger=self.logger) self.__cancel__ = False
[docs] def progress_callback(self, lmbda: float): """ Send progress report :param lmbda: lambda value :return: None """ self.report_text('Running voltage collapse lambda:' + "{0:.2f}".format(lmbda) + '...')
[docs] def run(self): """ Run reliability """ self.report_text("Running reliability study...") self.tic() if self.reliability_mode == ReliabilityMode.GenerationAdequacy: self.run_adequacy_reliability() elif self.reliability_mode == ReliabilityMode.GridMetrics: self.run_grid_reliability() self.toc() self.report_text("Done!") self.done_signal.emit()
[docs] def run_adequacy_reliability(self): """ run the voltage collapse simulation @return: """ horizon = self.grid.get_time_number() n_gen = self.grid.get_generators_number() gen_pmax = np.empty((horizon, n_gen), dtype=float) gen_mttf = np.zeros(n_gen) gen_mttr = np.zeros(n_gen) for k, gen in enumerate(self.grid.generators): gen_mttf[k] = gen.mttf gen_mttr[k] = gen.mttr gen_pmax[:, k] = get_gen_pmax( nt=horizon, k=k, Snom=gen.Snom, P_array=gen.P_prof.toarray(), active_array=gen.active_prof.toarray(), dispatchable_array=gen.enabled_dispatch_prof.toarray() ) lole, _, _ = reliability_simulation( n_sim=self.n_sim, load_profile=self.greedy_dispatch_inputs.load_profile, gen_profile=self.greedy_dispatch_inputs.gen_profile, gen_p_max=gen_pmax, gen_p_min=self.greedy_dispatch_inputs.gen_p_min, gen_dispatchable=self.greedy_dispatch_inputs.gen_dispatchable, gen_active=self.greedy_dispatch_inputs.gen_active, gen_cost=self.greedy_dispatch_inputs.gen_cost, gen_mttf=gen_mttf, gen_mttr=gen_mttr, batt_active=self.greedy_dispatch_inputs.batt_active, batt_p_max_charge=self.greedy_dispatch_inputs.batt_p_max_charge, batt_p_max_discharge=self.greedy_dispatch_inputs.batt_p_max_discharge, batt_energy_max=self.greedy_dispatch_inputs.batt_energy_max, batt_eff_charge=self.greedy_dispatch_inputs.batt_eff_charge, batt_eff_discharge=self.greedy_dispatch_inputs.batt_eff_discharge, batt_cost=self.greedy_dispatch_inputs.batt_cost, batt_soc0=self.greedy_dispatch_inputs.batt_soc0, batt_soc_min=self.greedy_dispatch_inputs.batt_soc_min, dt=self.greedy_dispatch_inputs.dt, force_charge_if_low=True, tol=1e-6 ) self.results.LOLE_evolution = np.cumsum(lole) / (np.arange(len(lole)) + 1) print(f"LOLE: {lole.mean()} MWh/year")
[docs] def run_grid_reliability(self) -> None: """ run the voltage collapse simulation @return: """ horizon = self.grid.get_time_number() nc = compile_numerical_circuit_at(self.grid) nc2 = nc.copy() # Energy not supplied array ENS_arr = np.zeros(self.n_sim) # nΒΊ of hours with incidences (not able to supply all demand) LOLE_arr = np.zeros(self.n_sim) # nΒΊ of incidences (not able to suppy all demand) LOLF_arr = np.zeros(self.n_sim) # nΒΊ of hours with failures (independently of if it is possible to supply demand) LOLE_total_arr = np.zeros(self.n_sim) # nΒΊ of failures (independently of if it is possible to supply demand) LOLF_total_arr = np.zeros(self.n_sim) # Total customer interruption duration (customer-hours) customer_interruption_hours_arr = np.zeros(self.n_sim) # Total customer interruptions customer_interruptions_arr = np.zeros(self.n_sim) # compute the number of customers per bus for SAIDI, SAIFI, CAIDI load_customers = np.array([self.grid.loads[idx].n_customers for idx in nc.load_data.original_idx], dtype=float) customers_per_bus = nc.load_data.get_array_per_bus(load_customers) total_n_customers = customers_per_bus.sum() for sim_idx in range(self.n_sim): if self.is_cancel(): return gen_actives, gen_n_failures = generate_states_matrix(mttf=nc.generator_data.mttf, mttr=nc.generator_data.mttr, horizon=horizon, initially_working=True) batt_actives, batt_n_failures = generate_states_matrix(mttf=nc.battery_data.mttf, mttr=nc.battery_data.mttr, horizon=horizon, initially_working=True) simulated_branch_actives, br_n_failures = generate_states_matrix(mttf=nc.passive_branch_data.mttf, mttr=nc.passive_branch_data.mttr, horizon=horizon, initially_working=True) if gen_n_failures + br_n_failures: all_actives = np.c_[gen_actives, batt_actives, simulated_branch_actives] blocks = find_time_blocks(horizon, all_actives) total_number_of_branch_incidences = count_device_incidences(simulated_branch_actives) total_number_of_gens_incidences = count_device_incidences(gen_actives) LOLF_total_arr[sim_idx] = total_number_of_branch_incidences + total_number_of_gens_incidences for idx_list in blocks: batt_e_nom = nc.battery_data.enom.copy() block_fail_to_meet_demand = False prev_interrupted_buses = np.zeros(nc.nbus, dtype=bool) for t in idx_list: # time_steps # get the time increment dt = self.greedy_dispatch_inputs.dt[t] fail_to_meet_demand = False LOLE_total_arr[sim_idx] += dt # modify active states nc2.passive_branch_data.active = simulated_branch_actives[t, :] nc2.generator_data.active = gen_actives[t, :] nc2.battery_data.active = batt_actives[t, :] E_not_supplied = 0.0 interrupted_buses = np.zeros(nc.nbus, dtype=bool) islands = nc2.split_into_islands(ignore_single_node_islands=False) for island in islands: if island.generator_data.active.sum() == 0: if island.battery_data.active.sum() == 0: E_not_supplied += island.load_data.S.sum() * dt fail_to_meet_demand = True interrupted_buses[island.bus_data.original_idx] = True else: # check the battery life island_energy_demand = island.load_data.S.sum().real * dt unsatisfied_demand = island_energy_demand for i in island.battery_data.original_idx: if unsatisfied_demand > batt_e_nom[i]: # we deplete the battery unsatisfied_demand -= batt_e_nom[i] batt_e_nom[i] = 0 else: # there is less demand that battery capacity batt_e_nom[i] -= unsatisfied_demand unsatisfied_demand = 0 if unsatisfied_demand > 0: fail_to_meet_demand = True interrupted_buses[island.bus_data.original_idx] = True E_not_supplied += unsatisfied_demand if fail_to_meet_demand: LOLE_arr[sim_idx] += dt block_fail_to_meet_demand = True customer_interruption_hours_arr[sim_idx] += dt * customers_per_bus[interrupted_buses].sum() new_interruptions = interrupted_buses & (~prev_interrupted_buses) customer_interruptions_arr[sim_idx] += customers_per_bus[new_interruptions].sum() prev_interrupted_buses = interrupted_buses # revert active states nc2.passive_branch_data.active = nc.passive_branch_data.active nc2.generator_data.active = nc.generator_data.active nc2.battery_data.active = nc.battery_data.active # Energy not supplied (MWh) ENS_arr[sim_idx] += E_not_supplied.real if block_fail_to_meet_demand: LOLF_arr[sim_idx] += 1 self.report_progress2(current=sim_idx, total=self.n_sim) # TODO: Check if the +1 is correct we should divide at the end by N-1 sim_array = np.arange(self.n_sim) + 1 self.results.LOLE_evolution = np.cumsum(LOLE_arr) / sim_array self.results.ENS_evolution = np.cumsum(ENS_arr) / sim_array self.results.LOLF_evolution = np.cumsum(LOLF_arr) / sim_array self.results.LOLET_evolution = np.cumsum(LOLE_total_arr) / sim_array self.results.LOLFT_evolution = np.cumsum(LOLF_total_arr) / sim_array cumulative_customer_interruption_hours = np.cumsum(customer_interruption_hours_arr) cumulative_customer_interruptions = np.cumsum(customer_interruptions_arr) if total_n_customers > 0: customer_base = sim_array * total_n_customers self.results.SAIDI_evolution = cumulative_customer_interruption_hours / customer_base self.results.SAIFI_evolution = cumulative_customer_interruptions / customer_base else: self.results.SAIDI_evolution = np.zeros(self.n_sim) self.results.SAIFI_evolution = np.zeros(self.n_sim) self.results.CAIDI_evolution = np.divide(cumulative_customer_interruption_hours, cumulative_customer_interruptions, out=np.zeros(self.n_sim), where=cumulative_customer_interruptions > 0)
[docs] def cancel(self): self._DriverTemplate__cancel__ = True self.__cancel__ = True