Source code for VeraGridEngine.Templates.Emt.pi_line_emt_template

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Dict
import numpy as np

from VeraGridEngine.enumerations import DeviceType, VarPowerFlowReferenceType, ParamPowerFlowReferenceType, EmtLineTypes
from VeraGridEngine.Devices.Dynamic.var_factory import VarFactory
from VeraGridEngine.Devices.Dynamic.emt_template import EmtModelTemplate
from VeraGridEngine.Utils.Symbolic.block import Var, Expr

[docs] def get_pi_line_emt_template(vf: VarFactory, phN: bool = False, phA: bool = True, phB: bool = True, phC: bool = True, name: str = "Pi") -> EmtModelTemplate: """ Build the EMT pi-line template with explicit API-mapped parameters. The line connection defines which phases are physically present, while the EMT initializer later injects the electrical ``R``, ``Linv`` and ``C`` values through the block ``api_obj_mapping`` using the full 4x4 NABC enum contract. :param vf: EMT variable factory. :param phN: Bool. True if the line has neutral, else False. :param phA: Bool. True if the line has phase A, else False. :param phB: Bool. True if the line has phase B, else False. :param phC: Bool. True if the line has phase C, else False. :param name: Symbolic model name. :return: EMT pi-line model template. :raises ValueError: If the line has no active phases. """ templ = EmtModelTemplate() templ.tpe = DeviceType.LineDevice templ.name = name templ.block.name = EmtLineTypes.PI.value c0 = vf.add_const(0.0) # ----------------------------- # Phase mask from the line itself [N,A,B,C] # ----------------------------- # The template phase layout must be driven exclusively by the explicit # phase flags of the current API. The optional ``line`` argument remains # only for backward compatibility with older callers. ph_mask = np.array([ phN, phA, phB, phC, ], dtype=bool) ph_labels = list(["N", "A", "B", "C"]) idx_global = np.where(ph_mask)[0] # indices in global NABC convention active_ph = list(ph_labels[i] for i in idx_global) m = len(active_ph) if m == 0: raise ValueError(f"PI line '{name}' has no enabled phases in line.ys") # ----------------------------- # Define the fixed NABC API mapping contract. # The initializer writes parameter values into these exact enum slots, # which is why the template must read them back with the same layout. # ----------------------------- r_enums = list([ list([ ParamPowerFlowReferenceType.Rnn, ParamPowerFlowReferenceType.Rna, ParamPowerFlowReferenceType.Rnb, ParamPowerFlowReferenceType.Rnc, ]), list([ ParamPowerFlowReferenceType.Ran, ParamPowerFlowReferenceType.Raa, ParamPowerFlowReferenceType.Rab, ParamPowerFlowReferenceType.Rac, ]), list([ ParamPowerFlowReferenceType.Rbn, ParamPowerFlowReferenceType.Rba, ParamPowerFlowReferenceType.Rbb, ParamPowerFlowReferenceType.Rbc, ]), list([ ParamPowerFlowReferenceType.Rcn, ParamPowerFlowReferenceType.Rca, ParamPowerFlowReferenceType.Rcb, ParamPowerFlowReferenceType.Rcc, ]), ]) linv_enums = list([ list([ ParamPowerFlowReferenceType.Linv_nn, ParamPowerFlowReferenceType.Linv_na, ParamPowerFlowReferenceType.Linv_nb, ParamPowerFlowReferenceType.Linv_nc, ]), list([ ParamPowerFlowReferenceType.Linv_an, ParamPowerFlowReferenceType.Linv_aa, ParamPowerFlowReferenceType.Linv_ab, ParamPowerFlowReferenceType.Linv_ac, ]), list([ ParamPowerFlowReferenceType.Linv_bn, ParamPowerFlowReferenceType.Linv_ba, ParamPowerFlowReferenceType.Linv_bb, ParamPowerFlowReferenceType.Linv_bc, ]), list([ ParamPowerFlowReferenceType.Linv_cn, ParamPowerFlowReferenceType.Linv_ca, ParamPowerFlowReferenceType.Linv_cb, ParamPowerFlowReferenceType.Linv_cc, ]), ]) c_enums = list([ list([ ParamPowerFlowReferenceType.Cnn, ParamPowerFlowReferenceType.Cna, ParamPowerFlowReferenceType.Cnb, ParamPowerFlowReferenceType.Cnc, ]), list([ ParamPowerFlowReferenceType.Can, ParamPowerFlowReferenceType.Caa, ParamPowerFlowReferenceType.Cab, ParamPowerFlowReferenceType.Cac, ]), list([ ParamPowerFlowReferenceType.Cbn, ParamPowerFlowReferenceType.Cba, ParamPowerFlowReferenceType.Cbb, ParamPowerFlowReferenceType.Cbc, ]), list([ ParamPowerFlowReferenceType.Ccn, ParamPowerFlowReferenceType.Cca, ParamPowerFlowReferenceType.Ccb, ParamPowerFlowReferenceType.Ccc, ]), ]) # ----------------------------- # Create the symbolic parameter variables and publish them in the API map. # The EMT problem writes numerical values into these variables later, so the # template equations must depend on the mapped variables instead of line data. # ----------------------------- phases = list(["n", "a", "b", "c"]) for i in range(4): for j in range(4): var_r = vf.add_var(f"R{phases[i]}{phases[j]}_{name}") templ.block.api_obj_mapping[r_enums[i][j]] = var_r templ.block.parameters[var_r] = vf.add_const(0.0) var_l = vf.add_var(f"Linv_{phases[i]}{phases[j]}_{name}") templ.block.api_obj_mapping[linv_enums[i][j]] = var_l templ.block.parameters[var_l] = vf.add_const(0.0) var_c = vf.add_var(f"C{phases[i]}{phases[j]}_{name}") templ.block.api_obj_mapping[c_enums[i][j]] = var_c templ.block.parameters[var_c] = vf.add_const(0.0) # ----------------------------- # Reduce the full mapped matrices using the physical line phase mask. # This cleanly separates topology from parameters: line.ys chooses the # active equations, while api_obj_mapping supplies the electrical values. # ----------------------------- api_mapping: Dict[ParamPowerFlowReferenceType, Var] = templ.block.api_obj_mapping r_full: list[list[Var]] = list() linv_full: list[list[Var]] = list() c_full: list[list[Var]] = list() for i in range(4): r_row: list[Var] = list() linv_row: list[Var] = list() c_row: list[Var] = list() for j in range(4): r_row.append(api_mapping[r_enums[i][j]]) linv_row.append(api_mapping[linv_enums[i][j]]) c_row.append(api_mapping[c_enums[i][j]]) r_full.append(r_row) linv_full.append(linv_row) c_full.append(c_row) R_red = list(list(r_full[i][j] for j in idx_global) for i in idx_global) Linv_red = list(list(linv_full[i][j] for j in idx_global) for i in idx_global) C_red = list(list(c_full[i][j] for j in idx_global) for i in idx_global) # ----------------------------- # Create ONLY active terminal voltage input vars # ----------------------------- vf_keys = dict({ "N": VarPowerFlowReferenceType.vf_N, "A": VarPowerFlowReferenceType.vf_A, "B": VarPowerFlowReferenceType.vf_B, "C": VarPowerFlowReferenceType.vf_C, }) vt_keys = dict({ "N": VarPowerFlowReferenceType.vt_N, "A": VarPowerFlowReferenceType.vt_A, "B": VarPowerFlowReferenceType.vt_B, "C": VarPowerFlowReferenceType.vt_C, }) if_keys = dict({ "N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C, }) it_keys = dict({ "N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C, }) vf_vars = [vf.add_var(name=f"vf_{ph_label}_{name}", reference=vf_keys[ph_label]) for ph_label in active_ph] vt_vars = [vf.add_var(name=f"vt_{ph_label}_{name}", reference=vt_keys[ph_label]) for ph_label in active_ph] # d_vf_vars = [vf.add_diff_var(name=f"d_vf_{ph_label}_{name}", base_var=v_base) for ph_label, v_base in zip(active_ph, vf_vars)] # d_vt_vars = [vf.add_diff_var(name=f"d_vt_{ph_label}_{name}", base_var=v_base) for ph_label, v_base in zip(active_ph, vt_vars)] # ----------------------------- # Create model vars # ----------------------------- i_ser = [vf.add_var(name=f"i_ser_{name}_{ph_label}") for ph_label in active_ph] q_f = [vf.add_var(name=f"q_f_{name}_{ph_label}") for ph_label in active_ph] q_t = [vf.add_var(name=f"q_t_{name}_{ph_label}") for ph_label in active_ph] di_ser = [vf.add_diff_var(name=f"di_ser_{name}_{ph_label}", base_var=i_ser[k]) for k, ph_label in enumerate(active_ph)] dq_f = [vf.add_diff_var(name=f"dq_f_{name}_{ph_label}", base_var=q_f[k]) for k, ph_label in enumerate(active_ph)] dq_t = [vf.add_diff_var(name=f"dq_t_{name}_{ph_label}", base_var=q_t[k]) for k, ph_label in enumerate(active_ph)] i_cap_f = [vf.add_var(name=f"i_cap_f_{name}_{ph_label}") for ph_label in active_ph] i_cap_t = [vf.add_var(name=f"i_cap_t_{name}_{ph_label}") for ph_label in active_ph] if_act = [vf.add_var(name=f"if_{name}_{ph_label}", reference=if_keys[ph_label]) for ph_label in active_ph] it_act = [vf.add_var(name=f"it_{name}_{ph_label}", reference=it_keys[ph_label]) for ph_label in active_ph] # ----------------------------- # Build block # ----------------------------- templ.block.in_vars = vf_vars + vt_vars templ.block.state_vars = i_ser + q_f + q_t templ.block.diff_vars = di_ser + dq_f + dq_t templ.block.algebraic_vars = i_cap_f + i_cap_t + if_act + it_act # ----------------------------- # State equations # ----------------------------- state_eqs = [] for a in range(m): expr_rhs = c0 for b in range(m): L_inv_ab = Linv_red[a][b] term_b = (vf_vars[b] - vt_vars[b]) for k in range(m): R_bk = R_red[b][k] term_b = term_b - R_bk * i_ser[k] expr_rhs = expr_rhs + L_inv_ab * term_b state_eqs.append(expr_rhs) for a in range(m): state_eqs.append(i_cap_f[a]) for a in range(m): state_eqs.append(i_cap_t[a]) templ.block.state_eqs = state_eqs # ----------------------------- # Algebraic equations # ----------------------------- alg_eqs = [] for a in range(m): rhs_q = c0 for b in range(m): C_ab = C_red[a][b] rhs_q = rhs_q + C_ab * vf_vars[b] alg_eqs.append(q_f[a] - rhs_q) for a in range(m): rhs_q = c0 for b in range(m): C_ab = C_red[a][b] rhs_q = rhs_q + C_ab * vt_vars[b] alg_eqs.append(q_t[a] - rhs_q) G_damp = 1e-5 for a in range(m): alg_eqs.append(if_act[a] - (i_ser[a] + i_cap_f[a] + G_damp * vf_vars[a])) alg_eqs.append(it_act[a] - (-i_ser[a] + i_cap_t[a] + G_damp * vt_vars[a])) templ.block.algebraic_eqs = alg_eqs templ.block.out_vars = if_act + it_act # ----------------------------- # External mapping # ----------------------------- if_keys = dict({"N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C}) it_keys = dict({"N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C}) mapping: Dict[VarPowerFlowReferenceType, Var | None] = dict({ if_keys["N"]: None, if_keys["A"]: None, if_keys["B"]: None, if_keys["C"]: None, it_keys["N"]: None, it_keys["A"]: None, it_keys["B"]: None, it_keys["C"]: None, vf_keys["N"]: None, vf_keys["A"]: None, vf_keys["B"]: None, vf_keys["C"]: None, vt_keys["N"]: None, vt_keys["A"]: None, vt_keys["B"]: None, vt_keys["C"]: None, }) for k, phase_label in enumerate(active_ph): mapping[vf_keys[phase_label]] = vf_vars[k] mapping[vt_keys[phase_label]] = vt_vars[k] mapping[if_keys[phase_label]] = if_act[k] mapping[it_keys[phase_label]] = it_act[k] templ.block.external_mapping = mapping # ----------------------------- # Init equations # ----------------------------- init_eqs: Dict[Var, Expr] = {} for a in range(m): rhs_f = c0 rhs_t = c0 for b in range(m): C_ab = C_red[a][b] rhs_f += C_ab * vf_vars[b] rhs_t += C_ab * vt_vars[b] init_eqs[q_f[a]] = rhs_f init_eqs[q_t[a]] = rhs_t for a in range(m): init_eqs[i_ser[a]] = if_act[a] - i_cap_f[a] templ.block.init_eqs = init_eqs # ----------------------------- # Derivative initialization # ----------------------------- diff_init_eqs: Dict[Var, Expr] = {} for a in range(m): di_expression = c0 for b in range(m): term_b = (vf_vars[b] - vt_vars[b]) for k in range(m): R_bk = R_red[b][k] term_b = term_b - R_bk * i_ser[k] coeff = Linv_red[a][b] di_expression = di_expression + coeff * term_b diff_init_eqs[di_ser[a]] = di_expression for a in range(m): diff_init_eqs[dq_f[a]] = i_cap_f[a] diff_init_eqs[dq_t[a]] = i_cap_t[a] for a in range(m): # ----------------------------- # Initialize capacitor currents without terminal voltage derivatives. # The helper d_vf/d_vt symbols are not bound by the EMT assembler during # explicit initialization, so referencing them here leaves the explicit # initializer without values for those UIDs. # # Using zero capacitor current preserves the previous startup state used # by the fallback initialization path and keeps the template self-contained # for explicit initialization. # ----------------------------- init_eqs[i_cap_f[a]] = c0 init_eqs[i_cap_t[a]] = c0 templ.block.diff_init_eqs = diff_init_eqs return templ