# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
import numba as nb
import scipy.sparse as sp
from typing import Union, Tuple, List
from VeraGridEngine.enumerations import WindingsConnection
from VeraGridEngine.basic_structures import ObjVec, Vec, CxVec, IntVec
[docs]
def csc_equal(A: sp.csc_matrix,
B: sp.csc_matrix,
tol: float = 0.0) -> bool:
"""
Return True iff two CSC matrices are equal
(up-to a tolerance for floatingβpoint data).
Parameters
----------
A, B : scipy.sparse.csc_matrix
The matrices to compare.
tol : float, optional
Absolute tolerance. If 0.0 an exact match is required
otherwise the test is |A-B| > tol element-wise.
Notes
-----
* Both matrices are sorted first (`sort_indices`) so the
result is independent of internal index ordering.
* Works for any SciPy sparse subtype (CSR, COO β¦) after
`.tocsc()`.
"""
# 1. quick rejections
if A.shape != B.shape or A.dtype != B.dtype:
return False
# 2. canonicalise index ordering
A = A.copy()
A.sort_indices()
B = B.copy()
B.sort_indices()
# 3. exact or approximate test
if tol == 0.0:
return (A != B).nnz == 0 # :contentReference[oaicite:0]{index=0}
else:
return (abs(A - B) > tol).nnz == 0 # :contentReference[oaicite:1]{index=1}
@nb.njit(cache=True)
def _prepare_branch_maps(nbus: int, nbranch: int, F: IntVec, T: IntVec,
Yf_indices: IntVec, Yf_indptr: IntVec,
Ybus_indices: IntVec, Ybus_indptr: IntVec):
"""
Build a map to the matrices to update (Ybus, Yf, Yt)
Preconditions:
* Yf / Yt were built with the 'row == branch index' pattern.
* Yf and Yt share the same indices / indptr (as produced earlier).
* Ybus contains exactly one entry per (row,col) pair.
:param nbus: number of buses
:param F: Array of from indices
:param T: Array of to indices
:param Yf_indices: CSC indices of Yf
:param Yf_indptr: CSC index pointers of Yf
:param Ybus_indices: CSC indices of Ybus
:param Ybus_indptr: CSC index pointers of Ybus
:return: pos_yff, pos_yft, pos_ytf, pos_ytt (nbranch,) int32
pos_b_ii, pos_b_ij, pos_b_ji, pos_b_jj (nbranch,) int32
"""
pos_yff = np.empty(nbranch, np.int32)
pos_yft = np.empty(nbranch, np.int32)
pos_ytf = np.empty(nbranch, np.int32)
pos_ytt = np.empty(nbranch, np.int32)
# ------- locate branch rows in Yf / Yt --------------------------
for col in range(nbus):
start = Yf_indptr[col]
end = Yf_indptr[col + 1]
for p in range(start, end):
br = Yf_indices[p] # row == branch
if col == F[br]:
pos_yff[br] = p # Yf (k,fbus)
pos_ytf[br] = p # Yt (k,fbus)
else: # must be to-bus
pos_yft[br] = p # Yf (k,tbus)
pos_ytt[br] = p # Yt (k,tbus)
# ------- build (row,col) β position map for Ybus ---------------
# key = col*nbus + row (fits in int64 for any realistic grid)
pos_bus = dict()
for col in range(nbus):
start = Ybus_indptr[col]
end = Ybus_indptr[col + 1]
for p in range(start, end):
row = Ybus_indices[p]
pos_bus[col * nbus + row] = p
pos_b_ii = np.empty(nbranch, np.int32)
pos_b_ij = np.empty(nbranch, np.int32)
pos_b_ji = np.empty(nbranch, np.int32)
pos_b_jj = np.empty(nbranch, np.int32)
for k in range(nbranch):
i = F[k]
j = T[k]
pos_b_ii[k] = pos_bus[i * nbus + i] # self @ from-bus
pos_b_jj[k] = pos_bus[j * nbus + j] # self @ to-bus
pos_b_ij[k] = pos_bus[j * nbus + i] # mutual j,i (note: column = j)
pos_b_ji[k] = pos_bus[i * nbus + j] # mutual i,j
return pos_yff, pos_yft, pos_ytf, pos_ytt, pos_b_ii, pos_b_ij, pos_b_ji, pos_b_jj
[docs]
@nb.njit(cache=True)
def update_branch_admittances(idx: IntVec,
new_yff: CxVec, new_yft: CxVec, new_ytf: CxVec, new_ytt: CxVec,
Yf_data: CxVec, Yt_data: CxVec, Ybus_data: CxVec,
pos_yff: IntVec, pos_yft: IntVec, pos_ytf: IntVec, pos_ytt: IntVec,
pos_b_ii: IntVec, pos_b_ij: IntVec, pos_b_ji: IntVec, pos_b_jj: IntVec):
"""
Update Yf, Yt, Ybus *in place*. All arrays are pre-allocated.
:param idx: branches you change
:param new_yff:
:param new_yft:
:param new_ytf:
:param new_ytt:
:param Yf_data:
:param Yt_data:
:param Ybus_data:
:param pos_yff:
:param pos_yft:
:param pos_ytf:
:param pos_ytt:
:param pos_b_ii:
:param pos_b_ij:
:param pos_b_ji:
:param pos_b_jj:
:return:
"""
for k_idx, k in enumerate(idx):
# ---- Yf ----------------------------------------------------
p_ff = pos_yff[k]
p_ft = pos_yft[k]
d_ff = new_yff[k_idx] - Yf_data[p_ff]
d_ft = new_yft[k_idx] - Yf_data[p_ft]
Yf_data[p_ff] = new_yff[k_idx]
Yf_data[p_ft] = new_yft[k_idx]
# ---- Yt ----------------------------------------------------
p_tf = pos_ytf[k]
p_tt = pos_ytt[k]
d_tf = new_ytf[k_idx] - Yt_data[p_tf]
d_tt = new_ytt[k_idx] - Yt_data[p_tt]
Yt_data[p_tf] = new_ytf[k_idx]
Yt_data[p_tt] = new_ytt[k_idx]
# ---- Ybus (add deltas) ------------------------------------
Ybus_data[pos_b_ii[k]] += d_ff
Ybus_data[pos_b_jj[k]] += d_tt
Ybus_data[pos_b_ji[k]] += d_ff * 0 # placeholder if needed
Ybus_data[pos_b_ji[k]] += d_tf
Ybus_data[pos_b_ij[k]] += d_ft
[docs]
class AdmittanceMatrices:
"""
Class to store admittance matrices
"""
__slots__ = (
"Ybus",
"Yf",
"Yt",
"Cf",
"Ct",
"yff",
"yft",
"ytf",
"ytt",
"Yshunt_bus",
)
def __init__(self,
Ybus: sp.csc_matrix,
Yf: sp.csc_matrix,
Yt: sp.csc_matrix,
Cf: sp.csc_matrix,
Ct: sp.csc_matrix,
yff: CxVec,
yft: CxVec,
ytf: CxVec,
ytt: CxVec,
Yshunt_bus: CxVec):
"""
Constructor
:param Ybus: Admittance matrix
:param Yf: Admittance matrix of the branches with their "from" bus
:param Yt: Admittance matrix of the branches with their "to" bus
:param Cf: Connectivity matrix of the branches with their "from" bus
:param Ct: Connectivity matrix of the branches with their "to" bus
:param yff: admittance from-from primitives vector
:param yft: admittance from-to primitives vector
:param ytf: admittance to-from primitives vector
:param ytt: admittance to-to primitives vector
:param Yshunt_bus: array of shunt admittances per bus
"""
self.Ybus = Ybus if Ybus.format == 'csc' else Ybus.tocsc()
self.Yf = Yf if Yf.format == 'csc' else Yf.tocsc()
self.Yt = Yt if Yt.format == 'csc' else Yt.tocsc()
self.Cf = Cf if Cf.format == 'csc' else Cf.tocsc()
self.Ct = Ct if Ct.format == 'csc' else Ct.tocsc()
self.yff = yff
self.yft = yft
self.ytf = ytf
self.ytt = ytt
self.Yshunt_bus = Yshunt_bus
[docs]
def modify_taps_all(self,
m: Vec, m2: Vec,
tau: Vec, tau2: Vec) -> Tuple[sp.csc_matrix, sp.csc_matrix, sp.csc_matrix]:
"""
Compute the new admittance matrix given the tap variation
:param m: previous tap module (nbr)
:param m2: new tap module (nbr)
:param tau: previous tap angle (nbr)
:param tau2: new tap angle (nbr)
:return: Ybus, Yf, Yt
"""
# update all primitives
self.yff = (self.yff * (m * m) / (m2 * m2))
self.yft = self.yft * (m * np.exp(-1.0j * tau)) / (m2 * np.exp(-1.0j * tau2))
self.ytf = self.ytf * (m * np.exp(1.0j * tau)) / (m2 * np.exp(1.0j * tau2))
self.ytt = self.ytt
# update the matrices
self.Yf = (sp.diags(self.yff) * self.Cf + sp.diags(self.yft) * self.Ct).tocsc()
self.Yt = (sp.diags(self.ytf) * self.Cf + sp.diags(self.ytt) * self.Ct).tocsc()
self.Ybus = (self.Cf.T * self.Yf + self.Ct.T * self.Yt + sp.diags(self.Yshunt_bus)).tocsc()
return self.Ybus, self.Yf, self.Yt
[docs]
def modify_taps(self,
m_prev: Vec, m_new: Vec,
tau_prev: Vec, tau_new: Vec,
idx: IntVec) -> Tuple[sp.csc_matrix, sp.csc_matrix, sp.csc_matrix]:
"""
Compute the new admittance matrix given the tap variation
:param m_prev: previous tap module (length of idx)
:param m_new: new tap module (length of idx)
:param tau_prev: previous tap angle (length of idx)
:param tau_new: new tap angle (length of idx)
:param idx: branch indices that modify either m or tau.
There has to be a single indices array because it is
very hard to maintain indices that apply only to m,
indices that apply only to tau and indices that apply to both
:return: Ybus, Yf, Yt
"""
# add arrays must have the same size as idx
lidx = len(idx)
assert len(m_prev) == lidx
assert len(m_new) == lidx
assert len(tau_prev) == lidx
assert len(tau_new) == lidx
# update primitives signaled by idx
self.yff[idx] = (self.yff[idx] * (m_prev * m_prev) / (m_new * m_new))
self.yft[idx] = self.yft[idx] * (m_prev * np.exp(-1.0j * tau_prev)) / (m_new * np.exp(-1.0j * tau_new))
self.ytf[idx] = self.ytf[idx] * (m_prev * np.exp(1.0j * tau_prev)) / (m_new * np.exp(1.0j * tau_new))
self.ytt[idx] = self.ytt[idx]
# update the matrices
self.Yf = (sp.diags(self.yff) * self.Cf + sp.diags(self.yft) * self.Ct).tocsc()
self.Yt = (sp.diags(self.ytf) * self.Cf + sp.diags(self.ytt) * self.Ct).tocsc()
self.Ybus = (self.Cf.T * self.Yf + self.Ct.T * self.Yt + sp.diags(self.Yshunt_bus)).tocsc()
return self.Ybus, self.Yf, self.Yt
[docs]
def copy(self) -> "AdmittanceMatrices":
"""
Get a deep copy
"""
return AdmittanceMatrices(Ybus=self.Ybus.copy(),
Yf=self.Yf.copy(),
Yt=self.Yt.copy(),
Cf=self.Cf.copy(),
Ct=self.Ct.copy(),
yff=self.yff.copy(),
yft=self.yft.copy(),
ytf=self.ytf.copy(),
ytt=self.ytt.copy(),
Yshunt_bus=self.Yshunt_bus.copy())
def __eq__(self, other: "AdmittanceMatrices"):
ok = True
ok = ok and np.isclose(self.yff, other.yff).all()
ok = ok and np.isclose(self.yft, other.yft).all()
ok = ok and np.isclose(self.ytf, other.ytf).all()
ok = ok and np.isclose(self.ytt, other.ytt).all()
ok = ok and csc_equal(self.Ybus, other.Ybus, tol=1e-10)
ok = ok and csc_equal(self.Yf, other.Yf, tol=1e-10)
ok = ok and csc_equal(self.Yt, other.Yt, tol=1e-10)
return ok
[docs]
def compute_admittances(R: Vec,
X: Vec,
G: Vec,
B: Vec,
tap_module: Vec,
vtap_f: Vec,
vtap_t: Vec,
tap_angle: Vec,
Cf: sp.csc_matrix,
Ct: sp.csc_matrix,
Yshunt_bus: CxVec,
conn: IntVec,
seq: int,
add_windings_phase: bool = False) -> AdmittanceMatrices:
"""
Compute the complete admittance matrices for the general power flow methods (Newton-Raphson based)
:param R: array of branch resistance (p.u.)
:param X: array of branch reactance (p.u.)
:param G: array of branch conductance (p.u.)
:param B: array of branch susceptance (p.u.)
:param tap_module: array of tap modules (for all Branches, regardless of their type)
:param vtap_f: array of virtual taps at the "from" side
:param vtap_t: array of virtual taps at the "to" side
:param tap_angle: array of tap angles (for all Branches, regardless of their type)
:param Cf: Connectivity branch-bus "from" with the branch states computed
:param Ct: Connectivity branch-bus "to" with the branch states computed
:param Yshunt_bus: array of shunts equivalent power per bus, from the shunt devices (p.u.)
:param seq: Sequence [0, 1, 2]
:param conn: array of windings connections codes (numpy array of WindingsConnection values)
:param add_windings_phase: Add the phases of the transformer windings (for short circuits mainly)
:return: Admittance instance
"""
# form the admittance matrices
ys = 1.0 / (R + 1.0j * (X + 1e-20)) # series admittance
ysh_2 = (G + 1j * B) / 2.0 # shunt admittance
# compose the primitives
if add_windings_phase:
r30_deg = np.exp(1.0j * np.pi / 6.0)
if seq == 0: # zero sequence
# add always the shunt term, the series depends on the connection
# one ys vector for the from side, another for the to side, and the shared one
ysf = np.zeros(len(ys), dtype=complex)
yst = np.zeros(len(ys), dtype=complex)
ysft = np.zeros(len(ys), dtype=complex)
for i, con in enumerate(conn):
if con == WindingsConnection.GG.idx():
ysf[i] = ys[i]
yst[i] = ys[i]
ysft[i] = ys[i]
elif con == WindingsConnection.GD.idx():
ysf[i] = ys[i]
yff = (ysf + ysh_2) / (tap_module * tap_module * vtap_f * vtap_f)
yft = -ysft / (tap_module * np.exp(-1.0j * tap_angle) * vtap_f * vtap_t)
ytf = -ysft / (tap_module * np.exp(+1.0j * tap_angle) * vtap_t * vtap_f)
ytt = (yst + ysh_2) / (vtap_t * vtap_t)
elif seq == 2: # negative sequence
# only need to include the phase shift of +-30 degrees
factor_psh = np.array([r30_deg
if con == WindingsConnection.GD.idx() or con == WindingsConnection.SD.idx() else 1
for con in conn])
yff = (ys + ysh_2) / (tap_module * tap_module * vtap_f * vtap_f)
yft = -ys / (tap_module * np.exp(+1.0j * tap_angle) * vtap_f * vtap_t) * np.conj(factor_psh)
ytf = -ys / (tap_module * np.exp(-1.0j * tap_angle) * vtap_t * vtap_f) * factor_psh
ytt = (ys + ysh_2) / (vtap_t * vtap_t)
elif seq == 1: # positive sequence
# only need to include the phase shift of +-30 degrees
factor_psh = np.array([r30_deg
if con == WindingsConnection.GD.idx() or con == WindingsConnection.SD.idx() else 1.0
for con in conn])
yff = (ys + ysh_2) / (tap_module * tap_module * vtap_f * vtap_f)
yft = -ys / (tap_module * np.exp(-1.0j * tap_angle) * vtap_f * vtap_t) * factor_psh
ytf = -ys / (tap_module * np.exp(1.0j * tap_angle) * vtap_t * vtap_f) * np.conj(factor_psh)
ytt = (ys + ysh_2) / (vtap_t * vtap_t)
else:
raise Exception('Unsupported sequence when computing the admittance matrix sequence={}'.format(seq))
else: # original
yff = (ys + ysh_2) / (tap_module * tap_module * vtap_f * vtap_f)
yft = -ys / (tap_module * np.exp(-1.0j * tap_angle) * vtap_f * vtap_t)
ytf = -ys / (tap_module * np.exp(1.0j * tap_angle) * vtap_t * vtap_f)
ytt = (ys + ysh_2) / (vtap_t * vtap_t)
# compose the matrices
Yf = sp.diags(yff) * Cf + sp.diags(yft) * Ct
Yt = sp.diags(ytf) * Cf + sp.diags(ytt) * Ct
Ybus = Cf.T * Yf + Ct.T * Yt + sp.diags(Yshunt_bus)
return AdmittanceMatrices(Ybus.tocsc(), Yf.tocsc(), Yt.tocsc(), Cf.tocsc(), Ct.tocsc(),
yff, yft, ytf, ytt, Yshunt_bus)
@nb.njit(cache=True, inline="always")
def _sum_in_place(arr):
"""
exclusive prefix-sum in-place
:param arr: some array, it is modified in-place
:return: total of arr
"""
s = 0
for i in range(arr.size):
tmp = arr[i]
arr[i] = s
s += tmp
return s
@nb.njit(cache=True)
def _build_Yf_Yt(nbus, nbr: int, F: IntVec, T: IntVec, yff: CxVec, yft: CxVec, ytf: CxVec, ytt: CxVec):
"""
branch matrices (identical pattern β share indices/indptr)
:param nbus:
:param nbr:
:param F:
:param T:
:param yff:
:param yft:
:param ytf:
:param ytt:
:return:
"""
# 1. count nnz per column
nnz_col = np.zeros(nbus, np.int64)
for k in range(nbr):
nnz_col[F[k]] += 1
nnz_col[T[k]] += 1
# 2. build indptr (length nb+1!)
indptr = np.empty(nbus + 1, np.int64)
indptr[0] = 0
for j in range(nbus):
indptr[j + 1] = indptr[j] + nnz_col[j]
nnz = indptr[-1]
# 3. allocate arrays
indices = np.empty(nnz, np.int32)
data_F = np.empty(nnz, np.complex128)
data_T = np.empty(nnz, np.complex128)
# 4. cursors that advance inside each column
head = indptr[:-1].copy() # length nb, one cursor per column
for k in range(nbr):
i = F[k]
j = T[k]
p = head[i]
indices[p] = k
data_F[p] = yff[k]
data_T[p] = ytf[k]
head[i] += 1
p = head[j]
indices[p] = k
data_F[p] = yft[k]
data_T[p] = ytt[k]
head[j] += 1
return data_F, data_T, indices, indptr # <- length nb+1
@nb.njit(cache=True)
def _build_Ybus(nbus: int, nbr: int, F: IntVec, T: IntVec,
yff: CxVec, yft: CxVec, ytf: CxVec, ytt: CxVec, Ysh: CxVec) -> Tuple[CxVec, IntVec, IntVec]:
"""
Build Ybus
:param nbus:
:param nbr:
:param F:
:param T:
:param yff:
:param yft:
:param ytf:
:param ytt:
:param Ysh:
:return: (data, indices, indptr)
"""
raw_nnz = 4 * nbr + nbus # i-i, i-j, j-i, j-j + shunts
rows_raw = np.empty(raw_nnz, np.int32)
cols_raw = np.empty(raw_nnz, np.int32)
data_raw = np.empty(raw_nnz, np.complex128)
p = 0
for k in range(nbr):
i = F[k]
j = T[k]
# self admittances
rows_raw[p] = i
cols_raw[p] = i
data_raw[p] = yff[k]
p += 1
rows_raw[p] = j
cols_raw[p] = j
data_raw[p] = ytt[k]
p += 1
# mutuals
rows_raw[p] = i
cols_raw[p] = j
data_raw[p] = yft[k]
p += 1
rows_raw[p] = j
cols_raw[p] = i
data_raw[p] = ytf[k]
p += 1
# shunts
for b in range(nbus):
rows_raw[p] = b
cols_raw[p] = b
data_raw[p] = Ysh[b]
p += 1
# ---------- sort (col,row) ---------------------------------------
key = cols_raw * nbus + rows_raw
order = np.argsort(key)
rows_s = rows_raw[order]
cols_s = cols_raw[order]
data_s = data_raw[order]
# ---------- merge duplicates & count per column ------------------
data = np.empty(raw_nnz, np.complex128)
indices = np.empty(raw_nnz, np.int32)
indptr = np.zeros(nbus + 1, np.int64) # counts per column
last_col = -1
last_row = -1
w = 0 # write cursor
for idx in range(raw_nnz):
r = rows_s[idx]
c = cols_s[idx]
v = data_s[idx]
if c != last_col: # new column
last_col = c
last_row = -1
if r == last_row: # same (c,r) β accumulate
data[w - 1] += v
else: # new entry
last_row = r
data[w] = v
indices[w] = r
indptr[c] += 1
w += 1
# ---------- convert counts β pointers ----------------------------
_sum_in_place(indptr) # in-place prefix sum
indptr[-1] = w # set final nnz pointer
# slice to size w
data = data[:w]
indices = indices[:w]
return data, indices, indptr
[docs]
class AdmittanceMatricesFast:
"""
Class to store admittance matrices
"""
__slots__ = (
"Ybus",
"Yf",
"Yt",
"F",
"T",
"ys",
"ysh2",
"vtap_f",
"vtap_t",
"yff",
"yft",
"ytf",
"ytt",
"Yshunt_bus",
"pos_yff",
"pos_yft",
"pos_ytf",
"pos_ytt",
"pos_b_ii",
"pos_b_ij",
"pos_b_ji",
"pos_b_jj",
)
def __init__(self,
Ybus: sp.csc_matrix,
Yf: sp.csc_matrix,
Yt: sp.csc_matrix,
F: IntVec,
T: IntVec,
ys: CxVec,
ysh2: CxVec,
vtap_f: Vec,
vtap_t: Vec,
yff: CxVec,
yft: CxVec,
ytf: CxVec,
ytt: CxVec,
Yshunt_bus: CxVec):
"""
Constructor
:param Ybus: Admittance matrix
:param Yf: Admittance matrix of the branches with their "from" bus
:param Yt: Admittance matrix of the branches with their "to" bus
:param F: Branches array of "from" bus
:param T: CBranches array of "to" bus
:param ys: series admittance {ys = 1.0 / (R + 1.0j * (X + 1e-20))}
:param ysh2: shunt admittance {ysh_2 = (G + 1j * B) / 2.0}
:param vtap_f: array of from virtual taps
:param vtap_t: array of to virtual taps
:param yff: admittance from-from primitives vector
:param yft: admittance from-to primitives vector
:param ytf: admittance to-from primitives vector
:param ytt: admittance to-to primitives vector
:param Yshunt_bus: array of shunt admittances per bus
"""
self.Ybus = Ybus if Ybus.format == 'csc' else Ybus.tocsc()
self.Yf = Yf if Yf.format == 'csc' else Yf.tocsc()
self.Yt = Yt if Yt.format == 'csc' else Yt.tocsc()
self.F = F
self.T = T
self.ys = ys # ys = 1.0 / (R + 1.0j * (X + 1e-20)) #
self.ysh2 = ysh2 # ysh_2 = (G + 1j * B) / 2.0 # shunt admittance
self.vtap_f = vtap_f
self.vtap_t = vtap_t
self.yff = yff
self.yft = yft
self.ytf = ytf
self.ytt = ytt
self.Yshunt_bus = Yshunt_bus
self.pos_yff = np.zeros(0, dtype=int)
self.pos_yft = np.zeros(0, dtype=int)
self.pos_ytf = np.zeros(0, dtype=int)
self.pos_ytt = np.zeros(0, dtype=int)
self.pos_b_ii = np.zeros(0, dtype=int)
self.pos_b_ij = np.zeros(0, dtype=int)
self.pos_b_ji = np.zeros(0, dtype=int)
self.pos_b_jj = np.zeros(0, dtype=int)
[docs]
def initialize_update(self):
"""
Build the indices to later update the matrix easily
:return:
"""
(self.pos_yff,
self.pos_yft,
self.pos_ytf,
self.pos_ytt,
self.pos_b_ii,
self.pos_b_ij,
self.pos_b_ji,
self.pos_b_jj) = _prepare_branch_maps(nbus=self.Ybus.shape[0],
nbranch=len(self.F),
F=self.F, T=self.T,
Yf_indices=self.Yf.indices,
Yf_indptr=self.Yf.indptr,
Ybus_indices=self.Ybus.indices,
Ybus_indptr=self.Ybus.indptr)
[docs]
def modify_taps_fast(self, idx, tap_module: Vec, tap_angle: Vec) -> None:
"""
Modify in-place Ybus, Yf and Yt
:param idx: indices of the branches to modify. Both the tap angle and module are updated for every index.
:param tap_module: Tap modules of the positions given by idx
:param tap_angle: Tap angles of the positions given by idx
"""
if len(self.pos_yff) == 0 and self.yff != 0:
self.initialize_update() # your forgot to initialize...
mf = self.vtap_f[idx]
mt = self.vtap_t[idx]
new_yff = (self.ys[idx] + self.ysh2[idx]) / (tap_module * tap_module * mf * mf)
new_yft = -self.ys[idx] / (tap_module * np.exp(-1.0j * tap_angle) * mf * mt)
new_ytf = -self.ys[idx] / (tap_module * np.exp(1.0j * tap_angle) * mt * mf)
new_ytt = (self.ys[idx] + self.ysh2[idx]) / (mt * mt)
# update the primitives
self.yff[idx] = new_yff
self.yft[idx] = new_yft
self.ytf[idx] = new_ytf
self.ytt[idx] = new_ytt
# Update in-place
update_branch_admittances(
idx=idx,
new_yff=new_yff,
new_yft=new_yft,
new_ytf=new_ytf,
new_ytt=new_ytt,
Yf_data=self.Yf.data,
Yt_data=self.Yt.data,
Ybus_data=self.Ybus.data,
pos_yff=self.pos_yff,
pos_yft=self.pos_yft,
pos_ytf=self.pos_ytf,
pos_ytt=self.pos_ytt,
pos_b_ii=self.pos_b_ii,
pos_b_ij=self.pos_b_ij,
pos_b_ji=self.pos_b_ji,
pos_b_jj=self.pos_b_jj
)
[docs]
def copy(self) -> "AdmittanceMatricesFast":
"""
Get a deep copy
"""
res = AdmittanceMatricesFast(Ybus=self.Ybus.copy(),
Yf=self.Yf.copy(),
Yt=self.Yt.copy(),
F=self.F.copy(),
T=self.T.copy(),
ys=self.ys.copy(),
ysh2=self.ysh2.copy(),
vtap_f=self.vtap_f.copy(),
vtap_t=self.vtap_t.copy(),
yff=self.yff.copy(),
yft=self.yft.copy(),
ytf=self.ytf.copy(),
ytt=self.ytt.copy(),
Yshunt_bus=self.Yshunt_bus.copy())
res.pos_yff = self.pos_yff.copy()
res.pos_yft = self.pos_yft.copy()
res.pos_ytf = self.pos_ytf.copy()
res.pos_ytt = self.pos_ytt.copy()
res.pos_b_ii = self.pos_b_ii.copy()
res.pos_b_ij = self.pos_b_ij.copy()
res.pos_b_ji = self.pos_b_ji.copy()
res.pos_b_jj = self.pos_b_jj.copy()
return res
def __eq__(self, other: "AdmittanceMatricesFast"):
ok = True
ok = ok and csc_equal(self.Ybus, other.Ybus, tol=1e-10)
ok = ok and csc_equal(self.Yf, other.Yf, tol=1e-10)
ok = ok and csc_equal(self.Yt, other.Yt, tol=1e-10)
return ok
[docs]
def compute_admittances_fast(nbus,
R: Vec,
X: Vec,
G: Vec,
B: Vec,
tap_module: Vec,
vtap_f: Vec,
vtap_t: Vec,
tap_angle: Vec,
Yshunt_bus: CxVec,
F: IntVec,
T: IntVec) -> AdmittanceMatricesFast:
"""
Hardcore build of admittance matrices
:param nbus: number of nodes
:param R: array of branch resistance (p.u.)
:param X: array of branch reactance (p.u.)
:param G: array of branch conductance (p.u.)
:param B: array of branch susceptance (p.u.)
:param tap_module: array of tap modules (for all Branches, regardless of their type)
:param vtap_f: array of virtual taps at the "from" side
:param vtap_t: array of virtual taps at the "to" side
:param tap_angle: array of tap angles (for all Branches, regardless of their type)
:param Yshunt_bus: array of shunts equivalent power per bus, from the shunt devices (p.u.)
:param F: Array of branch-from bus indices
:param T: Array of branch-to bus indices
:param Cf: Cf to pass along
:param Ct: Ct to pass along
:return: Yf, Yt, Ybus
"""
ys = 1.0 / (R + 1.0j * (X + 1e-20)) # series admittance
ysh_2 = (G + 1j * B) / 2.0 # shunt admittance
yff = (ys + ysh_2) / (tap_module * tap_module * vtap_f * vtap_f)
yft = -ys / (tap_module * np.exp(-1.0j * tap_angle) * vtap_f * vtap_t)
ytf = -ys / (tap_module * np.exp(1.0j * tap_angle) * vtap_t * vtap_f)
ytt = (ys + ysh_2) / (vtap_t * vtap_t)
# ---------- branch matrices --------------------------------------
nbr = len(F)
data_F, data_T, idx_FT, ptr_FT = _build_Yf_Yt(nbus, nbr, F, T, yff, yft, ytf, ytt)
Yf = sp.csc_matrix((data_F, idx_FT, ptr_FT), shape=(nbr, nbus))
Yt = sp.csc_matrix((data_T, idx_FT, ptr_FT), shape=(nbr, nbus))
# ---------- bus matrix -------------------------------------------
data_B, idx_B, ptr_B = _build_Ybus(nbus, nbr, F, T, yff, yft, ytf, ytt, Yshunt_bus)
Ybus = sp.csc_matrix((data_B, idx_B, ptr_B), shape=(nbus, nbus))
return AdmittanceMatricesFast(Ybus=Ybus.tocsc(),
Yf=Yf.tocsc(),
Yt=Yt.tocsc(),
F=F,
T=T,
ys=ys,
ysh2=ysh_2,
vtap_f=vtap_f,
vtap_t=vtap_t,
yff=yff,
yft=yft,
ytf=ytf,
ytt=ytt,
Yshunt_bus=Yshunt_bus)
[docs]
class SeriesAdmittanceMatrices:
"""
Admittance matrices for HELM and the AC linear methods
"""
__slots__ = (
"Yseries",
"Yshunt",
)
def __init__(self, Yseries: sp.csc_matrix, Yshunt: CxVec):
self.Yseries = Yseries
self.Yshunt = Yshunt
[docs]
def compute_split_admittances(R: Vec,
X: Vec,
G: Vec,
B: Vec,
active: IntVec,
tap_module: Vec,
vtap_f: Vec,
vtap_t: Vec,
tap_angle: Vec,
Cf: sp.csc_matrix,
Ct: sp.csc_matrix,
Yshunt_bus: CxVec) -> SeriesAdmittanceMatrices:
"""
Compute the complete admittance matrices for the helm method and others that may require them
:param R: array of branch resistance (p.u.)
:param X: array of branch reactance (p.u.)
:param G: array of branch conductance (p.u.)
:param B: array of branch susceptance (p.u.)
:param active: array of active branches (bool)
:param tap_module: array of tap modules (for all Branches, regardless of their type)
:param vtap_f: array of virtual taps at the "from" side
:param vtap_t: array of virtual taps at the "to" side
:param tap_angle: array of tap angles (for all Branches, regardless of their type)
:param Cf: Connectivity branch-bus "from" with the branch states computed
:param Ct: Connectivity branch-bus "to" with the branch states computed
:param Yshunt_bus: array of shunts equivalent power per bus (p.u.)
:return: Yseries, Yshunt
"""
ys = 1.0 / (R + 1.0j * X + 1e-20) # series admittance
ysh = (G + 1j * B) / 2 # shunt admittance
# k is already filled with the appropriate value for each type of branch
tap = tap_module * np.exp(1.0j * tap_angle)
# compose the primitives
yff = ys / (tap * np.conj(tap) * vtap_f * vtap_f)
yft = - ys / (np.conj(tap) * vtap_f * vtap_t)
ytf = - ys / (tap * vtap_t * vtap_f)
ytt = ys / (vtap_t * vtap_t)
yff *= active
yft *= active
ytf *= active
ytt *= active
# compose the matrices
Yfs = sp.diags(yff) * Cf + sp.diags(yft) * Ct
Yts = sp.diags(ytf) * Cf + sp.diags(ytt) * Ct
Yseries = Cf.T * Yfs + Ct.T * Yts
Yshunt = Cf.T * ysh + Ct.T * ysh + Yshunt_bus
# GBc = G + 1.0j * B
# Gsh = GBc / 2.0
# Ysh = Yshunt_bus + Cf.T * Gsh + Ct.T * Gsh
return SeriesAdmittanceMatrices(Yseries, Yshunt)
[docs]
class FastDecoupledAdmittanceMatrices:
"""
Admittance matrices for Fast decoupled method
"""
__slots__ = (
"B1",
"B2",
)
def __init__(self, B1: sp.csc_matrix, B2: sp.csc_matrix):
self.B1 = B1
self.B2 = B2
[docs]
def compute_fast_decoupled_admittances(X: Vec,
B: Vec,
tap_module: Vec,
active: IntVec,
vtap_f: Vec,
vtap_t: Vec,
Cf: sp.csc_matrix,
Ct: sp.csc_matrix) -> FastDecoupledAdmittanceMatrices:
"""
Compute the admittance matrices for the fast decoupled method
:param X: array of branch reactance (p.u.)
:param B: array of branch susceptance (p.u.)
:param tap_module: array of tap modules (for all Branches, regardless of their type)
:param active: array of active branches (bool)
:param vtap_f: array of virtual taps at the "from" side
:param vtap_t: array of virtual taps at the "to" side
:param Cf: Connectivity branch-bus "from" with the branch states computed
:param Ct: Connectivity branch-bus "to" with the branch states computed
:return: B' and B''
"""
b1 = active / (X + 1e-20)
b1_tt = sp.diags(b1)
B1f = b1_tt * Cf - b1_tt * Ct
B1t = -b1_tt * Cf + b1_tt * Ct
B1 = Cf.T * B1f + Ct.T * B1t
b2 = b1 + B
b2_ff = -(b2 / (tap_module * np.conj(tap_module)) * vtap_f * vtap_f).real
b2_ft = -(b1 / (np.conj(tap_module) * vtap_f * vtap_t)).real
b2_tf = -(b1 / (tap_module * vtap_t * vtap_f)).real
b2_tt = - b2 / (vtap_t * vtap_t)
B2f = -sp.diags(b2_ff) * Cf + sp.diags(b2_ft) * Ct
B2t = sp.diags(b2_tf) * Cf - sp.diags(b2_tt) * Ct
B2 = Cf.T * B2f + Ct.T * B2t
return FastDecoupledAdmittanceMatrices(B1=B1.tocsc(), B2=B2.tocsc())
[docs]
class LinearAdmittanceMatrices:
"""
Admittance matrices for linear methods (DC power flow, PTDF, ...)
"""
__slots__ = (
"Bbus",
"Bf",
"Gbus",
"Gf",
)
def __init__(self, Bbus: sp.csc_matrix, Bf: sp.csc_matrix, Gbus: sp.csc_matrix, Gf: sp.csc_matrix):
self.Bbus = Bbus
self.Bf = Bf
self.Gbus = Gbus
self.Gf = Gf
[docs]
def get_Bred(self, pqpv: IntVec) -> sp.csc_matrix:
"""
Get Bred or Bpqpv for the PTDF and DC power flow
:param pqpv: list of non-slack indices
:return: B[pqpv, pqpv]
"""
return self.Bbus[np.ix_(pqpv, pqpv)].tocsc()
[docs]
def get_Bslack(self, pqpv: IntVec, vd: IntVec) -> sp.csc_matrix:
"""
Get Bslack for the PTDF and DC power flow
:param pqpv: list of non-slack indices
:param vd: list of slack ndices
:return: B[pqpv, vd]
"""
return self.Bbus[np.ix_(pqpv, vd)].tocsc()
[docs]
def compute_linear_admittances(nbr: int,
X: Vec,
R: Vec,
m: Vec,
active: IntVec,
Cf: sp.csc_matrix,
Ct: sp.csc_matrix,
ac: IntVec,
dc: IntVec) -> LinearAdmittanceMatrices:
"""
Compute the linear admittances for methods such as the "DC power flow" of the PTDF
:param nbr: Number of Branches
:param X: array of branch reactance (p.u.)
:param R: array of branch resistance (p.u.)
:param m: array of branch tap modules (p.u.)
:param active: array of branch active (bool)
:param Cf: Connectivity branch-bus "from" with the branch states computed
:param Ct: Connectivity branch-bus "to" with the branch states computed
:param ac: array of ac Branches indices
:param dc: array of dc Branches indices
:return: Bbus, Bf
"""
b = 1.0 / (X * active * m + 1e-20) # for ac Branches
b_tt = sp.diags(b) # This is Bd from the
Bf = b_tt * Cf - b_tt * Ct
Bt = -b_tt * Cf + b_tt * Ct
Bbus = Cf.T * Bf + Ct.T * Bt
g = 1.0 / (R * active + 1e-20) # for dc Branches
g_tt = sp.diags(g) # This is Bd from the
Gf = g_tt * Cf - g_tt * Ct
Gt = -g_tt * Cf + g_tt * Ct
Gbus = Cf.T * Gf + Ct.T * Gt
"""
According to the KULeuven document "DC power flow in unit commitment models"
The DC power flow is:
Pbus = (A^T x Bd x A) x bus_angles + (Bd x A)^T x branch_angles
Identifying the already computed matrices, it becomes:
Pbus = Bbus x bus_angles + Btau x branch_angles
If we solve for bus angles:
bus_angles = Bbus^-1 x (Pbus - Btau x branch_angles)
"""
return LinearAdmittanceMatrices(Bbus=Bbus, Bf=Bf, Gbus=Gbus, Gf=Gf)