Source code for VeraGridEngine.Simulations.LinearFactors.linear_analysis

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations

import numpy as np
import numba as nb
import warnings
import scipy.sparse as sp
from typing import List, Dict, Tuple, TYPE_CHECKING

from scipy.sparse import lil_matrix
from scipy.sparse.linalg import spsolve as scipy_spsolve

from VeraGridEngine.enumerations import DeviceType
from VeraGridEngine.basic_structures import Logger, Vec, IntVec, CxVec, Mat, ObjVec, CxMat, BoolVec, IntMat, Vector
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Devices.Events.contingency_group import ContingencyGroup
from VeraGridEngine.Devices.Events.contingency import Contingency
from VeraGridEngine.Simulations.Derivatives.ac_jacobian import AC_jacobian
from VeraGridEngine.Simulations.Derivatives.csc_derivatives import dSf_dV_csc
from VeraGridEngine.Utils.Sparse.csc import dense_to_csc
import VeraGridEngine.Utils.Sparse.csc2 as csc
from VeraGridEngine.Utils.MIP.selected_interface import lpDot1D_changes
from VeraGridEngine.enumerations import ContingencyOperationTypes, ConverterControlType, HvdcControlType
from VeraGridEngine.Topology.topology import find_different_states

if TYPE_CHECKING:
    from VeraGridEngine.Devices.multi_circuit import MultiCircuit


