# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
import scipy.linalg as spla
from typing import List, Tuple, Any
from VeraGridEngine.Utils.Symbolic.block import Block, Var
from VeraGridEngine.enumerations import DeviceType, EmtLineTypes, VarPowerFlowReferenceType
from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory
from VeraGridEngine.Devices.Dynamic.emt_template import EmtModelTemplate
def _build_symmetric_abc_matrix_from_sequence_values(positive_sequence_value: complex,
zero_sequence_value: complex) -> np.ndarray:
"""
Reconstruct one symmetric ABC matrix from one pair of sequence values.
:param positive_sequence_value: Positive-sequence scalar value.
:param zero_sequence_value: Zero-sequence scalar value.
:return: Symmetric 3x3 phase-domain matrix.
"""
diagonal_value: complex = (2.0 * positive_sequence_value + zero_sequence_value) / 3.0
off_diagonal_value: complex = (zero_sequence_value - positive_sequence_value) / 3.0
matrix_abc: np.ndarray = np.full((3, 3), off_diagonal_value, dtype=np.complex128)
np.fill_diagonal(matrix_abc, diagonal_value)
return matrix_abc
def _get_sequence_zero_value(primary_value: float, zero_value: float) -> float:
"""
Resolve one usable zero-sequence scalar from one line object.
The balanced branch data model may omit explicit zero-sequence values. In that
case the Bergeron fallback must still build one symmetric phase-domain model.
The least surprising approximation is to reuse the positive-sequence value.
:param primary_value: Positive-sequence scalar value.
:param zero_value: Candidate zero-sequence scalar value.
:return: Zero-sequence scalar selected for reconstruction.
"""
if abs(float(zero_value)) > 1.0e-20:
return float(zero_value)
else:
return float(primary_value)
def _build_line_parameter_matrices_from_template_or_balanced_data(line: Any,
sbase: float,
fbase: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Build full phase-domain ``R``, ``L`` and ``C`` matrices for one Bergeron line.
The preferred path uses the detailed EMT template matrices. When no template is
attached, the function reconstructs one symmetric ABC equivalent from the
balanced branch parameters already stored on the line object.
:param line: Line API object.
:param sbase: System base power in MVA.
:param fbase: Nominal frequency in Hz.
:return: Tuple ``(R_full, L_full, C_full)`` in per-unit total-line form.
"""
w: float = 2.0 * np.pi * float(fbase)
if line.template is not None:
vbase_volt: float = float(line.bus_from.Vnom) * 1e3
sbase_va: float = float(sbase) * 1e6
zbase_ohm: float = (vbase_volt * vbase_volt) / sbase_va
ybase_siemens: float = 1.0 / zbase_ohm
# Bergeron travelling-wave dynamics are highly sensitive to the exact
# per-length ``Z`` and ``Y`` matrices. The original template matrices are
# already stored in the precise physical form expected by this model, and
# the delay calculation in ``line.get_tau()`` still uses that same source.
# Keeping Bergeron on the template path therefore preserves one internally
# consistent representation for ``R/L/C`` and ``tau``.
z_phys_m: np.ndarray = np.asarray(line.template.z_nabc, dtype=np.complex128) / 1e3
y_phys_m: np.ndarray = np.asarray(line.template.y_nabc, dtype=np.complex128) / 1e3
z_pu_m: np.ndarray = z_phys_m / zbase_ohm
y_pu_m: np.ndarray = y_phys_m / ybase_siemens
r_full: np.ndarray = np.real(z_pu_m) * float(line.length)
l_full: np.ndarray = (np.imag(z_pu_m) / w) * float(line.length)
c_full: np.ndarray = (np.imag(y_pu_m) / w) * float(line.length)
return r_full, l_full, c_full
else:
pass
# if line.ys is not None and line.ysh is not None:
# vbase_volt: float = float(line.bus_from.Vnom) * 1e3
# sbase_va: float = float(sbase) * 1e6
# zbase_ohm: float = (vbase_volt * vbase_volt) / sbase_va
# ybase_siemens: float = 1.0 / zbase_ohm
#
# # The template stores Carson matrices per km. Bergeron uses per-meter and
# # then converts them to per-unit before forming the total line matrices.
# z_phys_m: np.ndarray = np.asarray(line.ys.values, dtype=np.complex128) / 1e3
# y_phys_m: np.ndarray = np.asarray(line.ysh.values, dtype=np.complex128) / 1e3
# z_pu_m: np.ndarray = z_phys_m / zbase_ohm
# y_pu_m: np.ndarray = y_phys_m / ybase_siemens
#
# r_full: np.ndarray = np.real(z_pu_m) * float(line.length)
# l_full: np.ndarray = (np.imag(z_pu_m) / w) * float(line.length)
# c_full: np.ndarray = (np.imag(y_pu_m) / w) * float(line.length)
# return r_full, l_full, c_full
# Without a detailed template we rebuild one symmetric 3-phase equivalent from
# the sequence branch data. This mirrors the balanced-to-phase reconstruction
# already used elsewhere for EMT line fitting.
r1_value: float = float(line.R)
x1_value: float = float(line.X)
b1_value: float = float(line.B)
r0_value: float = _get_sequence_zero_value(primary_value=r1_value, zero_value=float(line.R0))
x0_value: float = _get_sequence_zero_value(primary_value=x1_value, zero_value=float(line.X0))
b0_value: float = _get_sequence_zero_value(primary_value=b1_value, zero_value=float(line.B0))
z1_value: complex = complex(r1_value, x1_value)
z0_value: complex = complex(r0_value, x0_value)
y1_value: complex = complex(0.0, b1_value)
y0_value: complex = complex(0.0, b0_value)
z_full: np.ndarray = _build_symmetric_abc_matrix_from_sequence_values(
positive_sequence_value=z1_value,
zero_sequence_value=z0_value,
)
y_full: np.ndarray = _build_symmetric_abc_matrix_from_sequence_values(
positive_sequence_value=y1_value,
zero_sequence_value=y0_value,
)
r_full = np.real(z_full)
l_full = np.imag(z_full) / w
c_full = np.imag(y_full) / w
return r_full, l_full, c_full
[docs]
def get_bergeron_line_emt_template(
vf: VarFactory,
phN: bool = False,
phA: bool = True,
phB: bool = True,
phC: bool = True,
name: str | None = "Bergeron") -> EmtModelTemplate:
"""
Create the symbolic EMT template for a Bergeron line.
Only active phases in NABC are represented.
The history terms are modelled as runtime-updated event parameters.
:param vf: EMT variable factory.
:param phN: Bool. True if the line has neutral, else False.
:param phA: Bool. True if the line has phase A, else False.
:param phB: Bool. True if the line has phase B, else False.
:param phC: Bool. True if the line has phase C, else False.
:param name: Symbolic model name.
:return: EMT bergeron-line model template.
"""
templ = EmtModelTemplate()
templ.tpe = DeviceType.LineDevice
templ.name = name
templ.block.name = EmtLineTypes.Bergeron.value
ph_labels = ["N", "A", "B", "C"]
ph_mask = np.array([
phN,
phA,
phB,
phC,
], dtype=bool)
active_ph = [lab for lab, on in zip(ph_labels, ph_mask) if on]
vf_keys = dict({
"N": VarPowerFlowReferenceType.vf_N,
"A": VarPowerFlowReferenceType.vf_A,
"B": VarPowerFlowReferenceType.vf_B,
"C": VarPowerFlowReferenceType.vf_C,
})
vt_keys = dict({
"N": VarPowerFlowReferenceType.vt_N,
"A": VarPowerFlowReferenceType.vt_A,
"B": VarPowerFlowReferenceType.vt_B,
"C": VarPowerFlowReferenceType.vt_C,
})
vf_vars = [vf.add_var(name=f"vf_{ph_label}_{name}", reference=vf_keys[ph_label]) for ph_label in active_ph]
vt_vars = [vf.add_var(name=f"vt_{ph_label}_{name}", reference=vt_keys[ph_label]) for ph_label in active_ph]
templ.block.in_vars = vf_vars + vt_vars
ih_f = [vf.add_var(f"Ih_f_{name}_{ph}") for ph in active_ph]
ih_t = [vf.add_var(f"Ih_t_{name}_{ph}") for ph in active_ph]
# I history
templ.block.event_dict = {p: vf.add_const(0.0) for p in (ih_f + ih_t)}
return templ
[docs]
class BergeronHistoryRuntime:
"""
Runtime companion for a Bergeron line in reduced active-phase space.
"""
__slots__ = (
"line",
"block",
"h",
"hist_if",
"hist_it",
"hist_Ih_f",
"hist_Ih_t",
"hist_Ih_f_next",
"hist_Ih_t_next",
"ph_labels",
"ph_mask",
"phase_idx",
"active_ph",
"m",
"Ih_f",
"Ih_t",
"Gc_red",
"H_red",
"n_delay",
"buf_vf",
"buf_vt",
"buf_if",
"buf_it",
"v_f_vars",
"v_t_vars",
"idx_vf",
"idx_vt",
"idx_p_hf",
"idx_p_ht",
)
def __init__(self, line: Any, line_block: Block, h: float, sbase: float, fbase: float) -> None:
self.line = line
self.block = line_block
self.h = float(h)
self.hist_if: List[np.ndarray] = []
self.hist_it: List[np.ndarray] = []
self.hist_Ih_f: List[np.ndarray] = []
self.hist_Ih_t: List[np.ndarray] = []
self.hist_Ih_f_next: List[np.ndarray] = []
self.hist_Ih_t_next: List[np.ndarray] = []
self.ph_labels = ["N", "A", "B", "C"]
self.ph_mask = np.array([
bool(line.ys.phN),
bool(line.ys.phA),
bool(line.ys.phB),
bool(line.ys.phC),
], dtype=bool)
idx_global = np.where(self.ph_mask)[0]
self.phase_idx: List[int] = [int(i) for i in idx_global]
self.active_ph: List[str] = [self.ph_labels[i] for i in self.phase_idx]
self.m: int = len(self.active_ph)
if self.m == 0:
raise ValueError(f"Bergeron line '{line.name}' has no enabled phases in line.ys")
if self.block.event_dict is None:
raise ValueError(f"Bergeron line '{line.name}': block.event_dict is None")
self.Ih_f = self._extract_hist_vars(prefix=f"Ih_f_{line.name}_")
self.Ih_t = self._extract_hist_vars(prefix=f"Ih_t_{line.name}_")
w = 2.0 * np.pi * fbase
R_full, L_full, C_full = _build_line_parameter_matrices_from_template_or_balanced_data(
line=line,
sbase=sbase,
fbase=fbase,
)
n_mat = R_full.shape[0]
if R_full.shape[0] != R_full.shape[1]:
raise ValueError(
f"Bergeron line '{line.name}' has a non-square Z matrix with shape {R_full.shape}."
)
if L_full.shape != R_full.shape or C_full.shape != R_full.shape:
raise ValueError(
f"Bergeron line '{line.name}' has inconsistent matrix shapes: "
f"R={R_full.shape}, L={L_full.shape}, C={C_full.shape}."
)
if n_mat == 3:
# Matrices are stored in compact ABC order
if 0 in idx_global:
raise ValueError(
f"Bergeron line '{line.name}' has N phase enabled but template matrices are 3x3 (ABC)."
)
idx_mat = idx_global - 1 # A->0, B->1, C->2
elif n_mat == 4:
# Matrices are stored in full NABC order
idx_mat = idx_global
else:
raise ValueError(
f"Unsupported template matrix size {R_full.shape} for line '{line.name}' "
f"(expected 3x3 or 4x4)."
)
idx_mat = np.array(idx_mat, dtype=int)
R = R_full[np.ix_(idx_mat, idx_mat)]
L = L_full[np.ix_(idx_mat, idx_mat)]
C = C_full[np.ix_(idx_mat, idx_mat)]
try:
Zc = spla.sqrtm(L @ np.linalg.inv(C))
except Exception:
Zc = np.eye(self.m) * np.sqrt(L[0, 0] / (C[0, 0] + 1e-20))
Z_eq = np.real(Zc) + R / 4.0
self.Gc_red = np.linalg.inv(Z_eq)
self.H_red = (np.real(Zc) - R / 4.0) @ self.Gc_red
tau = line.get_tau(w=w)
self.n_delay = max(1, int(round(tau / self.h)))
self.buf_vf = np.zeros((self.n_delay + 1, self.m), dtype=float)
self.buf_vt = np.zeros((self.n_delay + 1, self.m), dtype=float)
self.buf_if = np.zeros((self.n_delay + 1, self.m), dtype=float)
self.buf_it = np.zeros((self.n_delay + 1, self.m), dtype=float)
self.v_f_vars = None
self.v_t_vars = None
self.idx_vf = None
self.idx_vt = None
self.idx_p_hf = None
self.idx_p_ht = None
def _extract_hist_vars(self, prefix: str) -> List[Var]:
vars_by_name = {getattr(v, "name", ""): v for v in self.block.event_dict.keys()}
out: List[Var] = []
for ph in self.active_ph:
key = f"{prefix}{ph}"
var = vars_by_name.get(key, None)
if var is None:
matching_names: List[str] = list()
for candidate_name in vars_by_name.keys():
if candidate_name.startswith(prefix.split("_", 2)[0] + "_" + prefix.split("_", 2)[1] + "_") and candidate_name.endswith(f"_{ph}"):
matching_names.append(candidate_name)
else:
pass
if len(matching_names) == 1:
var = vars_by_name[matching_names[0]]
else:
raise ValueError(f"Missing Bergeron history var '{key}' for line '{self.line.name}'.")
else:
pass
out.append(var)
return out
[docs]
def bind_terminals(self, v_f_vars: List[Any], v_t_vars: List[Any]) -> None:
"""
Bind the bus terminal voltage variables for the active phases only.
Every active line phase must be present in both terminal bus models.
Missing voltages are treated as a topology/model consistency error.
"""
self.v_f_vars = list()
self.v_t_vars = list()
for idx_full, ph in zip(self.phase_idx, self.active_ph):
vf_i = v_f_vars[idx_full]
vt_i = v_t_vars[idx_full]
if vf_i is None:
raise ValueError(
f"Bergeron line '{self.line.name}' has active phase '{ph}' "
f"but from-bus '{self.line.bus_from.name}' does not provide the corresponding EMT voltage variable."
)
if vt_i is None:
raise ValueError(
f"Bergeron line '{self.line.name}' has active phase '{ph}' "
f"but to-bus '{self.line.bus_to.name}' does not provide the corresponding EMT voltage variable."
)
self.v_f_vars.append(vf_i)
self.v_t_vars.append(vt_i)
[docs]
def get_nodal_injections(self) -> Tuple[List[Any], List[Any]]:
if self.v_f_vars is None or self.v_t_vars is None:
raise RuntimeError("bind_terminals(...) must be called before get_nodal_injections().")
i_f_red: List[Any] = []
i_t_red: List[Any] = []
for i in range(self.m):
expr_f = sum(self.Gc_red[i, j] * self.v_f_vars[j] for j in range(self.m)) + self.Ih_f[i]
expr_t = sum(self.Gc_red[i, j] * self.v_t_vars[j] for j in range(self.m)) + self.Ih_t[i]
i_f_red.append(expr_f)
i_t_red.append(expr_t)
i_f_full: List[Any] = [None, None, None, None]
i_t_full: List[Any] = [None, None, None, None]
for k_red, k_full in enumerate(self.phase_idx):
i_f_full[k_full] = i_f_red[k_red]
i_t_full[k_full] = i_t_red[k_red]
return i_f_full, i_t_full
[docs]
def setup_indices(self, uid2idx_vars: dict, uid2idx_event_params: dict, params_offset: int = 0) -> None:
self.idx_vf = [uid2idx_vars[v.uid] for v in self.v_f_vars]
self.idx_vt = [uid2idx_vars[v.uid] for v in self.v_t_vars]
self.idx_p_hf = [uid2idx_event_params[p.uid] + params_offset for p in self.Ih_f]
self.idx_p_ht = [uid2idx_event_params[p.uid] + params_offset for p in self.Ih_t]
[docs]
def update_history(self, step_counter: int, x_prev: np.ndarray, full_params: np.ndarray) -> None:
k_curr = step_counter % (self.n_delay + 1)
k_tau = (step_counter - self.n_delay) % (self.n_delay + 1)
v_f_now = np.array([x_prev[i] if i >= 0 else 0.0 for i in self.idx_vf], dtype=float)
v_t_now = np.array([x_prev[i] if i >= 0 else 0.0 for i in self.idx_vt], dtype=float)
self.buf_vf[k_curr, :] = v_f_now
self.buf_vt[k_curr, :] = v_t_now
Ih_f_now = np.array([full_params[i] for i in self.idx_p_hf], dtype=float)
Ih_t_now = np.array([full_params[i] for i in self.idx_p_ht], dtype=float)
self.buf_if[k_curr, :] = self.Gc_red @ v_f_now + Ih_f_now
self.buf_it[k_curr, :] = self.Gc_red @ v_t_now + Ih_t_now
v_f_tau = self.buf_vf[k_tau, :]
v_t_tau = self.buf_vt[k_tau, :]
i_f_tau = self.buf_if[k_tau, :]
i_t_tau = self.buf_it[k_tau, :]
X_f = -self.Gc_red @ v_t_tau - i_t_tau
Y_f = -self.Gc_red @ v_f_tau - i_f_tau
X_t = -self.Gc_red @ v_f_tau - i_f_tau
Y_t = -self.Gc_red @ v_t_tau - i_t_tau
I = np.eye(self.m)
I_hist_f = 0.5 * ((I + self.H_red) @ X_f + (I - self.H_red) @ Y_f)
I_hist_t = 0.5 * ((I + self.H_red) @ X_t + (I - self.H_red) @ Y_t)
# Current values actually used in this step
i_f_now = self.Gc_red @ v_f_now + Ih_f_now
i_t_now = self.Gc_red @ v_t_now + Ih_t_now
self.hist_if.append(i_f_now.copy())
self.hist_it.append(i_t_now.copy())
self.hist_Ih_f.append(Ih_f_now.copy())
self.hist_Ih_t.append(Ih_t_now.copy())
# New history values prepared for the next step
self.hist_Ih_f_next.append(I_hist_f.copy())
self.hist_Ih_t_next.append(I_hist_t.copy())
for i in range(self.m):
full_params[self.idx_p_hf[i]] = I_hist_f[i]
full_params[self.idx_p_ht[i]] = I_hist_t[i]
[docs]
def initialize_buffers_from_initial_point(
self,
v_f0_red: np.ndarray,
v_t0_red: np.ndarray,
i_f0_red: np.ndarray,
i_t0_red: np.ndarray
) -> None:
"""
Fill all delay buffers with the initial steady-state point.
"""
for k in range(self.n_delay + 1):
self.buf_vf[k, :] = v_f0_red
self.buf_vt[k, :] = v_t0_red
self.buf_if[k, :] = i_f0_red
self.buf_it[k, :] = i_t0_red