Source code for VeraGridEngine.Simulations.ATC.available_transfer_capacity_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations
import numpy as np
import numba as nb
from typing import Tuple, List, Union, TYPE_CHECKING
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import LinearAnalysis
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.Simulations.results_template import ResultsTemplate
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Simulations.ATC.available_transfer_capacity_options import AvailableTransferCapacityOptions
from VeraGridEngine.enumerations import StudyResultsType, AvailableTransferMode, ResultTypes, DeviceType, \
    SimulationTypes
from VeraGridEngine.basic_structures import Vec, IntVec, Mat

if TYPE_CHECKING:
    from VeraGridEngine.Simulations import ClusteringResults
    from VeraGridEngine.Simulations.OPF.opf_results import OptimalPowerFlowResults


[docs] @nb.njit() def get_proportional_deltas_sensed(P, idx, dP=1.0): """ :param P: all power Injections :param idx: bus indices of the sending region :param dP: Power amount :return: """ # declare the power increment due to the transference deltaP = np.zeros(len(P)) nU = 0.0 nD = 0.0 for i in idx: if P[i] > 0: nU += P[i] if P[i] < 0: nD -= P[i] # store it as positive value # compute witch proportion to attend with positive and negative sense if (nU + nD) != 0.0: dPu = nU / (nU + nD) # positive proportion dPd = nD / (nU + nD) # negative proportion else: dPu = 0.0 dPd = 0.0 for i in idx: if P[i] > 0 and nU != 0.0: deltaP[i] = dP * dPu * P[i] / nU if P[i] < 0 and nD != 0.0: deltaP[i] = -dP * dPd * P[i] / nD # P[i] is already negative return deltaP
[docs] @nb.njit() def scale_proportional_sensed(P, idx1, idx2, dT=1.0): """ :param P: Power vector :param idx1: indices of sending region :param idx2: indices of receiving region :param dT: Exchange amount :return: """ dPu = get_proportional_deltas_sensed(P, idx1, dP=dT) dPd = get_proportional_deltas_sensed(P, idx2, dP=-dT) dP = dPu + dPd return P + dP
[docs] @nb.njit() def compute_dP(P0: Vec, Pgen: Vec, P_installed: Vec, Pload: Vec, bus_a1_idx: IntVec, bus_a2_idx: IntVec, dT: float = 1.0, mode: int = 0) -> Vec: """ Compute power injections to compute the inter-area sensitivities :param P0: all bus Injections [p.u.] :param Pgen: bus generation current power [p.u.] :param P_installed: bus generation installed power [p.u.] :param Pload: bus load power [p.u.] :param bus_a1_idx: bus indices of the sending region :param bus_a2_idx: bus indices of the receiving region :param dT: Exchange amount (MW) usually a unitary increment is sufficient :param mode: Type of power shift 0: shift generation based on the current generated power 1: shift generation based on the installed power 2: shift load 3 (or else): shift using generation and load :return: Exchange sensitivity vector for all the lines """ if mode == 0: # move the generators based on the generated power P = Pgen elif mode == 1: # move the generators based on the installed power P = P_installed elif mode == 2: # move the load P = Pload else: # move all of it P = P0 # compute the bus injection increments due to the exchange dPu = get_proportional_deltas_sensed(P, bus_a1_idx, dP=dT) dPd = get_proportional_deltas_sensed(P, bus_a2_idx, dP=-dT) dP = dPu + dPd return dP
[docs] @nb.njit(cache=True) def compute_alpha(ptdf: Mat, dP: Vec, dT: float = 1.0) -> Vec: """ Compute line sensitivity to power transfer :param ptdf: Power transfer distribution factors (n-branch, n-bus) :param dP: Vector of power increments to used for the power exchange :param dT: Exchange amount (MW) usually a unitary increment is sufficient :return: Exchange sensitivity vector for all the lines """ # compute the line flow increments due to the exchange increment dT in MW dflow = ptdf @ dP # compute the sensitivity alpha = dflow / dT return alpha
[docs] @nb.njit(cache=True) def compute_alpha_n1(ptdf: Mat, lodf: Mat, dP: Vec, alpha: Vec, dT=1.0) -> Mat: """ :param ptdf: Power transfer distribution factors (n-branch, n-bus) :param lodf: :param dP: :param alpha: :param dT: :return: """ # compute the line flow increments due to the exchange increment dT in MW dflow = ptdf @ dP alpha_n1 = np.zeros((len(alpha), len(alpha))) if lodf is not None: for m in nb.prange(len(alpha)): for c in range(len(alpha)): if m != c: dflow_n1 = dflow[m] + lodf[m, c] * dflow[c] alpha_n1[m, c] = dflow_n1 / dT return alpha_n1
[docs] @nb.njit(cache=True) def compute_atc_list(br_idx: IntVec, contingency_br_idx: IntVec, lodf: Mat, alpha: Vec, flows: Vec, rates: Vec, contingency_rates: Vec, base_exchange: float, threshold: float, time_idx: int) -> List[ Tuple[int, int, int, float, float, float, float, float, float, float, float, float, float, float, float]]: """ Compute all lines' available transfer capacity (ATC) :param br_idx: array of branch indices to analyze :param contingency_br_idx: array of branch indices to fail :param lodf: Line outage distribution factors (n-branch, n-outage branch) :param alpha: Branch sensitivities to the exchange [p.u.] :param flows: Branches power injected at the "from" side [MW] :param rates: all Branches rates vector :param contingency_rates: all Branches contingency rates vector :param base_exchange: amount already exchanges between areas :param threshold: value that determines if a line is studied for the ATC calculation :param time_idx: time index of the calculation :return: List of: time_idx, # 0 monitored index, # 1 contingency index, # 2 alpha of branch m, # 3 beta, # 4 lodf[m, c], # 5 atc_n, # 6 atc_mc, # 7 final_atc, # 8 ntc, # 9 flows[m], # 10 contingency_flow, # 11 loading, # 12 contingency loading, # 13 base_exchange #14 """ results = list() for im, m in enumerate(br_idx): # for each branch # if abs(alpha[m]) > threshold and abs(flows[m]) < rates[m]: # if the branch is relevant enough for the ATC... if abs(alpha[m]) > threshold: # if the branch is relevant enough for the ATC... # compute the ATC in "N" if alpha[m] == 0: atc_n = np.inf elif alpha[m] > 0: atc_n = (rates[m] - flows[m]) / alpha[m] else: atc_n = (-rates[m] - flows[m]) / alpha[m] # explore the ATC in "N-1" for ic, c in enumerate(contingency_br_idx): # for each contingency # compute the exchange sensitivity in contingency conditions beta = alpha[m] + lodf[m, c] * alpha[c] if m != c and abs(lodf[m, c]) > threshold and abs(beta) > threshold: # compute the contingency flow contingency_flow = flows[m] + lodf[m, c] * flows[c] # now here, do compare with the base situation # if abs(contingency_flow) <= contingency_rates[m]: # compute the ATC in "N-1" if beta == 0: atc_mc = np.inf elif beta > 0: atc_mc = (contingency_rates[m] - contingency_flow) / beta else: atc_mc = (-contingency_rates[m] - contingency_flow) / beta final_atc = min(atc_mc, atc_n) ntc = final_atc + base_exchange # refine the ATC to the most restrictive value every time results.append((time_idx, # 0 m, # 1 c, # 2 alpha[m], # 3 beta, # 4 lodf[m, c], # 5 atc_n, # 6 atc_mc, # 7 final_atc, # 8 ntc, # 9 flows[m], # 10 contingency_flow, # 11 flows[m] / (rates[m] + 1e-9) * 100.0, # 12 contingency_flow / (contingency_rates[m] + 1e-9) * 100.0, # 13 base_exchange)) # 14 return results
[docs] class AvailableTransferCapacityResults(ResultsTemplate): LOCAL_RESULTS_DECLARATIONS = tuple() __slots__ = ( "branch_names", "bus_names", "rates", "contingency_rates", "base_exchange", "report", "report_headers", "report_indices", "raw_report", ) def __init__(self, br_names, bus_names, rates, contingency_rates: Vec, clustering_results: Union[ClusteringResults, None]): """ :param br_names: :param bus_names: :param rates: :param contingency_rates: :param clustering_results: """ ResultsTemplate.__init__(self, name='ATC Results', available_results=[ ResultTypes.AvailableTransferCapacityReport ], time_array=None, clustering_results=clustering_results, study_results_type=StudyResultsType.AvailableTransferCapacity) self.branch_names = np.array(br_names, dtype=object) self.bus_names = bus_names self.rates = rates self.contingency_rates = contingency_rates self.base_exchange = 0 self.report = None self.report_headers = None self.report_indices = None self.raw_report = None
[docs] def get_steps(self): """ :return: """ return
[docs] def make_report(self, threshold: float = 0.0): """ :return: """ self.report_headers = ['Time', 'Branch', 'Base flow', 'Rate', 'Alpha', 'ATC normal', 'Limiting contingency branch', 'Limiting contingency flow', 'Contingency rate', 'Beta', 'Contingency ATC', 'ATC', 'Base exchange flow', 'NTC'] self.report = np.empty((len(self.raw_report), len(self.report_headers)), dtype=object) rep = np.array(self.raw_report) # sort by ATC if len(self.raw_report): m = rep[:, 1].astype(int) c = rep[:, 2].astype(int) self.report_indices = np.arange(0, len(rep)) # time self.report[:, 0] = 0 # Branch name self.report[:, 1] = self.branch_names[m] # Base flow' self.report[:, 2] = rep[:, 10] # rate self.report[:, 3] = self.rates[m] # 'Rate', (time, branch) # alpha self.report[:, 4] = rep[:, 3] # 'ATC normal' self.report[:, 5] = rep[:, 6] # contingency info ----- # 'Limiting contingency branch' self.report[:, 6] = self.branch_names[c] # 'Limiting contingency flow' self.report[:, 7] = rep[:, 11] # 'Contingency rate' (time, branch) self.report[:, 8] = self.contingency_rates[m] # 'Beta' self.report[:, 9] = rep[:, 4] # 'Contingency ATC' self.report[:, 10] = rep[:, 7] # Final ATC (worst between normal ATC and contingency ATC) self.report[:, 11] = rep[:, 8] # Base exchange flow self.report[:, 12] = rep[:, 14] # NTC self.report[:, 13] = rep[:, 9] # trim by abs alpha > threshold and loading <= 1 loading = np.abs(self.report[:, 2] / (self.report[:, 3] + 1e-20)) idx = np.where((np.abs(self.report[:, 4]) > threshold) & (loading <= 1.0))[0] self.report = self.report[idx, :] else: print('Empty raw report :/')
[docs] def get_dict(self): """ Returns a dictionary with the results sorted in a dictionary :return: dictionary of 2D numpy arrays (probably of complex numbers) """ data = super().get_dict() data['report'] = self.report.tolist() return data
[docs] def mdl(self, result_type: ResultTypes) -> ResultsTable: """ Plot the results :param result_type: :return: """ if result_type == ResultTypes.AvailableTransferCapacityReport: data = np.array(self.report) y_label = '' title = result_type.value return ResultsTable(data=np.array(self.report), index=self.report_indices, columns=self.report_headers, title=title, ylabel=y_label, cols_device_type=DeviceType.NoDevice, idx_device_type=DeviceType.NoDevice) else: raise Exception('Result type not understood:' + str(result_type))
[docs] class AvailableTransferCapacityDriver(DriverTemplate): __slots__ = ( "options", "opf_results", "t_idx", ) tpe = SimulationTypes.NetTransferCapacity_run name = tpe.value def __init__(self, grid: MultiCircuit, options: AvailableTransferCapacityOptions | None, opf_results: Union[OptimalPowerFlowResults, None] = None, t_idx: int | None = None): """ Power Transfer Distribution Factors class constructor @param grid: MultiCircuit Object @param options: OPF options @:param pf_results: PowerFlowResults, this is to get the Sf """ DriverTemplate.__init__(self, grid=grid) # Options to use self.options = options self.opf_results: OptimalPowerFlowResults | None = opf_results self.t_idx: int | None = t_idx # OPF results rates = self.grid.get_branch_rates() self.results = AvailableTransferCapacityResults(br_names=self.grid.get_branch_names(add_hvdc=False, add_vsc=False, add_switch=True), bus_names=self.grid.get_bus_names(), rates=rates, contingency_rates=rates, clustering_results=None)
[docs] def run(self) -> None: """ Run thread """ self.tic() self.report_text("Analyzing") self.report_progress(0.0) # get the converted bus indices idx1b = self.options.bus_idx_from idx2b = self.options.bus_idx_to # declare the numerical circuit nc = compile_numerical_circuit_at(circuit=self.grid, t_idx=self.t_idx, logger=self.logger, opf_results=self.opf_results) # declare the linear analysis linear = LinearAnalysis( nc=nc, distributed_slack=self.options.distributed_slack, correct_values=self.options.correct_values, ) # get the branch indices to analyze br_idx = nc.passive_branch_data.get_monitor_enabled_indices() con_br_idx = nc.passive_branch_data.get_contingency_enabled_indices() # declare the results self.results = AvailableTransferCapacityResults( br_names=nc.passive_branch_data.names, bus_names=nc.bus_data.names, rates=nc.passive_branch_data.rates, contingency_rates=nc.passive_branch_data.contingency_rates, clustering_results=None ) # compute the branch exchange sensitivity (alpha) """ 0: shift generation based on the current generated power 1: shift generation based on the installed power 2: shift load 3 (or else): shift udasing generation and load """ mode_2_int = {AvailableTransferMode.Generation: 0, AvailableTransferMode.InstalledPower: 1, AvailableTransferMode.Load: 2, AvailableTransferMode.GenerationAndLoad: 3} Sbus = nc.get_power_injections_pu() dP = compute_dP( P0=Sbus.real, P_installed=nc.bus_data.installed_power, Pgen=nc.generator_data.get_injections_per_bus().real, Pload=nc.load_data.get_injections_per_bus().real, bus_a1_idx=idx1b, bus_a2_idx=idx2b, mode=mode_2_int[self.options.mode], dT=1.0 ) alpha = compute_alpha( ptdf=linear.PTDF, dP=dP, dT=1.0 ) # get flow if self.options.use_provided_flows: flows = self.options.Pf if self.options.Pf is None: msg = 'The option to use the provided flows is enabled, but no flows are available' self.logger.add_error(msg) raise Exception(msg) else: # compose the HVDC power Injections (Shvdc, Losses_hvdc, Pf_hvdc, Pt_hvdc, loading_hvdc, n_free) = nc.hvdc_data.get_power(Sbase=nc.Sbase, theta=np.zeros(nc.nbus)) # flows in p.u. flows = linear.get_flows(Sbus=Sbus, P_hvdc=Pf_hvdc) # base exchange base_exchange = (self.options.inter_area_branch_sense * flows[self.options.inter_area_branch_idx]).sum() # consider the HVDC transfer if self.options.Pf_hvdc is not None: if len(self.options.idx_hvdc_br): base_exchange += (self.options.inter_area_hvdc_branch_sense * self.options.Pf_hvdc[self.options.idx_hvdc_br]).sum() # compute ATC report = compute_atc_list(br_idx=br_idx, contingency_br_idx=con_br_idx, lodf=linear.LODF, alpha=alpha, flows=flows, rates=nc.passive_branch_data.rates, contingency_rates=nc.passive_branch_data.contingency_rates, base_exchange=base_exchange, time_idx=0, threshold=self.options.threshold) if len(report): report = np.array(report, dtype=object) # sort by NTC report = report[report[:, 9].argsort()] # curtail report if self.options.max_report_elements > 0: report = report[:self.options.max_report_elements, :] else: report = np.zeros((0, 15), dtype=object) # post-process and store the results self.results.raw_report = report self.results.base_exchange = base_exchange self.results.make_report(threshold=self.options.threshold) self.toc()
[docs] def get_steps(self) -> List[int]: """ Get variations list of strings """ return list()