[docs] @nb.njit() def make_contingency_flows(base_flow: Vec, lodf_factors: sp.csc_matrix, ptdf_factors: sp.csc_matrix, injections: Vec, threshold): """ Compute the general contingency flows :param base_flow: base flow (number of branches) :param lodf_factors: LODF factors (number of branches, number of branch contingencies) :param ptdf_factors: PTDF factors (number of branches, number of injection contingencies) :param injections: Array of contingency injections) :param threshold: PTDF and LODF threshold :return: contingency flows (number of branches) """ branch_number = lodf_factors.shape[0] branch_contingency_number = lodf_factors.shape[1] injection_number = ptdf_factors.shape[1] flow_n1: Vec = np.zeros(branch_number) if branch_number < 100: for m in range(branch_number): # copy the base flow flow_n1[m] = base_flow[m] # add the branch contingency influences for c in range(branch_contingency_number): if abs(lodf_factors[m, c]) > threshold: flow_n1[m] += lodf_factors[m, c] * base_flow[c] # add the injection influences for c in range(injection_number): if abs(ptdf_factors[m, c]) > threshold: flow_n1[m] += ptdf_factors[m, c] * injections[c] else: # parallel version for m in nb.prange(branch_number): # copy the base flow flow_n1[m] = base_flow[m] for c in range(branch_contingency_number): if abs(lodf_factors[m, c]) > threshold: flow_n1[m] += lodf_factors[m, c] * base_flow[c] for c in range(injection_number): if abs(ptdf_factors[m, c]) > threshold: flow_n1[m] += ptdf_factors[m, c] * injections[c] return flow_n1
[docs] def make_jacobian_ptdf(Ybus: sp.csc_matrix, Yf: sp.csc_matrix, F: IntVec, T: IntVec, V: CxVec, pq: IntVec, pv: IntVec, bus_types: IntVec, distribute_slack: bool = False) -> Mat: """ Compute the AC-PTDF :param Ybus: admittance matrix :param Yf: Admittance matrix of the buses "from" :param F: array if Branches "from" bus indices :param T: array if Branches "to" bus indices :param V: voltages array :param pq: array of pq node indices :param pv: array of pv node indices :param bus_types: Array of bus types :param distribute_slack: distribute slack? :return: AC-PTDF matrix (Branches, buses) """ n = len(V) pvpq = np.r_[pv, pq] npq = len(pq) npv = len(pv) # compute the Jacobian J = AC_jacobian(Ybus, V, pvpq, pq) # if distribute_slack: # dP: Mat = np.ones((n, n)) * (-1 / (n - 1)) # for i in range(n): # dP[i, i] = 1.0 # else: # dP = np.eye(n, n) dP = make_dP_from_bus_types(bus_types=bus_types, distribute_slack=distribute_slack) # compose the compatible array (the Q increments are considered zero dQ = np.zeros((npq, n)) dS = np.r_[dP[pvpq, :], dQ] # solve the voltage increments dx, ok = csc.spsolve_csc(J, dS) # compute branch derivatives dSf_dVm, dSf_dVa = dSf_dV_csc(Yf.tocsc(), V, F, T) # compose the final AC-PTDF dPf_dVa = csc.mat_to_scipy(dSf_dVa.real)[:, pvpq] dPf_dVm = csc.mat_to_scipy(dSf_dVm.real)[:, pq] PTDF = sp.hstack((dPf_dVa, dPf_dVm)) * dx return PTDF
[docs] @nb.njit(cache=True) def make_dP_from_bus_types(bus_types: IntVec, distribute_slack: bool) -> Mat: """ Build the dP / Dij matrix used to modify the PTDF. bus_types: 1 -> PQ 2 -> PV 3 -> Slack If distribute_slack=True: The balancing injection is distributed equally among PV + Slack buses. P_eff = dP @ P P_eff = P - w * sum(P) If distribute_slack=False: Return identity, preserving the current single-slack PTDF behavior. """ PQ = 1 PV = 2 SLACK = 3 n = bus_types.shape[0] # Start with identity dP = np.zeros((n, n), dtype=np.float64) for i in range(n): dP[i, i] = 1.0 if not distribute_slack: return dP # Count participating buses: PV + Slack n_participating = 0 for i in range(n): if bus_types[i] == PV or bus_types[i] == SLACK: n_participating += 1 if n_participating == 0: raise ValueError("No PV or Slack buses found for distributed slack") share = 1.0 / n_participating # dP = I - w * 1^T # # Since w_i = share for PV/Slack buses and zero for PQ buses, # only the rows of PV/Slack buses are modified. for i in range(n): if bus_types[i] == PV or bus_types[i] == SLACK: for j in range(n): dP[i, j] -= share return dP
[docs] @nb.njit(cache=True) def make_corrected_injections(P: Vec, bus_types: IntVec, distribute_slack: bool) -> Vec: """ Compute the effective active-power injection vector used for KCL checks. If distribute_slack=True: mismatch is shared equally among PV + Slack buses. If distribute_slack=False: mismatch is assigned only to the Slack bus or buses. """ PQ = 1 PV = 2 SLACK = 3 n = P.shape[0] P_eff = P.copy() mismatch = 0.0 for i in range(n): mismatch += P[i] n_participating = 0 for i in range(n): if distribute_slack: if bus_types[i] == PV or bus_types[i] == SLACK: n_participating += 1 else: if bus_types[i] == SLACK: n_participating += 1 if n_participating == 0: raise ValueError("No slack-participating buses found") share = mismatch / n_participating for i in range(n): if distribute_slack: if bus_types[i] == PV or bus_types[i] == SLACK: P_eff[i] -= share else: if bus_types[i] == SLACK: P_eff[i] -= share return P_eff
[docs] @nb.njit(cache=True) def make_corrected_injections_2d(P: Mat, bus_types: IntVec, distribute_slack: bool) -> Mat: """ Compute the effective active-power injection vector used for KCL checks. If distribute_slack=True: mismatch is shared equally among PV + Slack buses. If distribute_slack=False: mismatch is assigned only to the Slack bus or buses. """ PQ = 1 PV = 2 SLACK = 3 nt, n = P.shape P_eff = P.copy() n_participating = 0 for i in range(n): if distribute_slack: if bus_types[i] == PV or bus_types[i] == SLACK: n_participating += 1 else: if bus_types[i] == SLACK: n_participating += 1 if n_participating == 0: raise ValueError("No slack-participating buses found") mismatch = np.zeros(nt) for t in range(nt): for i in range(n): mismatch[t] += P[t, i] share = mismatch[t] / n_participating for i in range(n): if distribute_slack: if bus_types[i] == PV or bus_types[i] == SLACK: P_eff[t, i] -= share else: if bus_types[i] == SLACK: P_eff[t, i] -= share return P_eff
[docs] def make_ptdf(Bpqpv: sp.csc_matrix, Bf: sp.csc_matrix, no_slack: IntVec, bus_types: IntVec, distribute_slack: bool = True) -> Mat: """ Build the PTDF matrix :param Bpqpv: DC-linear susceptance matrix already sliced :param Bf: Bus-branch "from" susceptance matrix :param no_slack: array of sorted pq and pv node indices :param bus_types: array of bus types :param distribute_slack: distribute the slack? :return: PTDF matrix. It is a full matrix of dimensions Branches x buses """ n = Bf.shape[1] # number of buses # if distribute_slack: # dP: Mat = np.ones((n, n)) * (-1 / (n - 1)) # for i in range(n): # dP[i, i] = 1.0 # else: # dP: Mat = np.eye(n, n) dP = make_dP_from_bus_types(bus_types=bus_types, distribute_slack=distribute_slack) # solve for change in voltage angles dTheta = np.zeros((n, n)) # Bref = Bbus[noslack, :][:, noref].tocsc() dtheta_ref = scipy_spsolve(Bpqpv, dP[no_slack, :]) if sp.issparse(dtheta_ref): dTheta[no_slack, :] = dtheta_ref.toarray() else: dTheta[no_slack, :] = dtheta_ref # compute the corresponding change in branch Sf # Bf is a sparse matrix H = Bf @ dTheta # PTDF = Bf x (B^-1 x P) return H
[docs] def make_acdc_ptdf(nc: NumericalCircuit, logger: Logger, bus_types: IntVec, distribute_slack: bool = False) -> Mat: """ Build the ACDC PTDF matrix :param nc: NumericalCircuit :param logger: Logger :param bus_types: Array of bus types for the distributed slack :param distribute_slack: distribute the slack? :return: PTDF matrix. It is a full matrix of dimensions Branches x buses """ n = nc.nbus # mount the base matrix A = lil_matrix((n, n)) Af = lil_matrix((nc.nbr, n)) for k in range(nc.nbr): f = nc.passive_branch_data.F[k] t = nc.passive_branch_data.T[k] if nc.bus_data.is_dc[f] and nc.bus_data.is_dc[t]: # this is a dc branch ys = float(nc.passive_branch_data.active[k]) / (nc.passive_branch_data.R[k] + 1e-20) elif not nc.bus_data.is_dc[f] and not nc.bus_data.is_dc[t]: # this is an ac branch ys = float(nc.passive_branch_data.active[k]) / (nc.passive_branch_data.X[k] + 1e-20) else: # this is an error raise AttributeError(f"The branch {k} is nether fully AC not fully DC :(") Af[k, f] = ys Af[k, t] = -ys A[f, f] += ys A[f, t] -= ys A[t, f] -= ys A[t, t] += ys # fake impedances for converters for k in range(nc.nvsc): f = nc.vsc_data.F[k] t = nc.vsc_data.T[k] if nc.vsc_data.control1_int[k] == ConverterControlType.Pdc_angle_droop.idx(): # P-MODE 3: The VSC behaves as a droop control # P = P0 + k * (theta_f - theta_t) # k is in MW/deg, we need it in p.u./rad ys = nc.vsc_data.control1_val[k] * 57.295779513 / nc.Sbase else: # Non-droop VSCs: small coupling to keep the AC-DC topology visible ys = 0.01 A[f, f] += ys A[f, t] -= ys A[t, f] -= ys A[t, t] += ys # fake impedances for hvdc for k in range(nc.nhvdc): f = nc.hvdc_data.F[k] t = nc.hvdc_data.T[k] if nc.hvdc_data.control_mode_int[k] == HvdcControlType.type_0_free.idx(): # Free mode: P = Pset + angle_droop * (theta_f - theta_t) # angle_droop is in MW/deg, we need it in p.u./rad ys = nc.hvdc_data.angle_droop[k] * 57.295779513 / nc.Sbase else: # Pset mode: small coupling to keep the AC-DC topology visible ys = 0.01 A[f, f] += ys A[f, t] -= ys A[t, f] -= ys A[t, t] += ys # detect how to slice no_slack = list() dc_sl = list() ac_sl = list() for i in range(n): if nc.bus_data.is_dc[i]: if nc.bus_data.is_vm_controlled[i]: dc_sl.append(i) else: no_slack.append(i) else: if nc.bus_data.is_vm_controlled[i] and nc.bus_data.is_va_controlled[i]: ac_sl.append(i) else: no_slack.append(i) # if distribute_slack: # dP: Mat = np.eye(n) - 1.0 / n # else: # dP = np.eye(n, n) dP = make_dP_from_bus_types(bus_types=bus_types, distribute_slack=distribute_slack) Ared = A[no_slack, :][:, no_slack] Pred = dP[no_slack, :] dTheta = np.zeros((n, n)) with warnings.catch_warnings(): warnings.filterwarnings('error') try: dtheta_ref = sp.linalg.spsolve(Ared.tocsc(), Pred) dTheta[no_slack, :] = dtheta_ref except sp.linalg.MatrixRankWarning as e: logger.add_error("ACDC PTDF singular matrix. Does each subgrid have a slack?") H = Af @ dTheta return H
[docs] def make_lodf(Cf: sp.csc_matrix, Ct: sp.csc_matrix, PTDF: Mat, correct_values: bool = False, numerical_zero: float = 1e-10) -> Mat: """ Compute the LODF matrix :param Cf: Branch "from" -bus connectivity matrix :param Ct: Branch "to" -bus connectivity matrix :param PTDF: PTDF matrix in numpy array form (Branches, buses) :param correct_values: correct values out of the interval :param numerical_zero: value considered zero in numerical terms (i.e. 1e-10) :return: LODF matrix of dimensions (Branches, Branches) """ nl = PTDF.shape[0] # compute the connectivity matrix Cft = Cf - Ct H = PTDF * Cft.T # this loop avoids the divisions by zero # in those cases the LODF column should be zero LODF: Mat = np.zeros((nl, nl)) div = 1 - H.diagonal() for j in range(H.shape[1]): if abs(div[j]) > numerical_zero: LODF[:, j] = H[:, j] / div[j] # replace the diagonal elements by -1 # old code # LODF = LODF - sp.diags(LODF.diagonal()) - sp.eye(nl, nl), replaced by: for i in range(nl): LODF[i, i] = - 1.0 if correct_values: LODF[LODF > 1.2] = 0 LODF[LODF < -1.2] = 0 # LODF[LODF > 1.] = 1. # LODF[LODF < -1.] = 1. return LODF
[docs] @nb.njit(cache=True) def make_transfer_limits(ptdf: Mat, flows: Vec, rates: Vec) -> Vec: """ Compute the maximum transfer limits of each branch in normal operation :param ptdf: power transfer distribution factors matrix (n-branch, n-bus) :param flows: base Sf in MW :param rates: array of branch rates :return: Max transfer limits vector (n-branch) """ nbr = ptdf.shape[0] nbus = ptdf.shape[1] tmc: Vec = np.zeros(nbr) for m in range(nbr): for i in range(nbus): if ptdf[m, i] != 0.0: val = (rates[m] - flows[m]) / ptdf[m, i] # I want it with sign # update the transference value if abs(val) > abs(tmc[m]): tmc[m] = val return tmc
[docs] @nb.njit(cache=True) def create_M_numba(lodf: Mat, branch_contingency_indices) -> Mat: """ :param lodf: :param branch_contingency_indices: :return: """ M: Mat = np.empty((len(branch_contingency_indices), len(branch_contingency_indices))) for i in range(len(branch_contingency_indices)): for j in range(len(branch_contingency_indices)): if i == j: M[i, j] = 1.0 else: M[i, j] = -lodf[branch_contingency_indices[i], branch_contingency_indices[j]] return M
[docs] class LinearAnalysis: """ Linear Analysis """ def __init__(self, nc: NumericalCircuit, distributed_slack: bool = False, correct_values: bool = False, logger: Logger = Logger()): """ Linear Analysis constructor :param nc: numerical circuit instance :param distributed_slack: boolean to distribute slack :param correct_values: boolean to fix out layer values """ self.logger: Logger = logger self.islands: List[NumericalCircuit] = nc.split_into_islands() n_br = nc.nbr n_bus = nc.nbus n_hvdc = nc.hvdc_data.nelm n_vsc = nc.vsc_data.nelm self.PTDF: Mat = np.zeros((n_br, n_bus)) self.PTDF_by_island: List[Mat] = list() self.LODF: Mat = np.zeros((n_br, n_br)) self.HvdcDF: Mat = np.zeros((n_br, n_hvdc)) self.HvdcODF: Mat = np.zeros((n_br, n_hvdc)) self.VscDF: Mat = np.zeros((n_br, n_vsc)) self.VscODF: Mat = np.zeros((n_br, n_vsc)) self.bus_types = nc.bus_data.bus_types.copy() self.distributed_slack = distributed_slack # compute the PTDF per islands if len(self.islands) > 0: for n_island, island in enumerate(self.islands): indices = island.get_simulation_indices() # no slacks will make it impossible to compute the PTDF analytically if len(indices.vd) == 1: if len(indices.no_slack) > 0: if island.bus_data.is_dc.any(): ptdf_island = make_acdc_ptdf( nc=island, logger=self.logger, bus_types=island.bus_data.bus_types, distribute_slack=distributed_slack ) else: adml = island.get_linear_admittance_matrices(indices=indices) Bpqpv = adml.get_Bred(pqpv=indices.no_slack) # compute the PTDF of the island ptdf_island = make_ptdf( Bpqpv=Bpqpv, Bf=adml.Bf, no_slack=indices.no_slack, bus_types=island.bus_data.bus_types, distribute_slack=distributed_slack ) # store maybe for later self.PTDF_by_island.append(ptdf_island) # assign the PTDF to the main PTDF matrix self.PTDF[np.ix_(island.passive_branch_data.original_idx, island.bus_data.original_idx)] = ptdf_island # compute the island LODF lodf_island = make_lodf( Cf=island.passive_branch_data.Cf.tocsc(), Ct=island.passive_branch_data.Ct.tocsc(), PTDF=ptdf_island, correct_values=correct_values ) # assign the LODF to the main LODF matrix self.LODF[np.ix_(island.passive_branch_data.original_idx, island.passive_branch_data.original_idx)] = lodf_island else: self.logger.add_error('No PQ or PV nodes', 'Island {}'.format(n_island)) elif len(indices.vd) == 0: self.logger.add_warning('No slack bus', 'Island {}'.format(n_island)) else: self.logger.add_error('More than one slack bus', 'Island {}'.format(n_island)) else: # there are no islands pass # compute the HVDC PTDF (HVDC lines, Buses) # A_hvdc = lil_matrix((n_bus, n_hvdc)) for k in range(n_hvdc): f = nc.hvdc_data.F[k] t = nc.hvdc_data.T[k] # A_hvdc[f, k] = -1 # subtracts power at the "from" side # A_hvdc[t, k] = 1 # injects power at the "to" side self.HvdcDF[:, k] = self.PTDF[:, t] - self.PTDF[:, f] self.HvdcODF[:, k] = self.PTDF[:, f] - self.PTDF[:, t] # self.HvdcDF = self.PTDF @ A_hvdc # compute the VSC PTDF (HVDC lines, Buses) # A_vsc = lil_matrix((n_bus, n_vsc)) for k in range(n_vsc): f = nc.vsc_data.F[k] t = nc.vsc_data.T[k] # A_vsc[f, k] = -1 # subtracts power at the "from" side # A_vsc[t, k] = 1 # injects power at the "to" side self.VscDF[:, k] = self.PTDF[:, t] - self.PTDF[:, f] self.VscODF[:, k] = self.PTDF[:, f] - self.PTDF[:, t] # self.VscDF = self.PTDF @ A_vsc
[docs] def get_transfer_limits(self, flows: np.ndarray, rates: Vec): """ Compute the maximum transfer limits of each branch in normal operation :param flows: base Sf in MW :param rates: rates in MW :return: Max transfer limits vector (n-branch) """ return make_transfer_limits( ptdf=self.PTDF, flows=flows, rates=rates )
[docs] def get_flows(self, Sbus: CxVec | Vec, P_hvdc: Vec | None = None, P_vsc: Vec | None = None) -> CxVec | Vec: """ Compute the time series branch Sf using the PTDF :param Sbus: Power Injections time series array (nbus) :param P_hvdc: Power from HvdcLines (nhvdc) (optional) :param P_vsc: Power from Vsc (nvsc) (optional) :return: branch active power Sf (nbranch) """ if Sbus.ndim == 1: Pf = self.PTDF @ Sbus.real if P_hvdc is not None: Pf += self.HvdcDF @ P_hvdc if P_vsc is not None: Pf += self.VscDF @ P_vsc return Pf else: raise Exception(f'Sbus has unsupported dimensions: {Sbus.shape}')
[docs] def get_flows2d(self, Sbus: CxMat | Mat, P_hvdc: Mat | None = None, P_vsc: Mat | None = None) -> CxMat | Mat: """ Compute the time series branch Sf using the PTDF :param Sbus: Power Injections time series array (time, nbus) for 2D :param P_hvdc: Power from HvdcLines (nhvdc) (optional) :param P_vsc: Power from Vsc (nvsc) (optional) :return: branch active power Sf (time, nbranch) """ if Sbus.ndim == 2: Pf = np.dot(self.PTDF, Sbus.real.T).T if P_hvdc is not None: Pf += np.dot(self.HvdcDF, P_hvdc.T).T if P_vsc is not None: Pf += np.dot(self.VscDF, P_vsc.T).T return Pf else: raise Exception(f'Sbus has unsupported dimensions: {Sbus.shape}')
[docs] def get_injections(self, flows: Vec) -> Vec: """ Get injections that satisfy the flows :param flows: Array of flows (nbranch) :return: Array of injections (nbus) """ if flows.ndim == 1: Pbus, _, _, _ = np.linalg.lstsq(self.PTDF, flows) return Pbus else: raise Exception(f'flows has unsupported dimensions: {flows.shape}')
[docs] def get_reverse_injections_2d(self, flows: Mat) -> Mat: """ Get injections that satisfy the flows :param flows: Matrix of flows (time, nbranch) :return: Matrix of injections (time, nbus) """ if flows.ndim == 2: Pbus_t, _, _, _ = np.linalg.lstsq(self.PTDF, flows.T) return Pbus_t.T else: raise Exception(f'flows has unsupported dimensions: {flows.shape}')
[docs] def get_corrected_injections(self, P: Vec) -> Vec: """ This function corrects the original injections to match the PTDF flows :param P: Nodal injections :return: Corrected nodal injections """ # # Now we need to recompose the corrected injections # P_eff = np.zeros_like(P, dtype=float) # # # Correction of the injections vector -------------------------------------------------------------------------- # for island in self.islands: # # island_nbus = len(island.bus_data.original_idx) # # if island_nbus != 0: # indices = island.get_simulation_indices() # if self.distributed_slack: # # We need to apply the distributed slack factors to the injections # # if island_nbus <= 1: # P_eff[island.bus_data.original_idx] = 0.0 # else: # # VeraGrid current dP convention: # # P_eff_i = P_i - sum(P_j for j != i) / (m - 1) # Pi = P[island.bus_data.original_idx] # P_eff[island.bus_data.original_idx] = Pi - (Pi.sum() - Pi) / (island_nbus - 1) # else: # # We need to compute the slack power injection # if len(indices.vd) == 1: # # slack_local = indices.vd[0] # slack_global = island.bus_data.original_idx[slack_local] # Pi = P[island.bus_data.original_idx] # # # Non-slack buses keep their specified injections. # P_eff[island.bus_data.original_idx] = Pi # # # The slack bus absorbs the island mismatch. # # Equivalent to: P_eff[slack] = -sum(P_non_slack) # P_eff[slack_global] -= Pi.sum() # else: # print(f"Expected exactly one slack bus per island, got {len(indices.vd)}") # # return P_eff return make_corrected_injections( P=P, bus_types=self.bus_types, distribute_slack=self.distributed_slack )
[docs] def get_corrected_injections2d(self, P: Mat) -> Vec: """ This function corrects the original injections to match the PTDF flows :param P: Nodal injections :return: Corrected nodal injections """ return make_corrected_injections_2d( P=P, bus_types=self.bus_types, distribute_slack=self.distributed_slack )
[docs] class LinearMultiContingency: """ LinearMultiContingency """ def __init__(self, branch_indices: IntVec, hvdc_indices: IntVec, vsc_indices: IntVec, bus_indices: IntVec, mlodf_factors: sp.csc_matrix, compensated_ptdf_factors: sp.csc_matrix, hvdc_odf: sp.csc_matrix, vsc_odf: sp.csc_matrix, injections_factor: Vec): """ Linear multi contingency object :param branch_indices: contingency branch indices. :param hvdc_indices: HvdcLine indices that belong into the contingency :param vsc_indices: VSC indices that belong into the contingency :param bus_indices: contingency bus indices. :param mlodf_factors: MLODF factors applicable (all_branches, contingency branches). Should be: MLODF[k, Ξ²Ξ΄] :param compensated_ptdf_factors: compensated PTDF factors applicable (all_branches, contingency buses) should be: MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i] :param injections_factor: Injection contingency factors, i.e percentage to decrease an injection (len(bus indices)) """ assert len(bus_indices) == len(injections_factor) self.branch_indices: IntVec = branch_indices self.hvdc_indices: IntVec = hvdc_indices self.vsc_indices: IntVec = vsc_indices self.bus_indices: IntVec = bus_indices # MLODF[k, Ξ²Ξ΄] self.mlodf_factors: sp.csc_matrix = mlodf_factors # MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i] # if not isinstance(compensated_ptdf_factors, sp.csc_matrix): # print() self.compensated_ptdf_factors: sp.csc_matrix = compensated_ptdf_factors # percentage to decrease an injection, used to compute Ξ”P self.injections_factor: Vec = injections_factor self.hvdc_odf = hvdc_odf self.vsc_odf = vsc_odf # store flag for contingency emptyness # self.enabled: bool = (self.mlodf_factors.nnz > 0 # or self.compensated_ptdf_factors.nnz > 0 # or injections_factor.sum() > 1) self.enabled: bool = True
[docs] def has_injection_contingencies(self) -> bool: """ Check if this multi-contingency has bus injection modifications :return: true / false """ return len(self.bus_indices) > 0
[docs] def get_contingency_flows(self, base_branches_flow: Vec, injections: Vec, hvdc_flow: Vec | None = None, vsc_flow: Vec | None = None) -> Vec: """ Get contingency flows :param base_branches_flow: Base branch flows (nbranch) :param injections: Bus injections increments (nbus) :param hvdc_flow: Base HvdcLine flows (n_hvdc) :param vsc_flow: Base Vsc flows (n_vsc) :return: New flows (nbranch) """ flow = base_branches_flow.copy() if len(self.branch_indices): # MLODF[k, Ξ²Ξ΄] x Pf0[Ξ²Ξ΄] flow += self.mlodf_factors @ base_branches_flow[self.branch_indices] if len(self.hvdc_indices) and hvdc_flow is not None: flow += self.hvdc_odf @ hvdc_flow[self.hvdc_indices] if len(self.vsc_indices) and vsc_flow is not None: flow += self.vsc_odf @ vsc_flow[self.vsc_indices] if len(self.bus_indices): injection_delta = injections[self.bus_indices] # (MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i]) x Ξ”P[i] flow += self.compensated_ptdf_factors @ injection_delta return flow
[docs] def get_lp_contingency_flows(self, base_flow: ObjVec, injections: ObjVec, hvdc_flow: ObjVec | None = None, vsc_flow: ObjVec | None = None) -> Tuple[ObjVec, BoolVec, IntVec]: """ Get contingency flows using the LP interface equations :param base_flow: Base branch flows (nbranch) :param injections: Bus injections (nbus) :param hvdc_flow: Base HvdcLine flows (n_hvdc) :param vsc_flow: Base Vsc flows (n_vsc) :return: New flows (nbranch) """ mask: BoolVec = np.zeros(len(base_flow), dtype=bool) flow: ObjVec = base_flow.copy() changed_idx: IntVec = np.zeros(0, dtype=int) if len(self.branch_indices) > 0: inc, changed_idx = lpDot1D_changes(self.mlodf_factors, base_flow[self.branch_indices]) if len(changed_idx) > 0: mask[changed_idx] = True flow[changed_idx] += inc[changed_idx] if len(self.hvdc_indices) > 0 and hvdc_flow is not None: inc, changed_idx = lpDot1D_changes(self.hvdc_odf, hvdc_flow[self.hvdc_indices]) if len(changed_idx) > 0: mask[changed_idx] = True flow[changed_idx] += inc[changed_idx] if len(self.vsc_indices) > 0 and vsc_flow is not None: inc, changed_idx = lpDot1D_changes(self.vsc_odf, vsc_flow[self.vsc_indices]) if len(changed_idx) > 0: mask[changed_idx] = True flow[changed_idx] += inc[changed_idx] if len(self.bus_indices) > 0: injection_delta: ObjVec = self.injections_factor * injections[self.bus_indices] inc, changed_idx = lpDot1D_changes(self.compensated_ptdf_factors, injection_delta) if len(changed_idx) > 0: mask[changed_idx] = True flow[changed_idx] += inc[changed_idx] return flow, mask, changed_idx
[docs] def get_alpha_n1(self, dP: Vec, dT: float): """ Compute the N-1 sensitivities to the inter-area exchange :param dP: Inter-area power exchanges computed with (compute_dP) :param dT: Exchange amount (MW) usually a unitary increment is sufficient (use the value used to compute dP) :return: N-1 branch exchange sensitivities """ # (MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i]) x Ξ”P[i] dflow = self.compensated_ptdf_factors @ dP[self.bus_indices] # dflow_n1 = dflow[m] + lodf[m, c] * dflow[c] # MLODF[k, Ξ²Ξ΄] x Pf0[Ξ²Ξ΄] dflow_n1 = self.mlodf_factors @ dflow[self.branch_indices] alpha_n1 = dflow_n1 / dT return alpha_n1
[docs] class ContingencyIndices: """ Contingency indices """ def __init__(self, contingency_group: ContingencyGroup, contingency_group_dict: Dict[str, List[Contingency]], branches_dict: Dict[str, int], hvdc_dict: Dict[str, int], vsc_dict: Dict[str, int], injections_bus_index_dict: Dict[str, int]): """ Contingency indices :param contingency_group: ContingencyGroup :param contingency_group_dict: dictionary to get the list of contingencies matching a contingency group :param branches_dict: dictionary to get the branch index by the branch idtag :param hvdc_dict: dictionary to get the HvdcLine index by the branch idtag :param vsc_dict: dictionary to get the Vsc index by the branch idtag :param injections_bus_index_dict: dictionary to get the injection device bus index by the generator idtag """ # get the group's contingencies contingencies: List[Contingency] = contingency_group_dict[contingency_group.idtag] branch_contingency_indices_list = list() hvdc_contingency_indices_list = list() vsc_contingency_indices_list = list() bus_contingency_indices_list = list() injections_factors_list = list() # apply the contingencies for cnt in contingencies: if cnt.prop == ContingencyOperationTypes.Active: if cnt.tpe == DeviceType.HVDCLineDevice: hvdc_idx = hvdc_dict.get(cnt.device_idtag, None) if hvdc_idx is not None: hvdc_contingency_indices_list.append(hvdc_idx) elif cnt.tpe == DeviceType.VscDevice: vsc_idx = vsc_dict.get(cnt.device_idtag, None) if vsc_idx is not None: vsc_contingency_indices_list.append(vsc_idx) elif cnt.tpe in [DeviceType.GeneratorDevice, DeviceType.BatteryDevice, DeviceType.LoadDevice, DeviceType.StaticGeneratorDevice]: # NOTE: We treat injection active contingencies as full power outages (100% power reduction) bus_idx = injections_bus_index_dict.get(cnt.device_idtag, None) if bus_idx is not None: bus_contingency_indices_list.append(bus_idx) injections_factors_list.append(1.0) # 100% power contingency else: print(f"contingency generator {cnt.device_idtag} not found") else: # search for the contingency in the Branches br_idx = branches_dict.get(cnt.device_idtag, None) if br_idx is not None: branch_contingency_indices_list.append(br_idx) else: print(f"contingency device {cnt.device_type.value}:{cnt.device_idtag} not found") elif cnt.prop == ContingencyOperationTypes.PowerPercentage: # this can only be an injection device bus_idx = injections_bus_index_dict.get(cnt.device_idtag, None) if bus_idx is not None: bus_contingency_indices_list.append(bus_idx) injections_factors_list.append(cnt.value / 100.0) else: print(f"contingency generator {cnt.device_idtag} not found") else: print(f'Unknown branch contingency property {cnt.prop} at {cnt.name} {cnt.idtag}') self.branch_contingency_indices: IntVec = np.array(branch_contingency_indices_list, dtype=int) self.hvdc_contingency_indices: IntVec = np.array(hvdc_contingency_indices_list, dtype=int) self.vsc_contingency_indices: IntVec = np.array(vsc_contingency_indices_list, dtype=int) self.bus_contingency_indices: IntVec = np.array(bus_contingency_indices_list, dtype=int) self.injections_factors: Vec = np.array(injections_factors_list, dtype=float)
[docs] class LinearMultiContingencies: """ LinearMultiContingencies """ def __init__(self, grid: MultiCircuit, contingency_groups_used: List[ContingencyGroup]): """ Constructor :param grid: MultiCircuit """ self.grid: MultiCircuit = grid self.contingency_groups_used = contingency_groups_used # auxiliary structures self.__contingency_group_dict = grid.get_contingency_group_dict() bus_index_dict = grid.get_bus_index_dict() self.__branches_dict = grid.get_branches_index_dict2(add_vsc=False, add_hvdc=False, add_switch=True) self.__hvdc_dict = grid.get_hvdc_index_dict() self.__vsc_dict = grid.get_vsc_index_dict() self.__injections_bus_index_dict = grid.get_injections_bus_index_dict(bus_index_dict=bus_index_dict) self.contingency_indices_list = Vector(len(self.contingency_groups_used)) # for each contingency group for ic, contingency_group in enumerate(self.contingency_groups_used): self.contingency_indices_list[ic] = ContingencyIndices( contingency_group=contingency_group, contingency_group_dict=self.__contingency_group_dict, branches_dict=self.__branches_dict, hvdc_dict=self.__hvdc_dict, vsc_dict=self.__vsc_dict, injections_bus_index_dict=self.__injections_bus_index_dict ) # list of LinearMultiContingency objects that are used later to compute the contingency flows self.multi_contingencies: List[LinearMultiContingency] = list() @property def contingency_group_dict(self) -> Dict[str, List[Contingency]]: """ get the contingency grooups dictionary :return: Dict[str, List[Contingency]] """ return self.__contingency_group_dict
[docs] def get_contingency_group_names(self) -> List[str]: """ Returns a list of the names of the used contingency groups :return: List[str] """ return [elm.name for elm in self.contingency_groups_used]
[docs] def compute(self, lin: LinearAnalysis, ptdf_threshold: float = 0.0001, lodf_threshold: float = 0.0001) -> None: """ Make the LODF with any contingency combination using the declared contingency objects :param lin: LinearAnalysis instance :param ptdf_threshold: threshold to discard values :param lodf_threshold: Threshold for LODF conversion to sparse :return: None """ # lodf: Mat = lin.LODF # ptdf: Mat = lin.PTDF self.multi_contingencies.clear() # for each contingency group for ic, contingency_group in enumerate(self.contingency_groups_used): contingency_indices: ContingencyIndices = self.contingency_indices_list[ic] if len(contingency_indices.branch_contingency_indices) > 1: # Flow = # Pf0[k] # + MLODF[k, bd] * Pf0[bd] # + MLODF[k, bd] * PTDF[bd, i] * dP[i] # + PTDF[k, i] * dPi # Compute M matrix [n, n] (lodf relating the outaged lines to each other) M = create_M_numba(lodf=lin.LODF, branch_contingency_indices=contingency_indices.branch_contingency_indices) L = lin.LODF[:, contingency_indices.branch_contingency_indices] try: # Compute LODF for the multiple failure MLODF[k, Ξ²Ξ΄] mlodf_factors = dense_to_csc(mat=L @ np.linalg.inv(M), threshold=lodf_threshold) except np.linalg.LinAlgError: # Done to capture antenna when computing multiples contingencies mlodf_factors = dense_to_csc(mat=L @ np.linalg.pinv(M), threshold=lodf_threshold) if len(contingency_indices.bus_contingency_indices) > 0: # this is PTDF[k, i] ptdf_k_i = dense_to_csc(mat=lin.PTDF[:, contingency_indices.bus_contingency_indices], threshold=ptdf_threshold) # PTDF[Ξ²Ξ΄, i] ptdf_bd_i = dense_to_csc( mat=lin.PTDF[np.ix_(contingency_indices.branch_contingency_indices, contingency_indices.bus_contingency_indices)], threshold=ptdf_threshold ) else: ptdf_k_i = sp.csc_matrix((lin.PTDF.shape[0], lin.PTDF.shape[1])) # PTDF[Ξ²Ξ΄, i] ptdf_bd_i = dense_to_csc(mat=lin.PTDF[contingency_indices.branch_contingency_indices, :], threshold=ptdf_threshold) # must compute: MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i] compensated_ptdf_factors = mlodf_factors @ ptdf_bd_i + ptdf_k_i elif len(contingency_indices.branch_contingency_indices) == 1: # Pf0[k] # + LODF[k, c] * Pf0[c] # + LODF[k, c] * PTDF[c, i] * dPi # + PTDF[k, i] * dPi # append values mlodf_factors = dense_to_csc(mat=lin.LODF[:, contingency_indices.branch_contingency_indices], threshold=lodf_threshold) if len(contingency_indices.bus_contingency_indices) > 0: # single branch and single bus contingency # this is PTDF[k, i] ptdf_k_i = dense_to_csc(mat=lin.PTDF[:, contingency_indices.bus_contingency_indices], threshold=ptdf_threshold) # PTDF[Ξ²Ξ΄, i] ptdf_bd_i = dense_to_csc( mat=lin.PTDF[np.ix_(contingency_indices.branch_contingency_indices, contingency_indices.bus_contingency_indices)], threshold=ptdf_threshold ) # must compute: MLODF[k, Ξ²Ξ΄] x PTDF[Ξ²Ξ΄, i] + PTDF[k, i] compensated_ptdf_factors = mlodf_factors @ ptdf_bd_i + ptdf_k_i else: # single branch contingency, no bus contingency compensated_ptdf_factors = sp.csc_matrix(([], [], [0]), shape=(lin.LODF.shape[0], 0)) else: mlodf_factors = sp.csc_matrix(([], [], [0]), shape=(lin.LODF.shape[0], 0)) if len(contingency_indices.bus_contingency_indices) > 0: # only bus contingencies compensated_ptdf_factors = dense_to_csc( mat=lin.PTDF[:, contingency_indices.bus_contingency_indices], threshold=ptdf_threshold ) else: # no bus or branch contingencies compensated_ptdf_factors = sp.csc_matrix(([], [], [0]), shape=(lin.LODF.shape[0], 0)) # compute the hvdc and vsc contingency distribution factor matrices hvdc_odf: sp.csc_matrix = dense_to_csc( mat=lin.HvdcODF[:, contingency_indices.hvdc_contingency_indices], threshold=lodf_threshold ) vsc_odf: sp.csc_matrix = dense_to_csc( mat=lin.VscODF[:, contingency_indices.vsc_contingency_indices], threshold=lodf_threshold ) # append values self.multi_contingencies.append( LinearMultiContingency( branch_indices=contingency_indices.branch_contingency_indices, hvdc_indices=contingency_indices.hvdc_contingency_indices, vsc_indices=contingency_indices.vsc_contingency_indices, bus_indices=contingency_indices.bus_contingency_indices, mlodf_factors=mlodf_factors, compensated_ptdf_factors=compensated_ptdf_factors, injections_factor=contingency_indices.injections_factors, hvdc_odf=hvdc_odf, vsc_odf=vsc_odf ) )
[docs] def get_single_con_branch_idx(self) -> Tuple[IntVec, IntVec]: """ Get the branch index array and the contingency group it belongs array :return: array of single contingency branch indices, array of the matching contingency groups """ con_idx = list() cg_idx = list() for i, ci in enumerate(self.contingency_indices_list): if len(ci.branch_contingency_indices) == 1: con_idx.append(ci.branch_contingency_indices[0]) cg_idx.append(i) return np.array(con_idx), np.array(cg_idx)
[docs] class LinearAnalysisTs: """ Class to compute the different linear states of a grid """ def __init__(self, grid: MultiCircuit, distributed_slack: bool = False, correct_values: bool = False, time_indices: IntVec | None = None, contingency_groups_used: List[ContingencyGroup] | None = None, ptdf_threshold: float = 1e-4, lodf_threshold: float = 1e-4, compute_multi_contingencies: bool = True): """ Constructor :param grid: MultiCircuit instance :param distributed_slack: boolean to distribute slack :param correct_values: boolean to fix out layer values :param time_indices: Array of time indices :param contingency_groups_used: list of contingency groups to use (all the grid's if None) :param ptdf_threshold: threshold for PTDF's to be converted to sparse :param lodf_threshold: threshold for LODF's to be converted to sparse :param compute_multi_contingencies: also pre-compute the LinearMultiContingencies """ if not grid.has_time_series: raise Exception("The grid does not have any time series :(") self.time_indices = grid.get_all_time_indices() if time_indices is None else time_indices self._inverse_time_index = {t: i for i, t in enumerate(self.time_indices)} # get the states matrix mat: IntMat = grid.get_branch_active_time_array()[self.time_indices, :] # analyze how many PTDF's we need to get self.groups, self.mapping = find_different_states(mat) self._linear_analysis: Dict[int, LinearAnalysis] = dict() self._linear_multi_contingencies: Dict[int, LinearMultiContingencies] = dict() # compile the linear analysis for the different time steps for t_idx, list_of_represented_time_steps in self.groups.items(): # Compile the snapshot nc = compile_numerical_circuit_at(circuit=grid, t_idx=t_idx) # Linear analysis (PTDF & LODF) lin = LinearAnalysis(nc=nc, distributed_slack=distributed_slack, correct_values=correct_values) self._linear_analysis[t_idx] = lin if compute_multi_contingencies: linear_multiple_contingencies = LinearMultiContingencies( grid=grid, contingency_groups_used=contingency_groups_used if contingency_groups_used is not None else grid.contingency_groups ) linear_multiple_contingencies.compute( lin=lin, ptdf_threshold=ptdf_threshold, lodf_threshold=lodf_threshold ) self._linear_multi_contingencies[t_idx] = linear_multiple_contingencies self.nbr = grid.get_branch_number() self.nbus = grid.get_bus_number() self.nt = len(self.time_indices)
[docs] def get_linear_analysis_at(self, t_idx: int) -> LinearAnalysis: """ :param t_idx: time index in the general schema :return: """ # get the base index (i.e. when clustering) it = self._inverse_time_index[t_idx] # get the mapped reduced index t_i = self.mapping[it] # get the linear analysis lin: LinearAnalysis = self._linear_analysis[t_i] return lin
[docs] def get_multiple_contingencies_at(self, t_idx: int) -> LinearMultiContingencies: """ :param t_idx: :return: """ # get the base index (i.e. when clustering) it = self._inverse_time_index[t_idx] # get the mapped reduced index t_i = self.mapping[it] # get the linear analysis lin: LinearMultiContingencies = self._linear_multi_contingencies[t_i] return lin
[docs] def get_flows_at(self, t_idx: int, P: CxVec | Vec) -> CxVec | Vec: """ Get the flows at a time step :param t_idx: Time index :param P: Injections vector :return: branch flows vector """ # get the linear analysis lin = self.get_linear_analysis_at(t_idx=t_idx) return lin.get_flows(Sbus=P)
[docs] def get_branch_flow_ts(self, branch_idx: int, bus_idx: int, P: CxVec | Vec) -> CxVec | Vec: """ Get the flow time series of a single branch given the injection time series of a single bus :param branch_idx: Branch index :param bus_idx: Bus index :param P: Bus injection time series :return: Branch flow time series """ # must have the same size assert len(P) == self.nt flow_ts: Vec = np.zeros_like(P) for t_idx, list_of_represented_time_steps in self.groups.items(): # get the linear analysis lin: LinearAnalysis = self._linear_analysis[t_idx] flow_ts[list_of_represented_time_steps] = lin.PTDF[branch_idx, bus_idx] * P[list_of_represented_time_steps] return flow_ts
[docs] def get_flows_ts(self, P: CxMat | Mat, progress_func=None, progress_text=None) -> CxMat | Mat: """ Get the flow time series of all branches given the injection time series all buses :param P: Bus injection time series :param progress_func: Progres function :param progress_text: Progress text function :return: Branches flow time series """ # must have the same size assert P.shape[0] == self.nt assert P.shape[1] == self.nbus flow_ts = np.zeros((self.nt, self.nbr)) ii = 0 nn = len(self.groups) for t_idx, list_of_represented_time_steps in self.groups.items(): if progress_text is not None: progress_text(f"Computing flows for time group {t_idx}") # get the linear analysis lin: LinearAnalysis = self._linear_analysis[t_idx] flow_ts[list_of_represented_time_steps, :] = lin.get_flows2d(Sbus=P[list_of_represented_time_steps, :]) if progress_func is not None: ii += 1 progress_func(ii / nn * 100.0) return flow_ts
[docs] def get_reverse_injections_ts(self, flows_ts: CxMat | Mat) -> CxMat | Mat: """ Get the flow time series of all branches given the injection time series all buses :param flows_ts: Branches flow time series :return: Bus injection time series """ # must have the same size assert flows_ts.shape[0] == self.nt assert flows_ts.shape[1] == self.nbr Pbus = np.zeros((self.nt, self.nbus)) for t_idx, list_of_represented_time_steps in self.groups.items(): # get the linear analysis lin: LinearAnalysis = self._linear_analysis[t_idx] Pbus[list_of_represented_time_steps, :] = lin.get_reverse_injections_2d( flows=flows_ts[list_of_represented_time_steps, :] ) return Pbus
[docs] def get_corrected_injections_ts(self, P_ts: CxMat | Mat) -> CxMat | Mat: """ Get the corrected injections time series :param P_ts: Time series injections :return: Bus injection time series """ # must have the same size assert P_ts.shape[0] == self.nt assert P_ts.shape[1] == self.nbus Pbus = np.zeros((self.nt, self.nbus)) for t_idx, list_of_represented_time_steps in self.groups.items(): # get the linear analysis lin: LinearAnalysis = self._linear_analysis[t_idx] Pbus[list_of_represented_time_steps, :] = lin.get_corrected_injections2d( P=P_ts[list_of_represented_time_steps, :] ) return Pbus