Source code for VeraGridEngine.Simulations.EMT.problems.emt_problem_dae

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

"""
Module providing the EmtProblemDae class, which acts as the electrical
parser for the generic BaseProblem layer. It translates electrical components
(buses, branches, injections) into the unified DAE mathematical structure.
"""

from __future__ import annotations

import time
import numpy as np
import pandas as pd
import scipy.sparse as sp
import scipy.sparse.linalg as spla
import numpy.linalg as la
from typing import Dict, List, Any, Set, Optional, Tuple
from itertools import chain
import copy

from VeraGridEngine.Devices import MultiCircuit
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Simulations.driver_template import DummySignal
from VeraGridEngine.Devices.Events.emt_events_group import EmtEventsGroup
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.Utils.Symbolic.symbolic import Var, Expr, piecewise, Const, hard_sat, heaviside
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.enumerations import (
    VarPowerFlowReferenceType,
    ParamPowerFlowReferenceType,
    DeviceType,
    DynamicEventTransitionType,
    EmtInitializationMethod,
    EmtInitializationStatus,
    EmtLineTypes,
    WindingType,
)
from VeraGridEngine.basic_structures import Logger, CxVec
from VeraGridEngine.Simulations.EMT.emt_options import EmtOptions
from VeraGridEngine.Templates.Emt.bergeron_line_emt_template import BergeronHistoryRuntime
from VeraGridEngine.Simulations.EMT.JMARTI_Sim.jmarti_runtime import JMartiHistoryRuntime
from VeraGridEngine.Simulations.PowerFlow.power_flow_results_3ph import PowerFlowResults3Ph
from VeraGridEngine.Simulations.PowerFlow.power_flow_results import PowerFlowResults
from VeraGridEngine.Simulations.EMT.initialization_emt import EmtInitializationReport, run_emt_native_initialization, _compute_missing_dx0
from VeraGridEngine.Utils.Symbolic.explicit_initialization_symbolic import init_explicit_common, build_symbolic_vector_single_equation_compiler
from VeraGridEngine.Utils.Symbolic.compiled_functions import get_compiled_functions_cache_stats
from VeraGridEngine.Utils.Symbolic.bus_emt_template import get_bus_emt_algebraic_vars
from VeraGridEngine.Utils.procedural_logic import build_boundary_updater_from_block
from VeraGridEngine.IO.fmu.importer import (
    finalize_emt_fmu_cs_devices,
    finalize_emt_fmu_me_devices,
    queue_emt_fmu_cs_device,
    queue_emt_fmu_me_device,
)

from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import (
    EmtBoundaryUpdateProtocol,
    EmtProblemTemplate,
)
# from VeraGridEngine.Utils.Symbolic.static_parameter_mapping import (
#     abc_to_nabc,
#     assign_static_api_object_mapping_for_device,
#     get_load_power_phase_values_pu_abc,
#     get_shunt_phase_values_pu_abc,
# )
from VeraGridEngine.Utils.Symbolic.static_parameter_mapping_rms import (
    abc_to_nabc,
    assign_static_api_object_mapping_for_device,
    get_load_power_phase_values_pu_abc,
    get_shunt_phase_values_pu_abc,
)


def _tic() -> float:
    """
    Returns the current performance counter time.
    """
    return time.perf_counter()


def _toc(t0: float) -> float:
    """
    Returns the elapsed time since t0.
    """
    return time.perf_counter() - t0


def _freeze_runtime_expr_at_time(expr: Any, time_var: Var, sample_time: float) -> Any:
    """
    Freeze one runtime expression at an explicit time value.

    :param expr: Runtime expression to freeze.
    :param time_var: Global symbolic time variable.
    :param sample_time: Time used to evaluate the expression boundary.
    :return: Time-frozen symbolic expression.
    """
    if isinstance(expr, (Expr, Var, Const)):
        return expr.subs(dict({time_var: Const(float(sample_time))})).simplify()
    else:
        return expr


def _build_ramp_runtime_expr(
    time_var: Var,
    start_time: float,
    end_time: float,
    before_expr: Any,
    final_value: float,
) -> Any:
    """
    Build one linear ramp transition on top of an existing runtime expression.

    The expression keeps ``before_expr`` before ``start_time``, then transitions
    linearly toward ``final_value`` until ``end_time``, and finally holds the
    final value afterwards.

    :param time_var: Global symbolic time variable.
    :param start_time: Ramp start time.
    :param end_time: Ramp end time.
    :param before_expr: Expression active before the ramp starts.
    :param final_value: Final runtime value after the ramp ends.
    :return: Combined symbolic expression.
    """
    start_expr: Any = _freeze_runtime_expr_at_time(before_expr, time_var, start_time)
    duration_expr: Const = Const(float(end_time - start_time))
    time_offset_expr: Any = time_var - Const(float(start_time))
    progress_expr: Any = hard_sat(time_offset_expr / duration_expr, Const(0.0), Const(1.0))
    ramp_expr: Any = start_expr + progress_expr * (Const(float(final_value)) - start_expr)
    started_expr: Any = heaviside(time_var - Const(float(start_time)))
    ended_expr: Any = heaviside(time_var - Const(float(end_time)))
    return (Const(1.0) - started_expr) * before_expr + started_expr * (
        (Const(1.0) - ended_expr) * ramp_expr + ended_expr * Const(float(final_value))
    )


def _xfmr_phase_permutation_matrix(clock: int) -> np.ndarray:
    """
    Return the phase permutation matrix implied by the transformer vector group.

    :param clock: Transformer vector group clock number.
    :return: 3x3 phase permutation matrix.
    """
    shift: int = (int(clock) // 4) % 3
    if shift == 0:
        return np.eye(3, dtype=float)
    else:
        if shift == 1:
            return np.array([
                [0.0, 1.0, 0.0],
                [0.0, 0.0, 1.0],
                [1.0, 0.0, 0.0],
            ], dtype=float)
        else:
            return np.array([
                [0.0, 0.0, 1.0],
                [1.0, 0.0, 0.0],
                [0.0, 1.0, 0.0],
            ], dtype=float)

[docs] def find_name_in_block(name: str, block: Block): for var in block.algebraic_vars + block.state_vars + list(block.event_dict.keys()) + block.diff_vars: if name == var.name: return var for mdl in block.children: res = find_name_in_block(name, mdl) if res is not None: return res
[docs] class EmtTopologyError(Exception): """ Exception raised when EMT topology validation fails. This includes orphaned in_vars/out_vars and floating bus phases. """ def __init__(self, message: str) -> None: """ :param message: Descriptive error message indicating the topology issue. """ self.message = message super().__init__(self.message)
[docs] class EmtInterfaceValidationError(ValueError): """ Exception raised when one EMT interface is incompatible with the bus shell. The dynamic editor may expose a maximum editable interface that is broader than the currently configured static network. This exception reports those mismatches explicitly before EMT assembly can silently skip substitutions. """ def __init__(self, message: str): """ Build one explicit EMT interface-validation error. :param message: Descriptive interface mismatch report. :return: None. """ self.message = message super().__init__(self.message)
def _unique_keep_order(seq: List[Var]) -> List[Var]: """ Remove duplicated variables by UID while preserving the first appearance order. :param seq: Input variable sequence. :return: Deduplicated variable sequence. """ seen: Set[int] = set() out: List[Var] = list() for item in seq: if item.uid in seen: pass else: seen.add(item.uid) out.append(item) return out def _deduplicate_block_entities(block: Block)->None: """ Removes duplicated symbolic objects within the block by UID while preserving their first appearance order. """ block.state_vars = _unique_keep_order(block.state_vars) block.algebraic_vars = _unique_keep_order(block.algebraic_vars) block.diff_vars = _unique_keep_order(block.diff_vars) def _get_mode_event_sort_key(event: Tuple[float, float, bool]) -> float: """ Return the sorting key of a mode event. :param event: Mode event tuple (time, value, force_step_alignment). :return: Event time. """ return event[0] def _is_time_aligned(t_curr: float, event_time: float) -> bool: """ Return whether the current solver time is aligned with the event time. :param t_curr: Current solver time. :param event_time: Scheduled event time. :return: True if the current time is aligned with the event time. """ machine_eps: float = float(np.finfo(np.float64).eps) time_tol: float = 10.0 * machine_eps * max(1.0, abs(event_time)) return bool(abs(t_curr - event_time) <= time_tol) def _get_next_forced_mode_event_time( scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]], t_prev: float, t_target: float, ) -> Optional[float]: """ Return the earliest forced-alignment mode event time in the interval (t_prev, t_target]. :param t_prev: Previous solver time. :param t_target: Nominal target time. :return: Earliest forced event time or None. """ next_time: Optional[float] = None for _, event_list in scheduled_mode_events.items(): event_idx: int = 0 while event_idx < len(event_list): event_time: float = event_list[event_idx][0] force_step_alignment: bool = event_list[event_idx][2] if force_step_alignment and (t_prev < event_time <= t_target): if next_time is None: next_time = event_time else: if event_time < next_time: next_time = event_time else: pass else: pass event_idx += 1 return next_time def _continuous_event_spec_time_sort_key(event_spec: Dict[str, float | str | None]) -> float: """ Return the sorting key used for one continuous-event specification. :param event_spec: Continuous event specification dictionary. :return: Event start time. """ return float(event_spec["time"]) def _get_block_name(mdl: Block) -> str: """ Return the block name used for logging. :param mdl: Block to inspect. :return: Block name. """ if mdl.name is None: return "unknown" else: return str(mdl.name) def _is_history_runtime_line_block_name(block_name: Any) -> Tuple[bool, bool]: """ Return whether one EMT line block uses a retained-history runtime companion. :param block_name: EMT block name. :return: Tuple ``(is_bergeron, is_jmarti)``. """ is_bergeron: bool = block_name == EmtLineTypes.Bergeron or block_name == EmtLineTypes.Bergeron.value is_jmarti: bool = block_name == EmtLineTypes.J_Marti or block_name == EmtLineTypes.J_Marti.value return is_bergeron, is_jmarti def _has_internal_grounding_link(block: Block) -> bool: """ Return whether one EMT block hierarchy contains one internal grounding link. :param block: Candidate EMT block. :return: Boolean state. """ diagram_node: Any nested_block: Block for diagram_node in block.diagram.node_data.values(): if diagram_node.tpe in {"GROUNDING_LINK_EMT", "GROUND_EMT"}: return True else: pass for nested_block in block.children: if nested_block.name.endswith("_grounding_link"): return True elif _has_internal_grounding_link(nested_block): return True else: pass return False def _get_grid_runtime_events(grid: MultiCircuit) -> List[Any]: """ Return the list of scheduled runtime events available in the grid. :param grid: MultiCircuit instance. :return: List of scheduled runtime events. """ return grid.emt_events def _get_bus_v_list( bus_block: Block, ph_v_keys: List[VarPowerFlowReferenceType] ) -> List[Any]: """ Return the ordered list of bus phase voltages, using None for missing phases. :param bus_block: Bus symbolic block. :param ph_v_keys: Ordered list of phase voltage keys. :return: Ordered list of bus phase voltage expressions or None. """ out: List[Any] = list() for key in ph_v_keys: vk: Optional[Any] = _get_external_mapping_var_if_present(mdl=bus_block, key=key) if vk is None: out.append(None) else: out.append(vk) return out def _get_external_mapping(mdl: Block) -> Optional[Dict[Any, Var]]: """ Return the flattened external mapping exposed by one block hierarchy. Wrapper EMT models may keep the actual PF-to-EMT variable bindings inside nested child blocks. The build stage therefore needs to inspect the whole hierarchy and merge the first occurrence of each key so later lookup sites can recover child initialization references from the wrapper root. :param mdl: Root block to inspect. :return: Flattened external mapping, or ``None`` when no mapping exists. """ external_mapping: Dict[Any, Var] = _collect_external_mapping_from_block(mdl) # An empty merged mapping means the hierarchy does not expose PF bindings. if len(external_mapping) == 0: return None else: return external_mapping def _collect_block_hierarchy(mdl: Block) -> List[Block]: """ Collect every block reachable from one root block. The EMT editor may persist one wrapper root around the actual device block. Rebuilding wrapper-owned metadata therefore starts by traversing the full hierarchy exactly once while avoiding revisiting the same block object. :param mdl: Root block to inspect. :return: Ordered list with the reachable block objects. """ blocks: List[Block] = list() stack: List[Block] = list([mdl]) visited: Set[int] = set() # The explicit stack keeps the traversal iterative and avoids recursion # depth concerns when the GUI creates nested wrapper structures. while len(stack) > 0: block: Block = stack.pop() block_id: int = id(block) # Every block is processed only once so merged metadata stays stable. if block_id not in visited: visited.add(block_id) blocks.append(block) # Child blocks are pushed so the caller can inspect nested metadata # such as PF references and explicit-init registrations. child: Block for child in block.children: stack.append(child) else: pass return blocks def _collect_external_mapping_from_block(mdl: Block) -> Dict[Any, Var]: """ Collect the external mapping from one block hierarchy. Wrapper models may keep the exposed PF reference variables in nested child blocks. This helper merges the hierarchy into one root-level mapping while preserving the first occurrence of each key, which matches the existing API-object mapping recovery strategy used by the EMT builder. :param mdl: Root block to inspect. :return: Flattened external mapping. """ mapping: Dict[Any, Var] = dict() blocks: List[Block] = _collect_block_hierarchy(mdl) block: Block # The root block keeps precedence, while child-only keys are recovered so # PF-derived initialization metadata survives wrapper reconstruction. for block in blocks: local_mapping: Optional[Dict[Any, Var]] = block.external_mapping if local_mapping is None: pass else: key: Any value: Var for key, value in local_mapping.items(): if key not in mapping: mapping[key] = value else: pass mapping = _sanitize_external_mapping_for_explicit_dc_terminals(mapping) return mapping def _sanitize_external_mapping_for_explicit_dc_terminals(mapping: Dict[Any, Var]) -> Dict[Any, Var]: """ Remove ambiguous shared DC aliases when explicit terminal mappings exist. GUI-loaded EMT wrappers can preserve both the legacy shared DC aliases (``Vdc`` / ``Idc``) and the explicit terminal contract (``Vf_dc`` / ``Vt_dc`` and ``If_dc`` / ``It_dc``). The structural EMT assembly for two-sided DC devices is defined by the explicit side-specific variables. Keeping the shared aliases at the merged root can reintroduce one-sided shortcuts that collapse the assembled sparse system and make the Jacobian exactly singular. :param mapping: Flattened external mapping collected from one block hierarchy. :return: Sanitized mapping that preserves only the explicit two-sided DC contract. """ sanitized_mapping: Dict[Any, Var] = dict() has_vf: bool = VarPowerFlowReferenceType.Vf_dc in mapping has_vt: bool = VarPowerFlowReferenceType.Vt_dc in mapping has_if: bool = VarPowerFlowReferenceType.If_dc in mapping has_it: bool = VarPowerFlowReferenceType.It_dc in mapping key: Any value: Var for key, value in mapping.items(): if key == VarPowerFlowReferenceType.Vdc: if has_vf and has_vt: pass else: sanitized_mapping[key] = value elif key == VarPowerFlowReferenceType.Idc: if has_if and has_it: pass else: sanitized_mapping[key] = value else: sanitized_mapping[key] = value return sanitized_mapping def _collect_api_obj_mapping_from_block(mdl: Block) -> Dict[ParamPowerFlowReferenceType, Any]: """ Collect the API-object mapping from one block hierarchy. Wrapper models may keep the parameter mapping in nested child blocks. This helper traverses the hierarchy and preserves the first mapping occurrence of each key so PF-derived parameter assignment still sees the expected metadata. :param mdl: Root block to inspect. :return: Flattened API-object mapping. """ mapping: Dict[ParamPowerFlowReferenceType, Any] = dict() blocks: List[Block] = _collect_block_hierarchy(mdl) block: Block # The same hierarchy walk used for external mappings is reused here so the # wrapper-recovery policy stays identical for all PF initialization data. for block in blocks: local_mapping: Dict[ParamPowerFlowReferenceType, Any] = block.api_obj_mapping key: ParamPowerFlowReferenceType value: Any for key, value in local_mapping.items(): if key not in mapping: mapping[key] = value else: pass return mapping def _realign_static_parameter_mapping_with_unified_block( sys_block: Block, static_parameter_values_mapping: Dict[Var, Const], ) -> None: """ Rebind static parameter values to the vars present in the unified root block. EMT compilation uses the final flattened ``sys_block`` equations. Static parameters are collected earlier while device models may still live in wrapper hierarchies. After unification and deduplication, the final equation graph can reference var instances that share the same UID but are not the same Python object as the ones used as keys in the pre-collected static mapping. This helper rebuilds the dictionary on top of the vars that actually belong to the unified block so compiler-name generation stays aligned with the final system. :param sys_block: Unified EMT root block. :param static_parameter_values_mapping: Static parameter mapping to realign. :return: None. """ uid_to_parameter_var: Dict[int, Var] = dict() parameter_var: Var # Some custom EMT templates keep parameter variables only in nested blocks. # After block unification the final compiled equations may still reference # those vars by UID even when the root ``sys_block.parameters`` dictionary is # not the original object that first held them. Walk the full hierarchy to # realign every constant parameter against the vars that survive in the # unified graph. all_blocks: List[Block] = sys_block.get_all_blocks() block_obj: Block parameter_value: Const for block_obj in all_blocks: for parameter_var, parameter_value in block_obj.parameters.items(): if parameter_var not in static_parameter_values_mapping: static_parameter_values_mapping[parameter_var] = parameter_value else: pass uid_to_parameter_var[parameter_var.uid] = parameter_var if len(uid_to_parameter_var) == 0: return else: pass updated_mapping: Dict[Var, Const] = dict() original_parameter_var: Var parameter_value: Const for original_parameter_var, parameter_value in static_parameter_values_mapping.items(): unified_parameter_var: Var | None = uid_to_parameter_var.get(original_parameter_var.uid, None) if unified_parameter_var is None: updated_mapping[original_parameter_var] = parameter_value else: updated_mapping[unified_parameter_var] = parameter_value static_parameter_values_mapping.clear() static_parameter_values_mapping.update(updated_mapping) def _build_device_idtag_lookup(devices: List[Any]) -> Dict[str, int]: """ Build one idtag-to-index lookup table. This lookup is acceptable because it is used only for indexing and not as the primary numerical storage container. :param devices: Device list. :return: Lookup dictionary. """ lookup: Dict[str, int] = dict() device_index: int = 0 while device_index < len(devices): lookup[devices[device_index].idtag] = device_index device_index += 1 return lookup def _get_source_seed_weight(injection: Any) -> float: """ Return the weight used to distribute bus residual among source-like injections. ``Snom`` is used whenever it is positive. Otherwise, a unit weight is used. :param injection: Injection device. :return: Residual-allocation weight. """ snom_value: float = float(injection.Snom) if snom_value > 0.0: return snom_value else: return 1.0 def _get_internal_runtime_default(mdl: Block, var_name: str, default_value: float) -> float: """ Return one runtime-parameter default by internal variable name. :param mdl: Root model block. :param var_name: Runtime-parameter variable name. :param default_value: Fallback value if the variable is not found. :return: Runtime-parameter default. """ internal_var: Optional[Var] owner_block: Optional[Block] expression: Any internal_var, owner_block = _find_internal_runtime_var_owner(mdl, var_name) if internal_var is None or owner_block is None: return float(default_value) else: pass expression = owner_block.event_dict.get(internal_var, None) if expression is None: return float(default_value) else: pass if isinstance(expression, Const): return float(expression.value) else: return float(default_value) def _get_internal_mode_default(mdl: Block, var_name: str, default_value: float) -> float: """ Return one retained runtime-mode default by internal variable name. :param mdl: Root model block. :param var_name: Retained mode variable name. :param default_value: Fallback value if the variable is not found. :return: Runtime-mode default. """ internal_var: Optional[Var] owner_block: Optional[Block] expression: Any internal_var, owner_block = _find_mode_var_owner(mdl, var_name) if internal_var is None or owner_block is None: return float(default_value) else: pass expression = owner_block.mode_dict.get(internal_var, None) if expression is None: return float(default_value) else: pass if isinstance(expression, Const): return float(expression.value) else: return float(default_value) def _resolve_converter_control_reference_values(control1_code: float, control2_code: float, control1_val: float, control2_val: float, p0_value: float) -> Tuple[float, float, float]: """ Resolve converter control references numerically from configured control modes. :param control1_code: Numeric code of control 1. :param control2_code: Numeric code of control 2. :param control1_val: Numeric value of control 1. :param control2_val: Numeric value of control 2. :param p0_value: Scheduled active-power operating point. :return: Tuple ``(P_ref, Q_ref, Vdc_ref)``. """ vm_dc_code: float = 1.0 qac_code: float = 4.0 pdc_code: float = 5.0 pac_code: float = 6.0 p_ref_value: float q_ref_value: float vdc_ref_value: float if abs(control1_code - vm_dc_code) < 0.5: vdc_ref_value = control1_val else: if abs(control2_code - vm_dc_code) < 0.5: vdc_ref_value = control2_val else: vdc_ref_value = 1.0 if abs(control1_code - qac_code) < 0.5: q_ref_value = control1_val else: if abs(control2_code - qac_code) < 0.5: q_ref_value = control2_val else: q_ref_value = 0.0 if abs(control1_code - pdc_code) < 0.5 or abs(control1_code - pac_code) < 0.5: p_ref_value = control1_val else: if abs(control2_code - pdc_code) < 0.5 or abs(control2_code - pac_code) < 0.5: p_ref_value = control2_val else: p_ref_value = p0_value return float(p_ref_value), float(q_ref_value), float(vdc_ref_value) def _get_external_mapping_var_if_present( mdl: Block, key: VarPowerFlowReferenceType, ) -> Optional[Any]: """ Return one mapped external variable if the model exposes the key. This helper intentionally does not call ``mdl.E(key)`` because ``Block.E`` indexes ``external_mapping`` directly and raises ``KeyError`` when the key is absent. User models are allowed to expose only a subset of optional PowerFlow initialization keys. :param mdl: Symbolic device block. :param key: External mapping reference key. :return: Mapped variable or expression, or ``None``. """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if external_mapping is None: return None else: mapped_var: Optional[Any] = external_mapping.get(key, None) return mapped_var def _get_array_entry_if_in_range(array_obj: Any, index: int) -> Optional[Any]: """ Return one array entry only when the index is valid. Power-flow result containers may exist while some per-phase arrays are empty, for example after one failed or unsupported PF attempt in the GUI. EMT should then skip the corresponding seed instead of raising ``IndexError`` during the initialization pass. :param array_obj: Candidate array-like object. :param index: Requested zero-based index. :return: Selected entry or ``None``. """ if array_obj is None: return None else: pass try: array_length: int = len(array_obj) except TypeError: return None if 0 <= int(index) < array_length: return array_obj[index] else: return None def _get_balanced_vsc_dc_power(pf_results: PowerFlowResults, vsc_index: int) -> complex: """ Return the balanced VSC DC-side active power in system-base units. :param pf_results: Balanced power-flow results. :param vsc_index: VSC index. :return: DC-side complex power. """ pfp_value: Optional[Any] = _get_array_entry_if_in_range(pf_results.Pfp_vsc, vsc_index) pfn_value: Optional[Any] = _get_array_entry_if_in_range(pf_results.Pfn_vsc, vsc_index) pfp_float: float = 0.0 if pfp_value is None else float(pfp_value) pfn_float: float = 0.0 if pfn_value is None else float(pfn_value) return complex(pfp_float + pfn_float, 0.0) def _get_balanced_vsc_ac_power(pf_results: PowerFlowResults, vsc_index: int) -> complex: """ Return the balanced VSC AC-side complex power in system-base units. Some PF result containers still leave ``St_vsc`` at zero for valid AC/DC operating points. In that case, use the available DC-side transfer as the active-power fallback so EMT initialization preserves the scheduled power. :param pf_results: Balanced power-flow results. :param vsc_index: VSC index. :return: AC-side complex power. """ st_value: Optional[Any] = _get_array_entry_if_in_range(pf_results.St_vsc, vsc_index) st_complex: complex if st_value is None: return _get_balanced_vsc_dc_power(pf_results, vsc_index) else: st_complex = complex(st_value) if abs(st_complex) > 1.0e-12: return st_complex else: return _get_balanced_vsc_dc_power(pf_results, vsc_index) def _find_mode_var_by_name(block: Block, var_name: str) -> Optional[Var]: """ Return one retained mode variable from a block hierarchy by name. :param block: Root block to inspect. :param var_name: Retained mode variable name. :return: Matching retained mode variable or ``None``. """ mode_var: Var child_block: Block nested_mode_var: Optional[Var] for mode_var in block.mode_dict.keys(): if mode_var.name == var_name: return mode_var else: pass for child_block in block.children: nested_mode_var = _find_mode_var_by_name(child_block, var_name) if nested_mode_var is not None: return nested_mode_var else: pass return None def _find_internal_var_for_init(block: Block, var_name: str) -> Optional[Var]: """ Return one internal state/algebraic/differential variable by name. This helper intentionally ignores ``in_vars`` and ``out_vars`` aliases so initialization values are written against the actual solver-owned variables. :param block: Root block to inspect. :param var_name: Variable name. :return: Matching solver-owned variable or ``None``. """ variable: Var child_block: Block nested_variable: Optional[Var] for variable_list in [block.state_vars, block.algebraic_vars, block.diff_vars]: for variable in variable_list: if variable.name == var_name: return variable else: pass for child_block in block.children: nested_variable = _find_internal_var_for_init(child_block, var_name) if nested_variable is not None: return nested_variable else: pass return None def _find_internal_var_by_prefix(block: Block, name_prefix: str) -> Optional[Var]: """ Return one internal state/algebraic/differential variable by name prefix. This helper is used for templates whose cloned working blocks may not keep an easily reconstructable root-block name, while their internal variable prefixes remain stable. :param block: Root block to inspect. :param name_prefix: Variable-name prefix. :return: Matching solver-owned variable or ``None``. """ variable: Var child_block: Block nested_variable: Optional[Var] for variable_list in [block.state_vars, block.algebraic_vars, block.diff_vars]: for variable in variable_list: if variable.name.startswith(name_prefix): return variable else: pass for child_block in block.children: nested_variable = _find_internal_var_by_prefix(child_block, name_prefix) if nested_variable is not None: return nested_variable else: pass return None def _find_mode_var_owner(block: Block, var_name: str) -> Tuple[Optional[Var], Optional[Block]]: """ Return one retained mode variable together with the block that owns it. :param block: Root block to inspect. :param var_name: Retained mode variable name. :return: Tuple ``(variable, owner_block)`` or ``(None, None)``. """ mode_var: Var child_block: Block nested_var: Optional[Var] nested_owner: Optional[Block] for mode_var in block.mode_dict.keys(): if mode_var.name == var_name: return mode_var, block else: pass for child_block in block.children: nested_var, nested_owner = _find_mode_var_owner(child_block, var_name) if nested_var is not None and nested_owner is not None: return nested_var, nested_owner else: pass return None, None def _find_internal_runtime_var_owner(block: Block, var_name: str) -> Tuple[Optional[Var], Optional[Block]]: """ Return one runtime parameter variable together with the block that owns it. :param block: Root block to inspect. :param var_name: Runtime parameter variable name. :return: Tuple ``(variable, owner_block)`` or ``(None, None)``. """ variable: Var child_block: Block nested_var: Optional[Var] nested_owner: Optional[Block] for variable in block.event_dict.keys(): if variable.name == var_name: return variable, block else: pass for child_block in block.children: nested_var, nested_owner = _find_internal_runtime_var_owner(child_block, var_name) if nested_var is not None and nested_owner is not None: return nested_var, nested_owner else: pass return None, None def _find_event_var_owner(block: Block, target_var: Var) -> Optional[Block]: """ Return the block that owns one runtime event parameter variable. GUI-authored EMT models may keep the actual runtime parameter inside a child block while the wrapper root exposes only proxy metadata. Runtime seeding therefore has to locate the block that owns the parameter before updating the corresponding ``event_dict`` entry. :param block: Root block to inspect. :param target_var: Runtime parameter variable to locate. :return: Owner block or ``None`` when the variable is not present. """ blocks: List[Block] = _collect_block_hierarchy(block) block_item: Block # The hierarchy walk is explicit so wrapper roots and leaf models follow the # same ownership lookup path during PF-derived runtime seeding. for block_item in blocks: if target_var in block_item.event_dict: return block_item else: pass return None def _get_expected_pi_line_terminal_refs(ph_mask: List[bool]) -> List[VarPowerFlowReferenceType]: """ Build the ordered terminal-voltage references for the active pi-line phases. :param ph_mask: Physical line phase mask in NABC order. :return: Ordered ``vf_*`` and ``vt_*`` references for the active phases only. """ vf_refs: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.vf_C, ]) vt_refs: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.vt_C, ]) ordered_refs: List[VarPowerFlowReferenceType] = list() for idx, is_active in enumerate(ph_mask): if is_active: ordered_refs.append(vf_refs[idx]) else: pass for idx, is_active in enumerate(ph_mask): if is_active: ordered_refs.append(vt_refs[idx]) else: pass return ordered_refs
[docs] def validate_line_phase_layout(branch: Any, mdl: Block, logger: Logger): """ Validate that a pi-line EMT block matches the physical branch phase layout. The EMT parameter mapper writes reduced branch matrices into the fixed NABC API map using ``branch.ys``. This helper therefore checks that the symbolic pi-line terminal layout still matches the physical branch phase mask before the device is flattened into the system model. A mismatch is a topology error, so the function reports it through the shared logger and preserves the original model instead of silently rebuilding it. :param branch: Grid branch device. :param mdl: EMT block associated with the branch. :param logger: Shared system logger used to report invalid topology states. """ # The branch admittance mask is the physical topology source of truth, so it # defines which symbolic terminal voltage references must exist on the block. ph_mask: List[bool] = list([ bool(branch.ys.phN), bool(branch.ys.phA), bool(branch.ys.phB), bool(branch.ys.phC), ]) expected_refs: List[VarPowerFlowReferenceType] = _get_expected_pi_line_terminal_refs(ph_mask) # Only bus-terminal voltage references participate in the topology check. # Other inputs can remain on the block without affecting phase consistency. tracked_refs: Set[VarPowerFlowReferenceType] = { VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.vf_C, VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.vt_C, } current_refs: List[VarPowerFlowReferenceType] = list() for in_var in mdl.in_vars: if in_var.ref in tracked_refs: current_refs.append(in_var.ref) else: pass # A mismatch means the symbolic template and the EMT topology disagree about # how many branch terminals exist. That state is invalid because later model # assembly would bind non-existent phases, so it must be surfaced explicitly. if current_refs != expected_refs: logger.add_error( msg="Pi-line template has a different number of phases than the EMT model (topology mismatch).", device=branch.name, device_class=str(branch.device_type), value=str(current_refs), expected_value=str(expected_refs), ) else: pass
[docs] class EmtInjectionSeedStore: """ Fixed-size container storing PF-derived initialization seeds for EMT injections. The store is indexed by the position of each injection in the pre-built ``injection_devices`` list of ``EmtProblemDae._build_structure_and_collect``. This avoids dynamic dictionaries for numerical data and keeps memory layout explicit and preallocated. :param n_injections: Number of injection devices in the EMT problem. """ __slots__ = ( "_ac_power", "_dc_power", "_has_seed", ) def __init__(self, n_injections: int) -> None: """ Build the fixed-size seed store. :param n_injections: Number of injection devices. :return: None. """ # The AC seed stores one complex power per NABC phase for each injection. self._ac_power: np.ndarray = np.zeros((n_injections, 4), dtype=np.complex128) # The DC seed stores one real active power per injection. self._dc_power: np.ndarray = np.zeros(n_injections, dtype=np.float64) # This mask tells whether an injection received an explicit seed. self._has_seed: np.ndarray = np.zeros(n_injections, dtype=bool)
[docs] def set_ac_seed(self, injection_index: int, phase_power: np.ndarray) -> None: """ Store one AC four-phase power seed. :param injection_index: Injection position in the fixed injection list. :param phase_power: Complex power vector in NABC order. :return: None. """ self._ac_power[injection_index, :] = phase_power self._has_seed[injection_index] = True
[docs] def set_dc_seed(self, injection_index: int, power_value: float) -> None: """ Store one DC active-power seed. :param injection_index: Injection position in the fixed injection list. :param power_value: DC active power in per unit. :return: None. """ self._dc_power[injection_index] = power_value self._has_seed[injection_index] = True
[docs] def has_seed(self, injection_index: int) -> bool: """ Return whether the given injection has a stored seed. :param injection_index: Injection position in the fixed injection list. :return: ``True`` if a seed exists, ``False`` otherwise. """ return bool(self._has_seed[injection_index])
[docs] def get_ac_seed(self, injection_index: int) -> np.ndarray: """ Return the AC phase-power seed in NABC order. :param injection_index: Injection position in the fixed injection list. :return: Complex power vector in NABC order. """ return self._ac_power[injection_index, :]
[docs] def get_dc_seed(self, injection_index: int) -> float: """ Return the DC active-power seed. :param injection_index: Injection position in the fixed injection list. :return: DC active power in per unit. """ return float(self._dc_power[injection_index])
def _accumulate_kcl_current( i_kcl: Dict[int, List[Any]], bus_index: int, phase_index: int, current_expression: Any, sign: float, ) -> None: """ Accumulate one symbolic current contribution into the bus KCL. The EMT nodal equations are KCL equations. This helper keeps the sign convention explicit: - ``sign = -1.0`` for branch currents leaving the bus, - ``sign = +1.0`` for injection currents entering the bus. :param i_kcl: Bus-phase KCL symbolic accumulator. :param bus_index: Bus index. :param phase_index: Phase index in NABC order. :param current_expression: Symbolic current contribution. :param sign: Contribution sign. :return: None. """ if current_expression is None: pass else: i_kcl[bus_index][phase_index] = i_kcl[bus_index][phase_index] + sign * current_expression def _zero_ac_power_vector() -> np.ndarray: """ Return a zero complex power vector in NABC order. :return: Zero complex power vector. """ return np.zeros(4, dtype=np.complex128) def _phase_sequence_shift(phase_index: int) -> complex: """ Return the phase shift used to reconstruct balanced phase phasors. Phase order is NABC. :param phase_index: Phase index in NABC order. :return: Complex unit phasor applied to the balanced bus phasor. """ if phase_index == 0: return 0.0 + 0.0j else: if phase_index == 1: return 1.0 + 0.0j else: if phase_index == 2: return np.exp(-1j * 2.0 * np.pi / 3.0) else: return np.exp(+1j * 2.0 * np.pi / 3.0)
[docs] class EmtProblemDae(EmtProblemTemplate): """ Electrical parser layer for the EMT DAE Problem. Responsibilities: - Traverses the MultiCircuit to extract electrical devices. - Constructs nodal KCL balances per phase. - Processes external piecewise events into the unified block. - Maintains mapping between physical devices and symbolic variables. - Delegates all mathematical and indexing plumbing to BaseProblem. - Initializes PF-based guesses AND evaluates mdl.init_eqs explicitly (Explicit init). """ def __init__(self, grid: MultiCircuit, options: EmtOptions, pf_results_3Ph: PowerFlowResults3Ph | None = None, pf_results_3ph: PowerFlowResults3Ph | None = None, pf_results: PowerFlowResults | None = None, progress_signal: DummySignal | None = None, progress_text: DummySignal | None = None) -> None: """ Initializes the EMT Problem by parsing the grid and passing the resulting mathematical block to the BaseProblem constructor. :param grid: :param options: :param pf_results_3Ph: :param pf_results: """ self.logger = Logger() self.grid = grid self.options = options if pf_results_3ph is None: self.power_flow_results_3ph = pf_results_3Ph else: self.power_flow_results_3ph = pf_results_3ph self.power_flow_results = pf_results self._vars_info: Dict[ALL_DEV_TYPES, List[Var]] = dict() self._vars_glob_name2uid: Dict[str, int] = dict() self._vars2device: Dict[int, ALL_DEV_TYPES] = dict() self._temp_init_guess: Dict[int, float] = dict() self._temp_post_init_guess: Dict[int, float] = dict() self._temp_diff_init_guess: Dict[int, float] = dict() self._temp_post_diff_init_guess: Dict[int, float] = dict() self.initialization_report: EmtInitializationReport | None = None self.build_report: Dict[str, float] = dict() self._scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]] = dict() self._mode_event_cursor: Dict[int, int] = dict() self._runtime_parameter_eqs0: List[Any] = list() self._event_parameter_device_idtags: Dict[int, str] = dict() self._event_parameter_name_lookup: Dict[Tuple[str, str], int] = dict() self._continuous_event_parameter_uids: Set[int] = set() self._discrete_event_parameter_uids: Set[int] = set() self._active_events_group: EmtEventsGroup | None = None static_parameter_values_mapping: Dict[Var, Const] = dict() # The explicit initialization stage must solve each device working model as # one unified symbolic block. Registering nested child blocks separately can # break coupled controller/plant operating-point seeds because sibling # sub-blocks are then initialized independently instead of sharing one # flattened dependency graph. self._device_models_with_init_eqs: List[Block] = list() self._device_models_with_init_eqs_seen: Set[int] = set() sys_block = Block(children=[], in_vars=[]) glob_time = Var(self.TIME_NAME) self.history_models: List[BergeronHistoryRuntime | JMartiHistoryRuntime] = list() self._block_boundary_updater: Any | None = None self.step_counter: int = 0 self._fmu_cs_adapters: List[Any] = list() self._pending_fmu_cs_devices: List[Tuple[Any, Block]] = list() self._fmu_me_adapters: List[Any] = list() self._pending_fmu_me_devices: List[Tuple[Any, Block]] = list() t_build = _tic() t_phase_start: float = t_build cache_stats_before: Dict[str, float] = get_compiled_functions_cache_stats() validate_models_s: float = _toc(t_phase_start) t_phase_start = _tic() self._validate_dynamic_emt_interfaces() validate_interfaces_s: float = _toc(t_phase_start) t_phase_start = _tic() self._working_emt_models: Dict[str, Block] = dict() self._registered_working_emt_models: Set[str] = set() # build on local sys_block self._build_structure_and_collect(sys_block=sys_block, grid=grid, static_parameter_values_mapping=static_parameter_values_mapping ) structure_collect_s: float = _toc(t_phase_start) t_phase_start = _tic() sys_block.unify_blocks() _deduplicate_block_entities(sys_block) # The EMT problem compiles the unified root block, so the static-parameter # table must be realigned with the final flattened symbolic graph before # compiler names are generated. Wrapper, GUI and cloned workflows can keep # API-mapped parameter vars only in child blocks during collection, and the # subsequent unification step can then leave the pre-collected parameter map # out of sync with the actual vars referenced by the final equations. _realign_static_parameter_mapping_with_unified_block( sys_block=sys_block, static_parameter_values_mapping=static_parameter_values_mapping, ) unify_blocks_s: float = _toc(t_phase_start) t_phase_start = _tic() self._validate_connections() validate_connections_s: float = _toc(t_phase_start) t_phase_start = _tic() # initialize template with the complete block super().__init__(sys_block=sys_block, static_parameter_values_mapping=static_parameter_values_mapping, glob_time=glob_time, progress_signal=progress_signal, progress_text=progress_text) self._rebuild_results_var_name_lookup() template_init_s: float = _toc(t_phase_start) t_phase_start = _tic() # Mark retained runtime parameters: # - scheduled mode parameters # - Bergeron history parameters mode_parameters: List[Var] = self._collect_runtime_mode_parameters() if len(mode_parameters) > 0: self.set_runtime_mode_parameters(mode_parameters) else: pass # Runtime parameter indices may have changed after repartitioning, # so Bergeron runtimes must bind their parameter indices afterwards. for rt in self.history_models: rt.setup_indices( uid2idx_vars=self.uid2idx_vars, uid2idx_event_params=self.uid2idx_event_params, params_offset=0, ) # The retained runtime snapshot must follow the finalized flat runtime # vector order, not the original block-discovery order. Bergeron history # parameters are classified as retained mode parameters and moved to the # tail of the runtime vector, so preserving the already partitioned event # equation list keeps later event-group activation aligned with # ``uid2idx_event_params``. self._runtime_parameter_eqs0 = list(self._event_parameters_eqs) finalize_emt_fmu_cs_devices(self) finalize_emt_fmu_me_devices(self) self._block_boundary_updater = build_boundary_updater_from_block(self) if len(self._runtime_mode_parameters) > 0: self._initialize_mode_event_state() else: pass runtime_partition_s: float = _toc(t_phase_start) t_phase_start = _tic() # 1) PF guesses self.init_guess.update(self._temp_init_guess) self.diff_init_guess.update(self._temp_diff_init_guess) if self.progress_signal is not None: self.progress_signal.emit(5) # 2) Initialization stage selection. # Explicit initialization is the lightweight symbolic seed stage that # resolves device-provided init equations directly. The broader EMT # workflow may either stop here, use it as a seed for the reduced # native consistency solve, or skip it entirely when the user # requests a pure pseudo-transient initialization path driven only by # the power-flow seeded variables. if self.options.initialization_method == EmtInitializationMethod.PseudoTransient: pass else: self._run_explicit_initialization() explicit_init_s: float = _toc(t_phase_start) t_phase_start = _tic() # 3) Native initialization stage selection. # The native stage builds and solves the reduced consistency system only # for workflows that require it. Pure Explicit stops after the symbolic # seeding stage, while the other methods use the native initialization # dispatcher to execute their requested algorithm from the current seed. if self.options.initialization_method == EmtInitializationMethod.Explicit: self.initialization_report = EmtInitializationReport(method_requested=EmtInitializationMethod.Explicit) self.initialization_report.status = EmtInitializationStatus.RESOLVED self.initialization_report.method_used = EmtInitializationMethod.Explicit self.initialization_report.message = "Explicit initialization completed." else: self.initialization_report = run_emt_native_initialization(problem=self, options=self.options) self._seed_all_switched_vsc_models() self._seed_all_switch_models() self.init_guess.update(self._temp_post_init_guess) self.diff_init_guess.update(self._temp_post_diff_init_guess) # for diff_uid, diff_value in self._temp_post_diff_init_guess.items(): # if diff_uid in self.diff_init_guess: # pass # else: # self.diff_init_guess[diff_uid] = float(diff_value) native_init_s: float = _toc(t_phase_start) t_phase_start = _tic() if self.progress_signal is not None: self.progress_signal.emit(10) # EMT event groups are activated only after the initialization stage. # Some runtime parameters start as Const(None) placeholders and are # resolved during explicit/native initialization. Wrapping them in # piecewise expressions before that would preserve the undefined default # and break the initial runtime-parameter evaluation. self.set_events_group(None) if self.initialization_report is None: pass else: _compute_missing_dx0( problem=self, report=self.initialization_report, x_full=self.get_x0(), dx_full=self.get_dx0(), runtime_params=self.event_params_values.copy(), constant_params=np.asarray( [float(const.value) for const in self.get_parameters_values()], dtype=np.float64, ), include_existing=False, ) events_group_finalize_s: float = _toc(t_phase_start) t_done = _toc(t_build) cache_stats_after: Dict[str, float] = get_compiled_functions_cache_stats() compiled_function_cache_delta: float = cache_stats_after.get("entry_count", 0.0) - cache_stats_before.get("entry_count", 0.0) native_structural_cold_s: float native_numeric_refresh_s: float if self.initialization_report is None: native_structural_cold_s = 0.0 native_numeric_refresh_s = 0.0 else: native_structural_cold_s = float(self.initialization_report.reduced_system_build_s) native_numeric_refresh_s = ( float(self.initialization_report.runtime_param_build_s) + float(self.initialization_report.constant_param_build_s) + float(self.initialization_report.initial_residual_eval_s) + float(self.initialization_report.newton_elapsed_s) + float(self.initialization_report.pseudo_transient_elapsed_s) + float(self.initialization_report.dx0_completion_s) ) structural_cold_s: float = ( validate_models_s + structure_collect_s + unify_blocks_s + validate_connections_s + template_init_s + runtime_partition_s + explicit_init_s + native_structural_cold_s ) numeric_refresh_s: float = native_numeric_refresh_s + events_group_finalize_s self.build_report = dict( validate_models_s=validate_models_s, structure_collect_s=structure_collect_s, unify_blocks_s=unify_blocks_s, validate_connections_s=validate_connections_s, template_init_s=template_init_s, runtime_partition_s=runtime_partition_s, explicit_init_s=explicit_init_s, native_init_s=native_init_s, events_group_finalize_s=events_group_finalize_s, structural_cold_s=structural_cold_s, numeric_refresh_s=numeric_refresh_s, native_structural_cold_s=native_structural_cold_s, native_numeric_refresh_s=native_numeric_refresh_s, compiled_function_cache_delta=compiled_function_cache_delta, persistent_init_cache_hit=1.0 if (self.initialization_report is not None and self.initialization_report.persistent_cache_hit) else 0.0, persistent_init_cache_load_s=0.0 if self.initialization_report is None else float(self.initialization_report.persistent_cache_load_s), persistent_init_cache_store_s=0.0 if self.initialization_report is None else float(self.initialization_report.persistent_cache_store_s), total_s=t_done, ) if options.verbose > 0: self.logger.add_debug(f"init guess = {self.init_guess}") self.logger.add_debug(f"diff init guess = {self.diff_init_guess}") if self.initialization_report is None: pass else: self.logger.add_debug( f"EMT initialization used {self.initialization_report.method_used} | " f"status={self.initialization_report.status.name} | " f"res0={self.initialization_report.initial_residual_inf:.3e} | " f"resf={self.initialization_report.final_residual_inf:.3e} | " f"newton={self.initialization_report.newton_iterations} | " f"ptc={self.initialization_report.pseudo_transient_steps} | " f"auto_dx0={self.initialization_report.automatic_dx0_count}" ) self.logger.add_debug( f"EMT electrical problem parsed in {t_done:.4f}s | " f"vars={self._n_vars} (state={self._n_state}, alg={self._n_alg}) | " f"eqs(state={len(self._state_eqs)}, alg={len(self._algebraic_eqs)})" ) # --------------------------------------------------------------------- # init_eqs registration + explicit init execution # --------------------------------------------------------------------- def _device_block_has_init_equations(self, mdl: Block) -> bool: """ Return whether one unified device block contributes explicit init equations. The device working model is flattened through ``unify_blocks()`` before explicit initialization registration. After that stage, all child init equations that belong to the device hierarchy are merged into the device root itself. The explicit initializer should therefore treat the unified device block as the only initialization unit. :param mdl: Root block to inspect. :type mdl: Block :return: ``True`` when the device block has explicit init equations. :rtype: bool """ init_eqs: Dict[Var, Expr] = mdl.init_eqs diff_init_eqs: Dict[Var, Expr] = mdl.diff_init_eqs has_init_eqs: bool = len(init_eqs) > 0 has_diff_init_eqs: bool = len(diff_init_eqs) > 0 if has_init_eqs: return True else: if has_diff_init_eqs: return True else: return False def _register_init_model(self, mdl: Block) -> None: """ Register one unified device working model for explicit initialization. The broader EMT build process already connects the working model and then flattens it through ``unify_blocks()`` before this registration point. At that stage the device root owns the merged ``init_eqs`` / ``diff_init_eqs`` of its full hierarchy, so explicit initialization must run exactly once on that root block instead of iterating over stale child blocks. :param mdl: Unified device working model. :type mdl: Block :return: None. :rtype: None """ has_init_equations: bool = self._device_block_has_init_equations(mdl) if has_init_equations: if mdl.uid in self._device_models_with_init_eqs_seen: pass else: self._device_models_with_init_eqs_seen.add(mdl.uid) self._device_models_with_init_eqs.append(mdl) else: pass
[docs] def get_build_report(self) -> Dict[str, float]: """ Return the EMT problem build timing report. :return: Build timing report. :rtype: Dict[str, float] """ return dict(self.build_report)
def _run_explicit_initialization(self) -> None: """ Run explicit initialization once per unified device working model. The working-device model already contains the merged controller, plant, runtime parameter, and init-equation hierarchy after ``unify_blocks()``. Initializing the device root as one flat block preserves coupled operating points across sibling sub-blocks, which is required for EMT templates whose current references, delayed commands, and plant states must be seeded together. :return: None :rtype: None """ if len(self._device_models_with_init_eqs) == 0: return else: pass # The explicit solver works on the already assembled global variable maps, # but each call consumes one unified device block so sibling device states # remain decoupled while sibling controller/plant blocks inside the device # are initialized together. sys_vars: Dict[int, Var] = {v.uid: v for v in (self._state_vars + self._algebraic_vars)} sys_diff_vars: Dict[int, Var] = {dv.uid: dv for dv in self._diff_vars} compile_single_equation: Any = build_symbolic_vector_single_equation_compiler( compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, vars_name=self.VARS_NAME, diff_name=self.DIFF_NAME, event_params_name=self.VARIABLE_PARAMS_NAME, params_name=self.CONSTANT_PARAMS_NAME, ) seen_blocks: Set[int] = set() for mdl in self._device_models_with_init_eqs: if mdl.uid not in seen_blocks: seen_blocks.add(mdl.uid) try: params_array: np.ndarray = np.asarray( [float(const.value) for const in self.get_parameters_values()], dtype=np.float64, ) self.init_guess, self.diff_init_guess = init_explicit_common( mdl=mdl, sys_vars=sys_vars, sys_diff_vars=sys_diff_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, event_param_init_dict=self.event_params_init_dict, init_guess=self.init_guess, diff_init_guess=self.diff_init_guess, uid2idx_vars=self.uid2idx_vars, uid2idx_diff=self.uid2idx_diff, uid2idx_params=self.uid2idx_params, uid2idx_event_params=self.uid2idx_event_params, params_array=params_array, compile_single_equation=compile_single_equation, verbose=bool(self.options.verbose > 0), ) for event_var in mdl.event_dict.keys(): event_idx: int | None = self.uid2idx_event_params.get(event_var.uid, None) if event_idx is None: pass else: mdl.event_dict[event_var] = self._event_parameters_eqs[event_idx] self._runtime_parameter_eqs0[event_idx] = self._event_parameters_eqs[event_idx] except Exception as e: block_name: str = _get_block_name(mdl) error_detail: str = f"{type(e).__name__}: {str(e)}" self.logger.add_warning( msg="EMT explicit initialization failed for unified device block.", device=block_name, value=error_detail, expected_value="Successful explicit initialization", device_class="EMT", device_property="init_explicit" ) if self.options.verbose > 0: self.logger.add_debug( f"[EMT][init_explicit] failed for unified device block '{block_name}'. Error: {error_detail}" ) print( f"EMT explicit initialization failed for unified device block {block_name}. " f"Error detail: {error_detail}" ) else: pass else: pass self._build_runtime_param_vectors() def _seed_bess_battery_init_from_converter(self) -> None: """ Reserved helper for future coordinated BESS battery/DC initialization. :return: None. """ return def _validate_dynamic_emt_interfaces(self) -> None: """ Validate that dynamic EMT ports are compatible with the static bus shells. The editor may expose a broader maximum EMT interface than the current network configuration. Before any symbolic rewiring or KCL assembly takes place, this validation checks that every exposed electrical reference can be reconciled with the corresponding bus domain and EMT bus shell. :raises EmtInterfaceValidationError: If any device exposes incompatible AC, neutral, or DC EMT references. :return: None. """ errors: List[str] = list() branch: Any injection: Any for branch in self.grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True): errors.extend(self._validate_branch_emt_interface(branch)) for injection in self.grid.get_injection_devices_iter(): errors.extend(self._validate_injection_emt_interface(injection)) if len(errors) > 0: error_msg = "EMT interface validation failed:\n" + "\n".join(errors) raise EmtInterfaceValidationError(error_msg) else: pass def _validate_injection_emt_interface(self, injection: Any) -> List[str]: """ Validate the external EMT interface exposed by one injection device. :param injection: Injection device to validate. :return: List of validation errors for this device. """ present_refs: Set[VarPowerFlowReferenceType] = self._get_present_external_refs(injection.emt_model) errors: List[str] = list() errors.extend(self._validate_injection_bus_contract( device=injection, bus=injection.bus, side="injection", present_refs=present_refs, )) return errors def _validate_injection_bus_contract(self, device: Any, bus: Any, side: str, present_refs: Set[VarPowerFlowReferenceType]) -> List[str]: """ Validate one injection-style EMT interface against one bus shell. :param device: Device exposing the interface. :param bus: Connected bus. :param side: User-facing side label. :param present_refs: Non-null external references exposed by the device. :return: List of validation errors. """ errors: List[str] = list() ac_voltage_refs: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, ]) ac_current_to_voltage: Dict[VarPowerFlowReferenceType, VarPowerFlowReferenceType] = dict({ VarPowerFlowReferenceType.i_N: VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.i_A: VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.i_B: VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.i_C: VarPowerFlowReferenceType.v_C, }) supported_bus_voltage_refs: Set[VarPowerFlowReferenceType] = self._get_bus_voltage_refs(bus) voltage_ref: VarPowerFlowReferenceType current_ref: VarPowerFlowReferenceType required_voltage_ref: VarPowerFlowReferenceType if bus.is_dc: for voltage_ref in ac_voltage_refs: if voltage_ref in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=voltage_ref, detail="AC phase reference exposed on a DC bus.", )) else: pass for current_ref in ac_current_to_voltage.keys(): if current_ref in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail="AC current reference exposed on a DC bus.", )) else: pass if VarPowerFlowReferenceType.Vdc in present_refs and VarPowerFlowReferenceType.Vdc not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=VarPowerFlowReferenceType.Vdc, detail="DC voltage reference exposed but the bus EMT shell has no Vdc variable.", )) else: pass if VarPowerFlowReferenceType.Idc in present_refs: if VarPowerFlowReferenceType.Vdc not in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=VarPowerFlowReferenceType.Idc, detail="DC current reference exposed without a compatible Vdc input reference.", )) else: pass if VarPowerFlowReferenceType.Vdc not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=VarPowerFlowReferenceType.Idc, detail="DC current reference exposed but the bus EMT shell has no Vdc variable.", )) else: pass else: pass else: if VarPowerFlowReferenceType.Vdc in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=VarPowerFlowReferenceType.Vdc, detail="DC voltage reference exposed on an AC bus.", )) else: pass if VarPowerFlowReferenceType.Idc in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=VarPowerFlowReferenceType.Idc, detail="DC current reference exposed on an AC bus.", )) else: pass for voltage_ref in ac_voltage_refs: if voltage_ref in present_refs and voltage_ref not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=voltage_ref, detail="Voltage reference exposed but the bus EMT shell does not support it.", )) else: pass for current_ref, required_voltage_ref in ac_current_to_voltage.items(): if current_ref in present_refs: if required_voltage_ref not in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail=( f"Current reference exposed without a compatible " f"{required_voltage_ref.value} input reference." ), )) else: pass if required_voltage_ref not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail="Current reference exposed but the bus EMT shell does not support the corresponding voltage phase.", )) else: pass else: pass return errors def _validate_branch_emt_interface(self, branch: Any) -> List[str]: """ Validate the external EMT interface exposed by one branch device. :param branch: Branch device to validate. :return: List of validation errors for this branch. """ present_refs: Set[VarPowerFlowReferenceType] = self._get_present_external_refs(branch.emt_model) errors: List[str] = list() errors.extend(self._validate_branch_side_emt_interface( device=branch, bus=branch.bus_from, side="from", present_refs=present_refs, voltage_map=dict({ VarPowerFlowReferenceType.vf_N: VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.vf_A: VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.vf_B: VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.vf_C: VarPowerFlowReferenceType.v_C, VarPowerFlowReferenceType.Vf_dc: VarPowerFlowReferenceType.Vdc, }), current_to_voltage_map=dict({ VarPowerFlowReferenceType.if_N: VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.if_A: VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.if_B: VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.if_C: VarPowerFlowReferenceType.vf_C, VarPowerFlowReferenceType.If_dc: VarPowerFlowReferenceType.Vf_dc, }), )) errors.extend(self._validate_branch_side_emt_interface( device=branch, bus=branch.bus_to, side="to", present_refs=present_refs, voltage_map=dict({ VarPowerFlowReferenceType.vt_N: VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.vt_A: VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.vt_B: VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.vt_C: VarPowerFlowReferenceType.v_C, VarPowerFlowReferenceType.Vt_dc: VarPowerFlowReferenceType.Vdc, }), current_to_voltage_map=dict({ VarPowerFlowReferenceType.it_N: VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.it_A: VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.it_B: VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.it_C: VarPowerFlowReferenceType.vt_C, VarPowerFlowReferenceType.It_dc: VarPowerFlowReferenceType.Vt_dc, }), )) errors.extend(self._validate_shared_branch_emt_interface(branch, present_refs)) errors.extend(self._validate_branch_dc_mapping_collisions(branch)) return errors def _validate_branch_side_emt_interface(self, device: Any, bus: Any, side: str, present_refs: Set[VarPowerFlowReferenceType], voltage_map: Dict[VarPowerFlowReferenceType, VarPowerFlowReferenceType], current_to_voltage_map: Dict[VarPowerFlowReferenceType, VarPowerFlowReferenceType]) -> List[str]: """ Validate one side-specific branch EMT contract against one terminal bus. :param device: Branch device exposing the interface. :param bus: Terminal bus to validate against. :param side: ``from`` or ``to``. :param present_refs: Non-null external references exposed by the branch. :param voltage_map: Mapping from branch voltage refs to bus voltage refs. :param current_to_voltage_map: Mapping from branch current refs to branch voltage refs. :return: List of validation errors. """ errors: List[str] = list() supported_bus_voltage_refs: Set[VarPowerFlowReferenceType] = self._get_bus_voltage_refs(bus) branch_voltage_ref: VarPowerFlowReferenceType bus_voltage_ref: VarPowerFlowReferenceType current_ref: VarPowerFlowReferenceType required_branch_voltage_ref: VarPowerFlowReferenceType required_bus_voltage_ref: VarPowerFlowReferenceType for branch_voltage_ref, bus_voltage_ref in voltage_map.items(): if branch_voltage_ref in present_refs: if bus.is_dc and bus_voltage_ref != VarPowerFlowReferenceType.Vdc: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=branch_voltage_ref, detail="AC branch voltage reference exposed on a DC bus.", )) elif (not bus.is_dc) and bus_voltage_ref == VarPowerFlowReferenceType.Vdc: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=branch_voltage_ref, detail="DC branch voltage reference exposed on an AC bus.", )) elif bus_voltage_ref not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=branch_voltage_ref, detail="Voltage reference exposed but the terminal bus EMT shell does not support it.", )) else: pass else: pass for current_ref, required_branch_voltage_ref in current_to_voltage_map.items(): if current_ref in present_refs: required_bus_voltage_ref = voltage_map[required_branch_voltage_ref] if required_branch_voltage_ref not in present_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail=( f"Current reference exposed without a compatible " f"{required_branch_voltage_ref.value} input reference." ), )) else: pass if bus.is_dc and required_bus_voltage_ref != VarPowerFlowReferenceType.Vdc: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail="AC branch current reference exposed on a DC bus.", )) elif (not bus.is_dc) and required_bus_voltage_ref == VarPowerFlowReferenceType.Vdc: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail="DC branch current reference exposed on an AC bus.", )) elif required_bus_voltage_ref not in supported_bus_voltage_refs: errors.append(self._build_emt_interface_error( device=device, bus=bus, side=side, reference=current_ref, detail="Current reference exposed but the terminal bus EMT shell does not support the corresponding voltage phase.", )) else: pass else: pass return errors def _validate_shared_branch_emt_interface(self, branch: Any, present_refs: Set[VarPowerFlowReferenceType]) -> List[str]: """ Validate shared EMT branch references used by VSC-style branch models. Some branch EMT templates use a single AC-side contract ``v_A/B/C`` and a single DC-side contract ``Vdc``/``Idc`` instead of side-specific branch terminal references. This validator allows that contract only when the network has exactly one compatible side for it. Two-sided DC branches are handled separately by ``_validate_branch_dc_mapping_collisions`` because legacy DC branch templates may expose shared ``Vdc`` / ``Idc`` only as aliases of their explicit terminal variables. :param branch: Branch device to validate. :param present_refs: Non-null external references exposed by the branch. :return: List of validation errors. """ errors: List[str] = list() shared_ac_refs: Set[VarPowerFlowReferenceType] = { VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, VarPowerFlowReferenceType.i_N, VarPowerFlowReferenceType.i_A, VarPowerFlowReferenceType.i_B, VarPowerFlowReferenceType.i_C, } shared_dc_refs: Set[VarPowerFlowReferenceType] = { VarPowerFlowReferenceType.Vdc, VarPowerFlowReferenceType.Idc, } shared_ac_present: Set[VarPowerFlowReferenceType] = present_refs.intersection(shared_ac_refs) shared_dc_present: Set[VarPowerFlowReferenceType] = present_refs.intersection(shared_dc_refs) ac_side_count: int = self._count_branch_sides_by_domain(branch, is_dc=False) dc_side_count: int = self._count_branch_sides_by_domain(branch, is_dc=True) if len(shared_ac_present) > 0: if ac_side_count == 1: if branch.bus_from.is_dc: errors.extend(self._validate_injection_bus_contract( device=branch, bus=branch.bus_to, side="to", present_refs=shared_ac_present, )) else: errors.extend(self._validate_injection_bus_contract( device=branch, bus=branch.bus_from, side="from", present_refs=shared_ac_present, )) else: errors.extend(self._build_shared_branch_domain_errors( branch=branch, refs=shared_ac_present, expected_domain="AC", side_count=ac_side_count, )) else: pass if len(shared_dc_present) > 0: if dc_side_count == 1: if branch.bus_from.is_dc: errors.extend(self._validate_injection_bus_contract( device=branch, bus=branch.bus_from, side="from", present_refs=shared_dc_present, )) else: errors.extend(self._validate_injection_bus_contract( device=branch, bus=branch.bus_to, side="to", present_refs=shared_dc_present, )) elif dc_side_count == 2: # Two-sided DC branches may legally expose shared aliases when the # real contract is still the explicit Vf/Vt and If/It terminal set. pass else: errors.extend(self._build_shared_branch_domain_errors( branch=branch, refs=shared_dc_present, expected_domain="DC", side_count=dc_side_count, )) else: pass return errors def _is_valid_legacy_shared_dc_branch_alias(self, branch: Any, reference: VarPowerFlowReferenceType) -> bool: """ Return whether one shared DC branch reference is a valid legacy alias. Legacy two-sided DC branch templates may keep exposing ``Vdc`` or ``Idc`` for backward compatibility even though EMT assembly now uses the explicit terminal contract ``Vf_dc`` / ``Vt_dc`` and ``If_dc`` / ``It_dc``. The shared reference is accepted only when both DC sides are present and the shared mapping aliases one of the already explicit terminal variables. :param branch: Branch device to inspect. :param reference: Shared DC reference to validate. :return: ``True`` when the shared reference is a safe legacy alias. """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(branch.emt_model) shared_var: Var | None from_var: Var | None to_var: Var | None if external_mapping is None: return False else: pass if not (branch.bus_from.is_dc and branch.bus_to.is_dc): return False else: pass if reference == VarPowerFlowReferenceType.Vdc: shared_var = external_mapping.get(VarPowerFlowReferenceType.Vdc, None) from_var = external_mapping.get(VarPowerFlowReferenceType.Vf_dc, None) to_var = external_mapping.get(VarPowerFlowReferenceType.Vt_dc, None) elif reference == VarPowerFlowReferenceType.Idc: shared_var = external_mapping.get(VarPowerFlowReferenceType.Idc, None) from_var = external_mapping.get(VarPowerFlowReferenceType.If_dc, None) to_var = external_mapping.get(VarPowerFlowReferenceType.It_dc, None) else: return False if shared_var is None or from_var is None or to_var is None: return False else: pass if shared_var.uid == from_var.uid: return True else: pass if shared_var.uid == to_var.uid: return True else: return False def _build_shared_branch_domain_errors(self, branch: Any, refs: Set[VarPowerFlowReferenceType], expected_domain: str, side_count: int) -> List[str]: """ Build explicit errors for ambiguous shared branch-domain references. :param branch: Branch device exposing the references. :param refs: Shared references that could not be assigned safely. :param expected_domain: Domain required by the shared references. :param side_count: Number of compatible terminal buses found. :return: List of formatted error strings. """ errors: List[str] = list() reference: VarPowerFlowReferenceType for reference in refs: errors.append( f" - Device '{self._get_device_name(branch)}' ({self._get_device_type_label(branch)}) " f"branch ref '{reference.value}' is a shared {expected_domain} interface, but the branch has " f"{side_count} compatible {expected_domain} side(s). Use side-specific EMT references instead." ) return errors def _validate_branch_dc_mapping_collisions(self, branch: Any) -> List[str]: """ Validate that branch DC terminal mappings cannot collide across sides. :param branch: Branch device to validate. :return: List of validation errors. """ errors: List[str] = list() external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(branch.emt_model) if_dc_var: Var | None it_dc_var: Var | None v_f_dc_var: Var | None v_t_dc_var: Var | None shared_v_dc_var: Var | None shared_i_dc_var: Var | None if external_mapping is None: return errors else: pass if_dc_var = external_mapping.get(VarPowerFlowReferenceType.If_dc, None) it_dc_var = external_mapping.get(VarPowerFlowReferenceType.It_dc, None) v_f_dc_var = external_mapping.get(VarPowerFlowReferenceType.Vf_dc, None) v_t_dc_var = external_mapping.get(VarPowerFlowReferenceType.Vt_dc, None) shared_v_dc_var = external_mapping.get(VarPowerFlowReferenceType.Vdc, None) shared_i_dc_var = external_mapping.get(VarPowerFlowReferenceType.Idc, None) if if_dc_var is not None and it_dc_var is not None and if_dc_var.uid == it_dc_var.uid: errors.append( f" - Device '{self._get_device_name(branch)}' ({self._get_device_type_label(branch)}) " "uses the same variable for both If_dc and It_dc. Branch DC terminal currents must be side-specific." ) else: pass if v_f_dc_var is not None and v_t_dc_var is not None and v_f_dc_var.uid == v_t_dc_var.uid: errors.append( f" - Device '{self._get_device_name(branch)}' ({self._get_device_type_label(branch)}) " "uses the same variable for both Vf_dc and Vt_dc. Branch DC terminal voltages must be side-specific." ) else: pass if branch.bus_from.is_dc and branch.bus_to.is_dc: if shared_v_dc_var is not None: if self._is_valid_legacy_shared_dc_branch_alias(branch, VarPowerFlowReferenceType.Vdc): pass else: errors.append( f" - Device '{self._get_device_name(branch)}' ({self._get_device_type_label(branch)}) " "exposes shared Vdc on a two-sided DC branch without a full side-specific alias contract. " "Use Vf_dc and Vt_dc instead." ) else: pass if shared_i_dc_var is not None: if self._is_valid_legacy_shared_dc_branch_alias(branch, VarPowerFlowReferenceType.Idc): pass else: errors.append( f" - Device '{self._get_device_name(branch)}' ({self._get_device_type_label(branch)}) " "exposes shared Idc on a two-sided DC branch without a full side-specific alias contract. " "Use If_dc and It_dc instead." ) else: pass else: pass return errors def _count_branch_sides_by_domain(self, branch: Any, is_dc: bool) -> int: """ Count the number of branch terminal buses matching one electrical domain. :param branch: Branch device to inspect. :param is_dc: ``True`` for DC buses, ``False`` for AC buses. :return: Number of matching terminal buses. """ count: int = 0 if branch.bus_from.is_dc == is_dc: count += 1 else: pass if branch.bus_to.is_dc == is_dc: count += 1 else: pass return count def _get_present_external_refs(self, mdl: Block) -> Set[VarPowerFlowReferenceType]: """ Return the set of non-null electrical external references exposed by one model. :param mdl: Device block to inspect. :return: Set of exposed external reference keys. """ refs: Set[VarPowerFlowReferenceType] = set() external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) reference: Any mapped_var: Var | None if external_mapping is None: return refs else: pass for reference, mapped_var in external_mapping.items(): if isinstance(reference, VarPowerFlowReferenceType) and mapped_var is not None: refs.add(reference) else: pass return refs def _get_bus_voltage_refs(self, bus: Any) -> Set[VarPowerFlowReferenceType]: """ Return the non-null bus-shell voltage references supported by one bus. :param bus: Bus API object. :return: Supported bus voltage references. """ refs: Set[VarPowerFlowReferenceType] = set() reference: VarPowerFlowReferenceType candidate_refs: List[VarPowerFlowReferenceType] if bus.is_dc: candidate_refs = list([VarPowerFlowReferenceType.Vdc]) else: candidate_refs = list([ VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, ]) for reference in candidate_refs: if _get_external_mapping_var_if_present(bus.emt_model, reference) is not None: refs.add(reference) else: pass return refs def _build_emt_interface_error(self, device: Any, bus: Any, side: str, reference: VarPowerFlowReferenceType, detail: str) -> str: """ Build one formatted EMT interface-validation error message. :param device: Device that exposes the incompatible reference. :param bus: Connected bus. :param side: User-facing side label. :param reference: Incompatible external reference. :param detail: Specific mismatch explanation. :return: Formatted error string. """ category: str = self._get_emt_interface_ref_category(reference) return ( f" - Device '{self._get_device_name(device)}' ({self._get_device_type_label(device)}) " f"bus '{bus.name}' side '{side}' ref '{reference.value}' [{category}]: {detail}" ) def _get_emt_interface_ref_category(self, reference: VarPowerFlowReferenceType) -> str: """ Classify one EMT electrical reference for user-facing error messages. :param reference: Electrical interface reference. :return: Category label such as ``AC phase``, ``neutral``, or ``DC``. """ if reference in { VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.i_N, VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.if_N, VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.it_N, }: return "neutral" elif reference in { VarPowerFlowReferenceType.Vdc, VarPowerFlowReferenceType.Idc, VarPowerFlowReferenceType.Vf_dc, VarPowerFlowReferenceType.Vt_dc, VarPowerFlowReferenceType.If_dc, VarPowerFlowReferenceType.It_dc, }: return "DC" else: return "AC phase" def _get_device_name(self, device: Any) -> str: """ Return one stable user-facing device name for diagnostics. :param device: Device to describe. :return: Device name when available. """ try: return str(device.name) except Exception: return str(type(device).__name__) def _get_device_type_label(self, device: Any) -> str: """ Return one stable user-facing device-type label for diagnostics. :param device: Device to describe. :return: Device-type label. """ try: return str(device.device_type.value) except Exception: return str(type(device).__name__) def _validate_connections(self) -> None: """ Validate EMT topology connectivity. Checks that each phase exposed by a bus must be connected to at least 2 elements (branches or injections). A floating phase indicates an open circuit. The phase connection mapping is: - Branch in_vars with ref vf_N/A/B/C connect to bus_from's v_N/A/B/C - Branch in_vars with ref vt_N/A/B/C connect to bus_to's v_N/A/B/C - Injection device in_vars with ref v_N/A/B/C connect to their bus's v_N/A/B/C :raises EmtTopologyError: If topology inconsistencies are found. """ phase_ref_to_bus_phase = { VarPowerFlowReferenceType.vf_N: VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.vf_A: VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.vf_B: VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.vf_C: VarPowerFlowReferenceType.v_C, VarPowerFlowReferenceType.vt_N: VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.vt_A: VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.vt_B: VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.vt_C: VarPowerFlowReferenceType.v_C, } branch_phase_refs = { VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.vf_C, VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.vt_C, } ac_phases = [ VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, ] bus_phase_connections: Dict[Tuple[Any, VarPowerFlowReferenceType], Set[str]] = dict() for bus in self.grid.buses: for phase in ac_phases: bus_phase_connections[(bus, phase)] = set() vsc_ac_phase_refs = { VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, } for br in self.grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True): br_emt_model = self._working_emt_models.get(str(br.idtag), br.emt_model) if br.device_type == DeviceType.LineDevice: validate_line_phase_layout(branch=br, mdl=br_emt_model, logger=self.logger) else: pass if br_emt_model is not None: br_bus_from = br.bus_from br_bus_to = br.bus_to is_bergeron, is_jmarti = _is_history_runtime_line_block_name(br_emt_model.name) if is_bergeron or is_jmarti: br_ys = br.ys if br_ys.phN: bus_phase_connections[(br_bus_from, VarPowerFlowReferenceType.v_N)].add(f"branch_from:{br.idtag}") bus_phase_connections[(br_bus_to, VarPowerFlowReferenceType.v_N)].add(f"branch_to:{br.idtag}") else: pass if br_ys.phA: bus_phase_connections[(br_bus_from, VarPowerFlowReferenceType.v_A)].add(f"branch_from:{br.idtag}") bus_phase_connections[(br_bus_to, VarPowerFlowReferenceType.v_A)].add(f"branch_to:{br.idtag}") else: pass if br_ys.phB: bus_phase_connections[(br_bus_from, VarPowerFlowReferenceType.v_B)].add(f"branch_from:{br.idtag}") bus_phase_connections[(br_bus_to, VarPowerFlowReferenceType.v_B)].add(f"branch_to:{br.idtag}") else: pass if br_ys.phC: bus_phase_connections[(br_bus_from, VarPowerFlowReferenceType.v_C)].add(f"branch_from:{br.idtag}") bus_phase_connections[(br_bus_to, VarPowerFlowReferenceType.v_C)].add(f"branch_to:{br.idtag}") else: pass else: # Deserialized EMT blocks may preserve branch terminal # references in nested external mappings even when the root # ``in_vars`` layout differs from a freshly rebuilt # template. Build the connectivity view from the recovered # hierarchical external mapping first, because those keys # are the canonical semantic contract used across the EMT # builder. branch_external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(br_emt_model) present_branch_refs: Set[VarPowerFlowReferenceType] = set() if branch_external_mapping is not None: branch_ref_key: Any branch_ref_var: Var | None for branch_ref_key, branch_ref_var in branch_external_mapping.items(): if isinstance(branch_ref_key, VarPowerFlowReferenceType) and branch_ref_var is not None: present_branch_refs.add(branch_ref_key) else: pass else: pass # Freshly rebuilt models expose the same contract directly # on ``in_vars``. Keep that path as a fallback so both live # and deserialized models are handled uniformly. if len(present_branch_refs) == 0: in_var: Var for in_var in br_emt_model.in_vars: if in_var.ref is not None: present_branch_refs.add(in_var.ref) else: pass else: pass branch_ref: VarPowerFlowReferenceType for branch_ref in present_branch_refs: if branch_ref in branch_phase_refs: bus_phase = phase_ref_to_bus_phase[branch_ref] if branch_ref in { VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.vf_C, }: bus_phase_connections[(br_bus_from, bus_phase)].add(f"branch_from:{br.idtag}") elif branch_ref in { VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.vt_C, }: bus_phase_connections[(br_bus_to, bus_phase)].add(f"branch_to:{br.idtag}") else: pass elif branch_ref in vsc_ac_phase_refs: if br_bus_to.is_dc: bus_phase_connections[(br_bus_from, branch_ref)].add(f"branch_from:{br.idtag}") else: bus_phase_connections[(br_bus_to, branch_ref)].add(f"branch_to:{br.idtag}") else: pass else: pass for inj in self.grid.get_injection_devices_iter(): inj_emt_model = self._working_emt_models.get(str(inj.idtag), inj.emt_model) if inj_emt_model is not None: inj_bus = inj.bus for in_var in inj_emt_model.in_vars: if in_var.ref in ac_phases: bus_phase_connections[(inj_bus, in_var.ref)].add(f"injection:{inj.idtag}") else: pass if _has_internal_grounding_link(inj_emt_model): bus_phase_connections[(inj_bus, VarPowerFlowReferenceType.v_N)].add( f"internal_ground:{inj.idtag}" ) else: pass else: pass floating_phases: List[str] = list() for bus in self.grid.buses: if not bus.is_dc: bus_mdl = self._working_emt_models.get(str(bus.idtag), bus.emt_model) bus_out_refs = {v.ref for v in bus_mdl.out_vars} for phase in ac_phases: connections = bus_phase_connections.get((bus, phase), set()) if phase in bus_out_refs and len(connections) < 2: floating_phases.append( f" - Bus '{bus.name}' phase '{phase.value}' is floating " f"(connected to {len(connections)} element(s): {connections})" ) else: pass else: pass if floating_phases: error_msg = ( "TopologyError: Floating bus phases detected " "(each phase must be connected to at least 2 elements):\n" ) error_msg += "\n".join(floating_phases) raise EmtTopologyError(error_msg) else: pass def _process_device_model(self, dev: Any, mdl:Block, sys_block: Block) -> Block: """ Register a device EMT model into the global system block. The device model is flattened before being attached to the system block. Some wrapper-style models store PF-initialization metadata in nested child blocks. For that reason, the relevant mappings are rebuilt first and then restored on the device root before the block is added to the global system. :param dev: Device object from the grid. :param mdl: Device block :param sys_block: Main DAE system block. :return: Unified device block. """ resolved_external_mapping: Dict[Any, Var] # Wrapper roots may not own the PF-parameter mapping directly, so the # hierarchy is flattened into one root-visible mapping before build use. resolved_api_mapping: Dict[ParamPowerFlowReferenceType, Any] = _collect_api_obj_mapping_from_block(mdl) if len(resolved_api_mapping) > 0: mdl.api_obj_mapping = resolved_api_mapping else: pass # Wrapper roots may also hide PF variable references in leaf blocks. # Re-attaching the merged mapping on the root keeps all later lookups # consistent with the already recovered API-object mapping. resolved_external_mapping = _collect_external_mapping_from_block(mdl) if len(resolved_external_mapping) > 0: mdl.external_mapping = resolved_external_mapping else: pass # Cache working EMT models by the stable device idtag instead of the # Python object identity so all lookups follow the grid semantic # identifier used by the rest of the EMT event machinery. dev_key: str = str(dev.idtag) if dev_key not in self._registered_working_emt_models: parameter_owner_blocks: List[Block] = _collect_block_hierarchy(mdl) parameter_owner_block: Block # Register runtime-updatable EMT parameters declared by the device block. # EMT runtime events are no longer baked into the symbolic model during the # parsing phase. Instead, the problem stores the event-capable parameters and # later activates the selected EMT event group through ``set_events_group()``. if not mdl.event_dict and not mdl.mode_dict: # Wrapper roots may keep all runtime parameters inside nested child # blocks, so the registration pass must still inspect the full # hierarchy before deciding that nothing needs to be registered. for parameter_owner_block in parameter_owner_blocks: if parameter_owner_block is mdl: pass else: for parameter in parameter_owner_block.event_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._continuous_event_parameter_uids.add(parameter.uid) for parameter in parameter_owner_block.mode_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._discrete_event_parameter_uids.add(parameter.uid) else: for parameter in mdl.event_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._continuous_event_parameter_uids.add(parameter.uid) for parameter in mdl.mode_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._discrete_event_parameter_uids.add(parameter.uid) # Child blocks may define additional runtime parameters that are # not re-exported on the wrapper root. Those nested parameters # must also be registered so later EMT event activation and # runtime seeding can find them by UID and by device/name. for parameter_owner_block in parameter_owner_blocks: if parameter_owner_block is mdl: pass else: for parameter in parameter_owner_block.event_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._continuous_event_parameter_uids.add(parameter.uid) for parameter in parameter_owner_block.mode_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._event_parameter_name_lookup[(str(dev.idtag), str(parameter.name))] = parameter.uid self._discrete_event_parameter_uids.add(parameter.uid) # Add model to system mappings # Populates tracking dictionaries to maintain the relationship between # electrical devices and their corresponding symbolic variables. block_item: Block for block_item in mdl.get_all_blocks(): v: Var for v in block_item.state_vars: self.add_device_var(dev=dev, var=v) self._vars_glob_name2uid[v.name + dev.name] = v.uid self._vars2device[v.uid] = dev for v in block_item.algebraic_vars: self.add_device_var(dev=dev, var=v) self._vars_glob_name2uid[v.name + dev.name] = v.uid self._vars2device[v.uid] = dev dv: Var for dv in block_item.diff_vars: self.add_device_var(dev=dev, var=dv) self._vars_glob_name2uid[dv.name + dev.name] = dv.uid self._vars2device[dv.uid] = dev queue_emt_fmu_cs_device(self, dev, mdl) queue_emt_fmu_me_device(self, dev, mdl) sys_block.add(mdl) self._register_init_model(mdl) # self._register_diff_init_model(mdl) self._registered_working_emt_models.add(dev_key) return mdl @staticmethod def _assign_api_mapping_value_if_present(mdl: Block, key: ParamPowerFlowReferenceType, value: Any, static_parameter_values_mapping: Dict[Var, Const], ) -> None: """ Assign one static API-object parameter into ``mdl.parameters``. This function intentionally never writes into ``mdl.event_dict``. The ``api_obj_mapping`` contract is static-device to constant EMT parameter mapping only. Runtime/event parameters belong to the explicit initialization/runtime event path. :param mdl: EMT model block. :param key: Static parameter reference key. :param value: Static value to assign. :param static_parameter_values_mapping: Dictionary with static parameters and their values. :return: Assignment status. """ api_obj_mapping: Any = mdl.api_obj_mapping event_owner: Optional[Block] if isinstance(api_obj_mapping, dict): target: Any | None = api_obj_mapping.get(key, None) if target is not None: if isinstance(target, Var): # Wrapper roots may expose API mappings for parameters whose # actual runtime storage belongs to a nested child block. # Static assignment must therefore skip every target that is # owned by any child ``event_dict`` in the hierarchy. event_owner = _find_event_var_owner(mdl, target) if event_owner is not None: pass else: static_parameter_values_mapping[target] = value if isinstance(value, Const) else Const(float(value)) else: pass else: pass else: pass @staticmethod def _assign_event_mapping_value_if_present(mdl: Block, key: ParamPowerFlowReferenceType, value: float, ) -> None: """ Assign one runtime/event parameter exposed through ``api_obj_mapping``. Some higher-level EMT wrappers expose PF-derived sharing targets through ``api_obj_mapping`` even though the actual parameter lives in ``event_dict``. Those values must be seeded on the runtime side before explicit initialization, but they must not pass through the static API assignment helper. :param mdl: EMT model block. :param key: API mapping reference key. :param value: Runtime value to assign. :return: None. """ api_obj_mapping: Any = mdl.api_obj_mapping owner_block: Optional[Block] target: Any | None if isinstance(api_obj_mapping, dict): target = api_obj_mapping.get(key, None) if target is None: pass else: if isinstance(target, Var): # The mapped runtime parameter may live in a nested child # block, so the assignment must target the owner block that # actually stores the mutable ``event_dict`` entry. owner_block = _find_event_var_owner(mdl, target) if owner_block is None: pass else: owner_block.event_dict[target] = Const(float(value)) else: pass else: pass # --------------------------------------------------------------------- # BUILD STRUCTURE # --------------------------------------------------------------------- def _build_structure_and_collect( self, sys_block: Block, grid: MultiCircuit, static_parameter_values_mapping: Dict[Var, Const], ) -> None: """ Build the EMT system structure and collect all algebraic KCL equations. This routine assembles the EMT nodal equations in abc coordinates. The unknowns are the bus phase voltages and the algebraic equations are the KCL balances at each bus-phase. The function preserves the original sign conventions: - Branch currents are defined as leaving the bus, therefore they are subtracted from the KCL accumulator. - Injection currents are defined as entering the bus, therefore they are added to the KCL accumulator. :param sys_block: Block containing the full system DAE. :param grid: Network model to assemble. :param static_parameter_values_mapping: Dictionary with static parameters and their values. :return: None. """ bus_working_models: Dict[Any, Block] = dict() for bus in grid.buses: bus_working_models[bus] = self._get_or_create_working_emt_model(bus) bus_dict: Dict[Any, int] = dict() ph_v_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.v_N, VarPowerFlowReferenceType.v_A, VarPowerFlowReferenceType.v_B, VarPowerFlowReferenceType.v_C, ]) inj_i_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.i_N, VarPowerFlowReferenceType.i_A, VarPowerFlowReferenceType.i_B, VarPowerFlowReferenceType.i_C, ]) br_if_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.if_N, VarPowerFlowReferenceType.if_A, VarPowerFlowReferenceType.if_B, VarPowerFlowReferenceType.if_C, ]) br_it_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.it_N, VarPowerFlowReferenceType.it_A, VarPowerFlowReferenceType.it_B, VarPowerFlowReferenceType.it_C, ]) br_vf_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.vf_N, VarPowerFlowReferenceType.vf_A, VarPowerFlowReferenceType.vf_B, VarPowerFlowReferenceType.vf_C, ]) br_vt_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.vt_N, VarPowerFlowReferenceType.vt_A, VarPowerFlowReferenceType.vt_B, VarPowerFlowReferenceType.vt_C, ]) c0: Any = Const(0.0) # bus loop to create the I_kcl equations I_kcl: Dict[int, List[Any]] = dict() for bus_idx, bus in enumerate(grid.buses): if bus.is_dc: I_kcl[bus_idx] = list([c0]) else: I_kcl[bus_idx] = list([c0, c0, c0, c0]) bus_dict[bus] = int(bus_idx) # Collect injections only after the bus dictionary has been built. # The generator-sharing bookkeeping needs the bus indices, so it must run # after bus_dict is available. injection_devices: List[Any] = list(grid.get_injection_devices_iter()) generator_count_same_bus: Dict[int, int] = dict() for bus_idx, _ in enumerate(grid.buses): generator_count_same_bus[bus_idx] = 0 for inj in injection_devices: bus_idx_local: int = bus_dict[inj.bus] if inj.device_type == DeviceType.GeneratorDevice: generator_count_same_bus[bus_idx_local] += 1 else: pass h: float = float(self.options.time_step) sbase: float = float(grid.Sbase) fbase: float = float(grid.fBase) vsc_idx_dict: Dict[str, int] = dict() for vsc_idx, vsc in enumerate(grid.vsc_devices): vsc_idx_dict[vsc.idtag] = vsc_idx pf_branch_index: int = 0 for branch_idx, br in enumerate(grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True)): f: int = bus_dict[br.bus_from] t: int = bus_dict[br.bus_to] br_working_mdl: Block = self._get_or_create_working_emt_model(br) # self._working_emt_models[str(br.idtag)] = br_working_mdl mdl: Block = self._process_device_model(dev=br, mdl=br_working_mdl, sys_block=sys_block) is_bergeron, is_jmarti = _is_history_runtime_line_block_name(mdl.name) is_vsc: bool = br.device_type == DeviceType.VscDevice uses_branch_pf_slot: bool = br.device_type not in {DeviceType.VscDevice, DeviceType.HVDCLineDevice} current_pf_branch_index: int = pf_branch_index vsc_index: int = vsc_idx_dict.get(br.idtag, -1) if uses_branch_pf_slot: pf_branch_index += 1 else: pass assign_static_api_object_mapping_for_device( grid=grid, device=br, mdl=mdl, problem_mapping=static_parameter_values_mapping, logger=self.logger, ) if is_bergeron or is_jmarti: v_f_vars: List[Any] = _get_bus_v_list( bus_block=bus_working_models[br.bus_from], ph_v_keys=ph_v_keys ) v_t_vars: List[Any] = _get_bus_v_list( bus_block=bus_working_models[br.bus_to], ph_v_keys=ph_v_keys ) if is_bergeron: rt: BergeronHistoryRuntime | JMartiHistoryRuntime = BergeronHistoryRuntime( line=br, line_block=mdl, h=h, sbase=sbase, fbase=fbase ) else: rt = JMartiHistoryRuntime( line=br, line_block=mdl, h=h, ) rt.bind_terminals(v_f_vars, v_t_vars) if self.power_flow_results_3ph is not None: if is_bergeron: self._try_set_bergeron_pf_init( mdl=mdl, rt=rt, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, ) else: self._try_set_jmarti_pf_init( mdl=mdl, rt=rt, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, ) else: if self.power_flow_results is not None: if self.options.verbose: if is_bergeron: print("Initializing bergeron line variables assuming a balanced power flow") else: print("Initializing JMARTI line variables assuming a balanced power flow") else: pass if is_bergeron: self._try_set_bergeron_pf_init_balanced( mdl=mdl, rt=rt, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, ) else: self._try_set_jmarti_pf_init_balanced( mdl=mdl, rt=rt, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, ) else: if self.options.verbose: print("No Power Flow results given.") else: pass i_f_exprs: List[Any] i_t_exprs: List[Any] i_f_exprs, i_t_exprs = rt.get_nodal_injections() for ph in range(4): if i_f_exprs[ph] is not None: I_kcl[f][ph] = I_kcl[f][ph] - i_f_exprs[ph] else: pass if i_t_exprs[ph] is not None: I_kcl[t][ph] = I_kcl[t][ph] - i_t_exprs[ph] else: pass self.history_models.append(rt) else: side_bus_indices: List[int] = list([f, t]) side_buses: List[Any] = list([br.bus_from, br.bus_to]) side_current_keys: List[List[VarPowerFlowReferenceType]] = list([br_if_keys, br_it_keys]) for side_idx in range(2): side_bus_idx: int = side_bus_indices[side_idx] side_bus: Any = side_buses[side_idx] current_keys: List[VarPowerFlowReferenceType] = side_current_keys[side_idx] if side_bus.is_dc: external_mapping_dc: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) dc_current_key: VarPowerFlowReferenceType = ( VarPowerFlowReferenceType.If_dc if side_idx == 0 else VarPowerFlowReferenceType.It_dc ) side_dc_expr: Any | None = None if external_mapping_dc is not None: side_dc_expr = external_mapping_dc.get(dc_current_key, None) if side_dc_expr is None: side_dc_expr = external_mapping_dc.get(VarPowerFlowReferenceType.Idc, None) else: pass if side_dc_expr is not None: I_kcl[side_bus_idx][0] = I_kcl[side_bus_idx][0] - side_dc_expr else: pass else: for ph in range(4): if is_vsc: side_expr: Any | None = _get_external_mapping_var_if_present(mdl=mdl, key=inj_i_keys[ph]) else: side_expr = _get_external_mapping_var_if_present(mdl=mdl, key=current_keys[ph]) if side_expr is not None: I_kcl[side_bus_idx][ph] = I_kcl[side_bus_idx][ph] - side_expr else: pass terminal_buses: List[Any] = list([br.bus_from, br.bus_to]) terminal_voltage_keys: List[List[VarPowerFlowReferenceType]] = list([br_vf_keys, br_vt_keys]) for term_idx in range(2): terminal_bus: Any = terminal_buses[term_idx] terminal_keys: List[VarPowerFlowReferenceType] = terminal_voltage_keys[term_idx] external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if terminal_bus.is_dc: v_dc, _, _, _ = get_bus_emt_algebraic_vars(bus_working_models[terminal_bus]) if external_mapping is not None: dc_voltage_key: VarPowerFlowReferenceType = ( VarPowerFlowReferenceType.Vf_dc if term_idx == 0 else VarPowerFlowReferenceType.Vt_dc ) mapped_var: Optional[Var] = external_mapping.get(dc_voltage_key, None) if mapped_var is None: mapped_var = external_mapping.get(VarPowerFlowReferenceType.Vdc, None) if mapped_var is not None: mdl.update_model(mapped_var, v_dc) else: pass else: pass else: v_n, v_a, v_b, v_c = get_bus_emt_algebraic_vars(terminal_bus.emt_model) terminal_voltage_vars: List[Any] = list([v_n, v_a, v_b, v_c]) if external_mapping is not None: for ph in range(4): mapped_var = external_mapping.get(terminal_keys[ph], None) terminal_voltage_var = terminal_voltage_vars[ph] if mapped_var is not None and terminal_voltage_var is not None: mdl.update_model(mapped_var, terminal_voltage_var) else: pass else: pass if self.power_flow_results_3ph is not None: if is_vsc and vsc_index >= 0: self._try_set_vsc_branch_pf_init( mdl=mdl, f_bus_idx=f, t_bus_idx=t, sbase=sbase, vsc_index=vsc_index, ) else: self._try_set_branch_pf_init( mdl=mdl, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, is_vsc=is_vsc, vsc_index=vsc_index ) else: if self.power_flow_results is not None: if self.options.verbose: print("Initializing branch variables assuming a balanced power flow") else: pass self._try_set_branch_pf_init_balanced( mdl=mdl, branch_index=current_pf_branch_index, f_bus_idx=f, t_bus_idx=t, sbase=grid.Sbase, is_vsc=is_vsc, vsc_index=vsc_index ) else: if self.options.verbose: print("No Power Flow results given.") else: pass # seed store to save the different injections in a bus seed_store: Optional[EmtInjectionSeedStore] if self.power_flow_results_3ph is not None: seed_store = self._build_injection_seed_store_3ph( grid=grid, bus_dict=bus_dict, injection_devices=injection_devices, sbase=grid.Sbase, ) else: if self.power_flow_results is not None: seed_store = self._build_injection_seed_store_balanced( grid=grid, bus_dict=bus_dict, injection_devices=injection_devices, sbase=grid.Sbase, ) else: seed_store = None for injection_index, inj in enumerate(injection_devices): inj_working_mdl: Block = self._get_or_create_working_emt_model(inj) mdl: Block = self._process_device_model(dev=inj, mdl=inj_working_mdl, sys_block= sys_block) b: int = bus_dict[inj.bus] assign_static_api_object_mapping_for_device( grid=grid, device=inj, mdl=mdl, problem_mapping=static_parameter_values_mapping, logger=self.logger, ) if inj.bus.is_dc: i_dc_expr: Any | None = _get_external_mapping_var_if_present( mdl=mdl, key=VarPowerFlowReferenceType.Idc, ) _accumulate_kcl_current( i_kcl=I_kcl, bus_index=b, phase_index=0, current_expression=i_dc_expr, sign=1.0, ) else: ph: int = 0 while ph < 4: i_expr: Any | None = _get_external_mapping_var_if_present(mdl=mdl, key=inj_i_keys[ph]) _accumulate_kcl_current( i_kcl=I_kcl, bus_index=b, phase_index=ph, current_expression=i_expr, sign=1.0, ) ph += 1 inj_external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) inj_bus_mdl: Block = bus_working_models[inj.bus] if inj.bus.is_dc: v_dc, _, _, _ = get_bus_emt_algebraic_vars(inj_bus_mdl) if inj_external_mapping is not None: mapped_var = inj_external_mapping.get(VarPowerFlowReferenceType.Vdc, None) if mapped_var is not None: mdl.update_model(mapped_var, v_dc) else: pass else: pass else: v_n, v_a, v_b, v_c = get_bus_emt_algebraic_vars(inj_bus_mdl) inj_bus_voltage_vars: List[Any] = list([v_n, v_a, v_b, v_c]) if inj_external_mapping is not None: for ph in range(4): mapped_var = inj_external_mapping.get(ph_v_keys[ph], None) inj_bus_voltage_var = inj_bus_voltage_vars[ph] if mapped_var is not None and inj_bus_voltage_var is not None: mdl.update_model(mapped_var, inj_bus_voltage_var) else: pass else: pass if seed_store is None: if self.options.verbose: print("No Power Flow results given.") else: pass else: self._apply_injection_seed( injection=inj, mdl=mdl, bus_index=b, seed_store=seed_store, injection_index=injection_index, generator_count_same_bus=int(generator_count_same_bus[b]), ) if inj.device_type == DeviceType.GeneratorDevice and not inj.bus.is_dc: phase_power_seed: np.ndarray = seed_store.get_ac_seed(injection_index) generator_count_local: int = int(generator_count_same_bus[b]) self._assign_generator_sharing_targets_from_seed( inj=inj, mdl=mdl, phase_power=phase_power_seed, generator_count_same_bus=generator_count_local, static_parameter_values_mapping= static_parameter_values_mapping, ) else: pass for bus_idx, bus in enumerate(grid.buses): bus_working_mdl: Block = self._get_or_create_working_emt_model(bus) mdl: Block = self._process_device_model(dev=bus, mdl=bus_working_mdl, sys_block= sys_block) if self.power_flow_results_3ph is not None: self._try_set_bus_pf_init( bus=bus, mdl=mdl, bus_index=bus_idx ) else: if self.power_flow_results is not None: if self.options.verbose: print("Initializing bus variables assuming a balanced power flow") else: pass self._try_set_bus_pf_init_balanced( bus=bus, mdl=mdl, bus_index=bus_idx ) else: if self.options.verbose: print("No Power Flow results given.") else: pass for bus_idx, bus in enumerate(grid.buses): mdl: Block = bus_working_models[bus] mdl.algebraic_eqs = list() if bus.is_dc: v_dc_expr: Any | None = _get_external_mapping_var_if_present( mdl=mdl, key=VarPowerFlowReferenceType.Vdc, ) if v_dc_expr is not None: mdl.algebraic_eqs.append(I_kcl[bus_idx][0]) else: pass else: for ph_idx, v_key in enumerate(ph_v_keys): v_expr: Any | None = _get_external_mapping_var_if_present(mdl=mdl, key=v_key) if v_expr is not None: mdl.algebraic_eqs.append(I_kcl[bus_idx][ph_idx]) else: pass if len(mdl.algebraic_eqs) == 0: self.logger.add_error( "Bus has no phases (no voltage vars)", value=str(bus_idx) ) else: pass def _get_emt_events_for_group(self, emt_events_group: EmtEventsGroup | None) -> List[Any]: """ Return the EMT events that belong to the selected group. ``None`` means that all EMT events registered in the grid remain active, which preserves the previous default behavior. """ emt_events: List[Any] = _get_grid_runtime_events(self.grid) if emt_events_group is None: return emt_events filtered_events: List[Any] = list() for evt in emt_events: evt_group: Any = evt.group if evt_group is None: group_idtag = None else: group_idtag = evt_group.idtag if group_idtag == emt_events_group.idtag: filtered_events.append(evt) else: pass return filtered_events def _event_targets_registered_parameter(self, evt: Any, parameter_uid: int) -> bool: """ Return whether the EMT event targets a runtime parameter registered in the problem. """ registered_device_idtag: str | None = self._event_parameter_device_idtags.get(parameter_uid, None) device_idtag: Any = evt.device_idtag if device_idtag is None: event_device_idtag = "" else: event_device_idtag = str(device_idtag) if registered_device_idtag is None: return False if event_device_idtag == "": return True return registered_device_idtag == event_device_idtag def _resolve_registered_event_parameter_uid(self, evt: Any) -> int | None: """ Resolve the runtime-parameter UID targeted by one EMT event. Events are often authored against the reusable device model stored on the grid element, while the EMT problem operates on cloned working models with fresh symbolic UIDs. This resolver first tries the direct UID match and then falls back to ``(device_idtag, parameter.name)`` so the event can be applied to the corresponding working-model parameter. :param evt: EMT event under inspection. :return: Registered runtime-parameter UID or ``None`` when not found. """ if isinstance(evt.parameter, Var): direct_uid: int = int(evt.parameter.uid) if direct_uid in self._event_parameter_device_idtags: return direct_uid else: pass event_device_idtag_raw: Any = evt.device_idtag if event_device_idtag_raw is None: return None else: lookup_key: Tuple[str, str] = (str(event_device_idtag_raw), str(evt.parameter.name)) return self._event_parameter_name_lookup.get(lookup_key, None) else: return None
[docs] def set_events_group(self, emt_events_group: EmtEventsGroup | None) -> None: """ Activate the selected EMT events group inside the problem runtime layer. Continuous EMT events are mapped to runtime ``piecewise(time)`` expressions. Discrete EMT mode events are stored as scheduled updates consumed by the boundary update hook used by the EMT solvers. :param emt_events_group: EMT events group to activate. ``None`` keeps all EMT events active. :return: None """ same_group_requested: bool if self._active_events_group is None: same_group_requested = emt_events_group is None and len(self._scheduled_mode_events) > 0 elif emt_events_group is None: same_group_requested = False else: same_group_requested = self._active_events_group.idtag == emt_events_group.idtag if same_group_requested: return else: pass active_runtime_eqs: List[Any] = list(self._runtime_parameter_eqs0) scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]] = dict() continuous_events: Dict[int, List[Dict[str, float | str | None]]] = { parameter.uid: list() for parameter in self.get_runtime_continuous_parameters() if parameter.uid in self._continuous_event_parameter_uids } emt_events: List[Any] = self._get_emt_events_for_group(emt_events_group) for evt in emt_events: parameter_uid: int | None = self._resolve_registered_event_parameter_uid(evt) if parameter_uid is not None: if self._event_targets_registered_parameter(evt, parameter_uid): if parameter_uid in self._discrete_event_parameter_uids: event_list: List[Tuple[float, float, bool]] = scheduled_mode_events.setdefault( parameter_uid, list(), ) force_step_alignment: bool = bool(evt.force_step_alignment) event_list.append( ( float(evt.time), float(evt.value), force_step_alignment, ) ) else: if parameter_uid in continuous_events: transition_type: DynamicEventTransitionType = evt.transition_type end_time = evt.end_time continuous_events[parameter_uid].append( dict({ "time": float(evt.time), "value": float(evt.value), "end_time": None if end_time is None else float(end_time), "transition_type": transition_type, }) ) else: pass else: pass else: pass for parameter_uid, event_specs in continuous_events.items(): if len(event_specs) != 0: runtime_idx: int = self.uid2idx_event_params[parameter_uid] sorted_specs: List[Dict[str, float | DynamicEventTransitionType | None]] = sorted( event_specs, key=_continuous_event_spec_time_sort_key, ) active_expr: Any = active_runtime_eqs[runtime_idx] event_spec: Dict[str, float | DynamicEventTransitionType | None] for event_spec in sorted_specs: if event_spec["transition_type"] == DynamicEventTransitionType.Ramp: start_time: float = float(event_spec["time"]) end_time_raw: float | str | None = event_spec["end_time"] if end_time_raw is None: raise ValueError("Ramp EMT events require an end_time") else: pass end_time: float = float(end_time_raw) if end_time > start_time: pass else: raise ValueError("Ramp EMT events require end_time greater than time") active_expr = _build_ramp_runtime_expr( time_var=self._glob_time, start_time=start_time, end_time=end_time, before_expr=active_expr, final_value=float(event_spec["value"]), ) else: active_expr = piecewise( time_var=self._glob_time, t_events=np.asarray([float(event_spec["time"])], dtype=np.float64), new_values=np.asarray([float(event_spec["value"])], dtype=np.float64), default_value=active_expr, ) active_runtime_eqs[runtime_idx] = active_expr else: pass self._active_events_group = emt_events_group self._runtime_all_eqs_source = active_runtime_eqs self._scheduled_mode_events = scheduled_mode_events # Event-group activation only changes the runtime equations and scheduled # updates; it does not change the parameter ordering. Reusing the existing # partition avoids rebuilding constant parameter buffers and repeated list # reconstruction during every event-group switch. self._runtime_continuous_eqs = list() self._runtime_mode_eqs = list() # ``active_runtime_eqs`` is already expressed in the flat runtime-vector # order used by ``uid2idx_event_params`` and by the solver runtime buffer. # Re-splitting it through ``_runtime_all_parameters_source`` would switch # back to source-block order and scramble retained mode parameters such as # Bergeron history terms. The slices keep the continuous/mode partition # aligned with the finalized flat parameter layout. runtime_continuous_eqs: List[Any] = list(active_runtime_eqs[self._runtime_continuous_slice]) runtime_mode_eqs: List[Any] = list(active_runtime_eqs[self._runtime_mode_slice]) for equation in runtime_continuous_eqs: self._runtime_continuous_eqs.append(equation) for equation in runtime_mode_eqs: self._runtime_mode_eqs.append(equation) self._event_parameters_eqs = list() for equation in self._runtime_continuous_eqs: self._event_parameters_eqs.append(equation) for equation in self._runtime_mode_eqs: self._event_parameters_eqs.append(equation) self._mode_event_cursor = dict() self._initialize_mode_event_state() if self.get_variable_parameter_number() > 0: self._event_params_values = self._initialize_runtime_parameter_values(0.0) self._event_params_values = self.def_event_params_fn(self._event_params_values, 0.0) else: self._event_params_values = np.zeros(0, dtype=np.float64)
# --------------------------------------------------------------------- # MAPPINGS # --------------------------------------------------------------------- def _rebuild_results_var_name_lookup(self) -> None: """ Rebuild the EMT result-name lookup with unique device-qualified keys. ``EmtProblemTemplate`` resets ``_vars_glob_name2uid`` using raw symbolic names only. Saved EMT cases often reuse template-local names such as ``i_ser_Pi_A`` or repeated load-template names, so the raw-name map drops earlier variables. Rebuilding the lookup here preserves one name per UID. :return: None. """ unique_names: Dict[str, int] = dict() mapped_uids: Set[int] = set() device: ALL_DEV_TYPES variables: List[Var] for device, variables in self._vars_info.items(): device_idtag: str = str(device.idtag) device_name: str = str(device.name) for var in variables: # Build the preferred global variable name from the symbolic # variable name and the owning device public name so the result # map stays readable before any duplicate-resolution fallback. candidate_name: str = f"{var.name}{device_name}" if candidate_name in unique_names and unique_names[candidate_name] != var.uid: candidate_name = f"{candidate_name}{device_idtag}" else: pass if candidate_name in unique_names and unique_names[candidate_name] != var.uid: candidate_name = f"{candidate_name}{var.uid}" else: pass unique_names[candidate_name] = var.uid mapped_uids.add(var.uid) for var in self.state_and_algebraic_vars(): if var.uid in mapped_uids: pass else: candidate_name = var.name if candidate_name in unique_names and unique_names[candidate_name] != var.uid: candidate_name = f"{candidate_name}{var.uid}" else: pass unique_names[candidate_name] = var.uid mapped_uids.add(var.uid) for diff_var in self.get_diff_vars(): if diff_var.uid in mapped_uids: pass else: candidate_name = diff_var.name if candidate_name in unique_names and unique_names[candidate_name] != diff_var.uid: candidate_name = f"{candidate_name}{diff_var.uid}" else: pass unique_names[candidate_name] = diff_var.uid mapped_uids.add(diff_var.uid) self._vars_glob_name2uid = unique_names # --------------------------------------------------------------------- # PF INIT # --------------------------------------------------------------------- def _get_bus_voltage_vector_3ph(self, bus_index: int) -> np.ndarray: """ Return the bus phase-voltage phasors from the three-phase PF result. The returned vector is in NABC order. :param bus_index: Bus index. :return: Complex voltage vector in NABC order. """ voltage_vector: np.ndarray = np.zeros(4, dtype=np.complex128) if self.power_flow_results_3ph is None: pass else: voltage_vector[0] = complex(self.power_flow_results_3ph.voltage_N[bus_index]) voltage_vector[1] = complex(self.power_flow_results_3ph.voltage_A[bus_index]) voltage_vector[2] = complex(self.power_flow_results_3ph.voltage_B[bus_index]) voltage_vector[3] = complex(self.power_flow_results_3ph.voltage_C[bus_index]) return voltage_vector def _get_bus_voltage_vector_balanced(self, bus_index: int) -> np.ndarray: """ Reconstruct balanced bus phase-voltage phasors in NABC order. :param bus_index: Bus index. :return: Complex voltage vector in NABC order. """ voltage_vector: np.ndarray = np.zeros(4, dtype=np.complex128) if self.power_flow_results is None: pass else: bus_voltage: complex = complex(self.power_flow_results.voltage[bus_index]) voltage_vector[0] = 0.0 + 0.0j voltage_vector[1] = bus_voltage voltage_vector[2] = bus_voltage * _phase_sequence_shift(2) voltage_vector[3] = bus_voltage * _phase_sequence_shift(3) return voltage_vector def _get_load_fixed_power_vector_3ph(self, load: Any) -> np.ndarray: """ Return the fixed per-phase complex power seed of a three-phase load. The EMT sign convention used here is injection-oriented, hence loads are represented as negative power injections. :param load: Load device. :return: Complex power vector in NABC order. """ phase_p_pu_abc: np.ndarray phase_q_pu_abc: np.ndarray phase_p_pu_nabc: np.ndarray phase_q_pu_nabc: np.ndarray power_vector: np.ndarray = _zero_ac_power_vector() # Three-phase PF seeds must still tolerate balanced static-device data. # Reuse the same explicit-or-balanced resolution as the generic static # parameter mapper so seeded per-phase powers stay consistent. phase_p_pu_abc, phase_q_pu_abc = get_load_power_phase_values_pu_abc(load=load, grid=self.grid) phase_p_pu_nabc = abc_to_nabc(phase_values_abc=phase_p_pu_abc) phase_q_pu_nabc = abc_to_nabc(phase_values_abc=phase_q_pu_abc) power_vector[0] = 0.0 + 0.0j power_vector[1] = complex(-float(phase_p_pu_nabc[1]), -float(phase_q_pu_nabc[1])) power_vector[2] = complex(-float(phase_p_pu_nabc[2]), -float(phase_q_pu_nabc[2])) power_vector[3] = complex(-float(phase_p_pu_nabc[3]), -float(phase_q_pu_nabc[3])) return power_vector @staticmethod def _get_load_fixed_power_vector_balanced(load: Any, sbase: float) -> np.ndarray: """ Return the balanced per-phase complex power seed of a load. :param load: Load device. :param sbase: System base power. :return: Complex power vector in NABC order. """ power_vector: np.ndarray = _zero_ac_power_vector() p_phase: float = float(load.P) / (3.0 * sbase) q_phase: float = float(load.Q) / (3.0 * sbase) power_vector[0] = 0.0 + 0.0j power_vector[1] = complex(-p_phase, -q_phase) power_vector[2] = complex(-p_phase, -q_phase) power_vector[3] = complex(-p_phase, -q_phase) return power_vector def _get_shunt_fixed_power_vector_3ph( self, shunt: Any, bus_index: int, ) -> np.ndarray: """ Return the fixed per-phase complex power seed of a shunt. The current EMT implementation only has direct access to total ``G`` and ``B`` in this parser stage. Therefore, the total shunt admittance is split equally among phases for initialization purposes. :param shunt: Shunt device. :param bus_index: Bus index. :return: Complex power vector in NABC order. """ voltage_vector: np.ndarray = self._get_bus_voltage_vector_3ph(bus_index) power_vector: np.ndarray = _zero_ac_power_vector() g_phase_abc_pu: np.ndarray b_phase_abc_pu: np.ndarray g_phase_nabc_pu: np.ndarray b_phase_nabc_pu: np.ndarray phase_index: int = 1 # Three-phase PF seeds must preserve explicit per-phase shunt data when it # exists, but they must also derive phase values from balanced totals when # the static device only carries positive-sequence quantities. g_phase_abc_pu, b_phase_abc_pu = get_shunt_phase_values_pu_abc(shunt=shunt, grid=self.grid) g_phase_nabc_pu = abc_to_nabc(phase_values_abc=g_phase_abc_pu) b_phase_nabc_pu = abc_to_nabc(phase_values_abc=b_phase_abc_pu) power_vector[0] = 0.0 + 0.0j while phase_index < 4: phase_voltage: complex = complex(voltage_vector[phase_index]) voltage_square: float = float(np.abs(phase_voltage) ** 2) power_vector[phase_index] = complex( float(g_phase_nabc_pu[phase_index]) * voltage_square, -float(b_phase_nabc_pu[phase_index]) * voltage_square, ) phase_index += 1 return power_vector def _get_shunt_fixed_power_vector_balanced( self, shunt: Any, bus_index: int, sbase: float, ) -> np.ndarray: """ Return the balanced per-phase complex power seed of a shunt. :param shunt: Shunt device. :param bus_index: Bus index. :param sbase: System base power. :return: Complex power vector in NABC order. """ voltage_vector: np.ndarray = self._get_bus_voltage_vector_balanced(bus_index) power_vector: np.ndarray = _zero_ac_power_vector() g_phase: float = float(shunt.G) / (3.0 * sbase) b_phase: float = float(shunt.B) / (3.0 * sbase) phase_index: int = 1 power_vector[0] = 0.0 + 0.0j while phase_index < 4: phase_voltage: complex = complex(voltage_vector[phase_index]) voltage_square: float = float(np.abs(phase_voltage) ** 2) power_vector[phase_index] = complex( g_phase * voltage_square, -b_phase * voltage_square, ) phase_index += 1 return power_vector def _build_injection_seed_store_3ph( self, grid: MultiCircuit, bus_dict: Dict[Any, int], injection_devices: List[Any], sbase: float, ) -> EmtInjectionSeedStore: """ Build the PF seed store for EMT injections using the three-phase PF snapshot. The algorithm mirrors the RMS logic: 1. loads and shunts contribute fixed known power injections, 2. the remaining bus power is treated as residual, 3. the residual is distributed among source-like injections on the same bus. The result is one per-device seed, which is exactly what the current EMT code is missing when multiple injections share the same bus. :param grid: Network model. :param bus_dict: Bus-to-index lookup. :param injection_devices: Fixed injection list used by the EMT builder. :param sbase: System base power. :return: Preallocated seed store. """ n_buses: int = len(grid.buses) n_injections: int = len(injection_devices) seed_store: EmtInjectionSeedStore = EmtInjectionSeedStore(n_injections) # The target bus powers come from the PF solution and are stored phase by phase. target_ac_power: np.ndarray = np.zeros((n_buses, 4), dtype=np.complex128) fixed_ac_power: np.ndarray = np.zeros((n_buses, 4), dtype=np.complex128) # DC buses use one real power residual only. target_dc_power: np.ndarray = np.zeros(n_buses, dtype=np.float64) fixed_dc_power: np.ndarray = np.zeros(n_buses, dtype=np.float64) # These matrices store which injections must absorb the residual. source_indices_ac: np.ndarray = -np.ones((n_buses, n_injections), dtype=np.int64) source_counts_ac: np.ndarray = np.zeros(n_buses, dtype=np.int64) source_indices_dc: np.ndarray = -np.ones((n_buses, n_injections), dtype=np.int64) source_counts_dc: np.ndarray = np.zeros(n_buses, dtype=np.int64) load_lookup: Dict[str, int] = _build_device_idtag_lookup(grid.loads) shunt_lookup: Dict[str, int] = _build_device_idtag_lookup(grid.shunts) bus_index: int = 0 while bus_index < n_buses: if grid.buses[bus_index].is_dc: if self.power_flow_results is None: target_dc_power[bus_index] = 0.0 else: target_dc_power[bus_index] = float(np.real(self.power_flow_results.Sbus[bus_index] / sbase)) else: target_ac_power[bus_index, 0] = complex(self.power_flow_results_3ph.Sbus_N[bus_index] / sbase) target_ac_power[bus_index, 1] = complex(self.power_flow_results_3ph.Sbus_A[bus_index] / sbase) target_ac_power[bus_index, 2] = complex(self.power_flow_results_3ph.Sbus_B[bus_index] / sbase) target_ac_power[bus_index, 3] = complex(self.power_flow_results_3ph.Sbus_C[bus_index] / sbase) bus_index += 1 injection_index: int = 0 while injection_index < n_injections: injection: Any = injection_devices[injection_index] bus_index = int(bus_dict[injection.bus]) if injection.bus.is_dc: # Loads are fixed DC consumers. All other DC injections are treated as source-like. if injection.idtag in load_lookup: fixed_power_dc: float = -float(injection.P) / sbase fixed_dc_power[bus_index] += fixed_power_dc seed_store.set_dc_seed(injection_index, fixed_power_dc) else: row_index_dc: int = int(source_counts_dc[bus_index]) source_indices_dc[bus_index, row_index_dc] = injection_index source_counts_dc[bus_index] += 1 else: if injection.idtag in load_lookup: fixed_power_ac: np.ndarray = self._get_load_fixed_power_vector_3ph(injection) fixed_ac_power[bus_index, :] += fixed_power_ac seed_store.set_ac_seed(injection_index, fixed_power_ac) else: if injection.idtag in shunt_lookup: fixed_power_ac = self._get_shunt_fixed_power_vector_3ph(injection, bus_index) fixed_ac_power[bus_index, :] += fixed_power_ac seed_store.set_ac_seed(injection_index, fixed_power_ac) else: row_index_ac: int = int(source_counts_ac[bus_index]) source_indices_ac[bus_index, row_index_ac] = injection_index source_counts_ac[bus_index] += 1 injection_index += 1 bus_index = 0 while bus_index < n_buses: if grid.buses[bus_index].is_dc: residual_dc: float = float(target_dc_power[bus_index] - fixed_dc_power[bus_index]) source_count_dc: int = int(source_counts_dc[bus_index]) if source_count_dc == 0: if abs(residual_dc) > 1.0e-9: self.logger.add_warning( msg="DC EMT injection residual could not be assigned.", device="EmtProblemDae", value=f"bus_index={bus_index}, residual={residual_dc}", expected_value="At least one source-like DC injection", device_class="EMT", device_property="injection_seed_store_3ph", ) else: pass else: total_weight_dc: float = 0.0 source_row_index_dc: int = 0 while source_row_index_dc < source_count_dc: injection_index = int(source_indices_dc[bus_index, source_row_index_dc]) total_weight_dc += _get_source_seed_weight(injection_devices[injection_index]) source_row_index_dc += 1 remaining_weight_dc: float = total_weight_dc remaining_residual_dc: float = residual_dc source_row_index_dc = 0 while source_row_index_dc < source_count_dc: injection_index = int(source_indices_dc[bus_index, source_row_index_dc]) if source_row_index_dc == source_count_dc - 1: allocated_dc: float = remaining_residual_dc else: weight_dc: float = _get_source_seed_weight(injection_devices[injection_index]) if remaining_weight_dc > 0.0: share_dc: float = weight_dc / remaining_weight_dc else: share_dc = 1.0 / float(source_count_dc - source_row_index_dc) allocated_dc = remaining_residual_dc * share_dc remaining_residual_dc -= allocated_dc remaining_weight_dc -= weight_dc seed_store.set_dc_seed(injection_index, allocated_dc) source_row_index_dc += 1 else: residual_ac: np.ndarray = target_ac_power[bus_index, :] - fixed_ac_power[bus_index, :] source_count_ac: int = int(source_counts_ac[bus_index]) if source_count_ac == 0: if np.max(np.abs(residual_ac)) > 1.0e-9: self.logger.add_warning( msg="AC EMT injection residual could not be assigned.", device="EmtProblemDae", value=f"bus_index={bus_index}, residual={residual_ac}", expected_value="At least one source-like AC injection", device_class="EMT", device_property="injection_seed_store_3ph", ) else: pass else: total_weight_ac: float = 0.0 source_row_index_ac: int = 0 while source_row_index_ac < source_count_ac: injection_index = int(source_indices_ac[bus_index, source_row_index_ac]) total_weight_ac += _get_source_seed_weight(injection_devices[injection_index]) source_row_index_ac += 1 remaining_weight_ac: float = total_weight_ac remaining_residual_ac: np.ndarray = residual_ac.copy() source_row_index_ac = 0 while source_row_index_ac < source_count_ac: injection_index = int(source_indices_ac[bus_index, source_row_index_ac]) if source_row_index_ac == source_count_ac - 1: allocated_ac: np.ndarray = remaining_residual_ac.copy() else: weight_ac: float = _get_source_seed_weight(injection_devices[injection_index]) if remaining_weight_ac > 0.0: share_ac: float = weight_ac / remaining_weight_ac else: share_ac = 1.0 / float(source_count_ac - source_row_index_ac) allocated_ac = remaining_residual_ac * share_ac remaining_residual_ac = remaining_residual_ac - allocated_ac remaining_weight_ac -= weight_ac seed_store.set_ac_seed(injection_index, allocated_ac) source_row_index_ac += 1 bus_index += 1 return seed_store def _build_injection_seed_store_balanced( self, grid: MultiCircuit, bus_dict: Dict[Any, int], injection_devices: List[Any], sbase: float, ) -> EmtInjectionSeedStore: """ Build the PF seed store for EMT injections using the balanced PF snapshot. The balanced PF gives one three-phase complex bus injection. This function converts it into balanced ABC phase powers and then applies the same residual-allocation logic as the three-phase version. :param grid: Network model. :param bus_dict: Bus-to-index lookup. :param injection_devices: Fixed injection list used by the EMT builder. :param sbase: System base power. :return: Preallocated seed store. """ n_buses: int = len(grid.buses) n_injections: int = len(injection_devices) seed_store: EmtInjectionSeedStore = EmtInjectionSeedStore(n_injections) target_ac_power: np.ndarray = np.zeros((n_buses, 4), dtype=np.complex128) fixed_ac_power: np.ndarray = np.zeros((n_buses, 4), dtype=np.complex128) target_dc_power: np.ndarray = np.zeros(n_buses, dtype=np.float64) fixed_dc_power: np.ndarray = np.zeros(n_buses, dtype=np.float64) source_indices_ac: np.ndarray = -np.ones((n_buses, n_injections), dtype=np.int64) source_counts_ac: np.ndarray = np.zeros(n_buses, dtype=np.int64) source_indices_dc: np.ndarray = -np.ones((n_buses, n_injections), dtype=np.int64) source_counts_dc: np.ndarray = np.zeros(n_buses, dtype=np.int64) load_lookup: Dict[str, int] = _build_device_idtag_lookup(grid.loads) shunt_lookup: Dict[str, int] = _build_device_idtag_lookup(grid.shunts) bus_index: int = 0 while bus_index < n_buses: if grid.buses[bus_index].is_dc: if self.power_flow_results is None: target_dc_power[bus_index] = 0.0 else: target_dc_power[bus_index] = float(np.real(self.power_flow_results.Sbus[bus_index] / sbase)) else: bus_power: complex = complex(self.power_flow_results.Sbus[bus_index] / sbase) phase_power: complex = bus_power / 3.0 target_ac_power[bus_index, 0] = 0.0 + 0.0j target_ac_power[bus_index, 1] = phase_power target_ac_power[bus_index, 2] = phase_power target_ac_power[bus_index, 3] = phase_power bus_index += 1 injection_index: int = 0 while injection_index < n_injections: injection: Any = injection_devices[injection_index] bus_index = int(bus_dict[injection.bus]) if injection.bus.is_dc: if injection.idtag in load_lookup: fixed_power_dc: float = -float(injection.P) / sbase fixed_dc_power[bus_index] += fixed_power_dc seed_store.set_dc_seed(injection_index, fixed_power_dc) else: row_index_dc: int = int(source_counts_dc[bus_index]) source_indices_dc[bus_index, row_index_dc] = injection_index source_counts_dc[bus_index] += 1 else: if injection.idtag in load_lookup: fixed_power_ac: np.ndarray = self._get_load_fixed_power_vector_balanced(injection, sbase) fixed_ac_power[bus_index, :] += fixed_power_ac seed_store.set_ac_seed(injection_index, fixed_power_ac) else: if injection.idtag in shunt_lookup: fixed_power_ac = self._get_shunt_fixed_power_vector_balanced(injection, bus_index, sbase) fixed_ac_power[bus_index, :] += fixed_power_ac seed_store.set_ac_seed(injection_index, fixed_power_ac) else: row_index_ac: int = int(source_counts_ac[bus_index]) source_indices_ac[bus_index, row_index_ac] = injection_index source_counts_ac[bus_index] += 1 injection_index += 1 bus_index = 0 while bus_index < n_buses: if grid.buses[bus_index].is_dc: residual_dc: float = float(target_dc_power[bus_index] - fixed_dc_power[bus_index]) source_count_dc: int = int(source_counts_dc[bus_index]) if source_count_dc == 0: if abs(residual_dc) > 1.0e-9: self.logger.add_warning( msg="Balanced DC EMT injection residual could not be assigned.", device="EmtProblemDae", value=f"bus_index={bus_index}, residual={residual_dc}", expected_value="At least one source-like DC injection", device_class="EMT", device_property="injection_seed_store_balanced", ) else: pass else: total_weight_dc: float = 0.0 source_row_index_dc: int = 0 while source_row_index_dc < source_count_dc: injection_index = int(source_indices_dc[bus_index, source_row_index_dc]) total_weight_dc += _get_source_seed_weight(injection_devices[injection_index]) source_row_index_dc += 1 remaining_weight_dc: float = total_weight_dc remaining_residual_dc: float = residual_dc source_row_index_dc = 0 while source_row_index_dc < source_count_dc: injection_index = int(source_indices_dc[bus_index, source_row_index_dc]) if source_row_index_dc == source_count_dc - 1: allocated_dc: float = remaining_residual_dc else: weight_dc: float = _get_source_seed_weight(injection_devices[injection_index]) if remaining_weight_dc > 0.0: share_dc: float = weight_dc / remaining_weight_dc else: share_dc = 1.0 / float(source_count_dc - source_row_index_dc) allocated_dc = remaining_residual_dc * share_dc remaining_residual_dc -= allocated_dc remaining_weight_dc -= weight_dc seed_store.set_dc_seed(injection_index, allocated_dc) source_row_index_dc += 1 else: residual_ac: np.ndarray = target_ac_power[bus_index, :] - fixed_ac_power[bus_index, :] source_count_ac: int = int(source_counts_ac[bus_index]) if source_count_ac == 0: if np.max(np.abs(residual_ac)) > 1.0e-9: self.logger.add_warning( msg="Balanced AC EMT injection residual could not be assigned.", device="EmtProblemDae", value=f"bus_index={bus_index}, residual={residual_ac}", expected_value="At least one source-like AC injection", device_class="EMT", device_property="injection_seed_store_balanced", ) else: pass else: total_weight_ac: float = 0.0 source_row_index_ac: int = 0 while source_row_index_ac < source_count_ac: injection_index = int(source_indices_ac[bus_index, source_row_index_ac]) total_weight_ac += _get_source_seed_weight(injection_devices[injection_index]) source_row_index_ac += 1 remaining_weight_ac: float = total_weight_ac remaining_residual_ac: np.ndarray = residual_ac.copy() source_row_index_ac = 0 while source_row_index_ac < source_count_ac: injection_index = int(source_indices_ac[bus_index, source_row_index_ac]) if source_row_index_ac == source_count_ac - 1: allocated_ac: np.ndarray = remaining_residual_ac.copy() else: weight_ac: float = _get_source_seed_weight(injection_devices[injection_index]) if remaining_weight_ac > 0.0: share_ac: float = weight_ac / remaining_weight_ac else: share_ac = 1.0 / float(source_count_ac - source_row_index_ac) allocated_ac = remaining_residual_ac * share_ac remaining_residual_ac = remaining_residual_ac - allocated_ac remaining_weight_ac -= weight_ac seed_store.set_ac_seed(injection_index, allocated_ac) source_row_index_ac += 1 bus_index += 1 return seed_store def _apply_injection_seed( self, injection: Any, mdl: Block, bus_index: int, seed_store: EmtInjectionSeedStore, injection_index: int, generator_count_same_bus: int, ) -> None: """ Apply the precomputed PF seed of one injection into the EMT model. This replaces the old logic that initialized each injection directly from ``Sbus`` of the bus. That old logic is only valid when there is exactly one injection device on the bus. :param injection: Injection device. :param mdl: Injection symbolic block. :param bus_index: Bus index. :param seed_store: Precomputed seed store. :param injection_index: Injection position in the fixed injection list. :return: None. """ if seed_store.has_seed(injection_index): pass else: return if injection.bus.is_dc: if self.power_flow_results is None: pass else: power_dc: float = seed_store.get_dc_seed(injection_index) voltage_dc: float = float(np.abs(self.power_flow_results.voltage[bus_index])) self.set_init_guess( mdl=mdl, reference_powerflow=VarPowerFlowReferenceType.P, val=power_dc, ) if abs(voltage_dc) > 1.0e-12: self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=power_dc / voltage_dc, ) else: self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=0.0, ) else: phase_power: np.ndarray = seed_store.get_ac_seed(injection_index) phase_current_vector: np.ndarray = np.zeros(4, dtype=np.complex128) preserve_generator_current_seed: bool if self.power_flow_results_3ph is None: voltage_vector: np.ndarray = self._get_bus_voltage_vector_balanced(bus_index) else: voltage_vector = self._get_bus_voltage_vector_3ph(bus_index) if injection.device_type == DeviceType.GeneratorDevice and generator_count_same_bus <= 1: preserve_generator_current_seed = False else: preserve_generator_current_seed = True phase_index: int = 0 p_total: float = 0.0 q_total: float = 0.0 current_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.i_N, VarPowerFlowReferenceType.i_A, VarPowerFlowReferenceType.i_B, VarPowerFlowReferenceType.i_C, ]) active_power_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.P_N, VarPowerFlowReferenceType.P_A, VarPowerFlowReferenceType.P_B, VarPowerFlowReferenceType.P_C, ]) reactive_power_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.Q_N, VarPowerFlowReferenceType.Q_A, VarPowerFlowReferenceType.Q_B, VarPowerFlowReferenceType.Q_C, ]) while phase_index < 4: phase_voltage: complex = complex(voltage_vector[phase_index]) phase_complex_power: complex = complex(phase_power[phase_index]) if abs(phase_voltage) > 1.0e-12: phase_current: complex = np.conj(phase_complex_power / phase_voltage) else: phase_current = 0.0 + 0.0j phase_current_vector[phase_index] = phase_current current_instantaneous: float = float(np.sqrt(2.0) * np.imag(phase_current)) self.set_if_exists( mdl=mdl, key=current_keys[phase_index], value=current_instantaneous, persist_after_native_init=preserve_generator_current_seed, ) self.set_if_exists( mdl=mdl, key=active_power_keys[phase_index], value=float(np.real(phase_complex_power)), ) self.set_if_exists( mdl=mdl, key=reactive_power_keys[phase_index], value=float(np.imag(phase_complex_power)), ) p_total += float(np.real(phase_complex_power)) q_total += float(np.imag(phase_complex_power)) phase_index += 1 if injection.device_type == DeviceType.GeneratorDevice: self._set_vsc_pf_positive_sequence( mdl=mdl, VA=complex(voltage_vector[1]), VB=complex(voltage_vector[2]), VC=complex(voltage_vector[3]), IA=complex(phase_current_vector[1]), IB=complex(phase_current_vector[2]), IC=complex(phase_current_vector[3]), ) else: pass neutral_var = _get_external_mapping_var_if_present( mdl=mdl, key=VarPowerFlowReferenceType.i_N, ) if neutral_var is not None: ia_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.i_A) ib_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.i_B) ic_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.i_C) if ia_var is None and ib_var is None and ic_var is None: pass else: ia_inst = self._temp_init_guess.get(ia_var.uid, 0.0) if ia_var is not None else 0.0 ib_inst = self._temp_init_guess.get(ib_var.uid, 0.0) if ib_var is not None else 0.0 ic_inst = self._temp_init_guess.get(ic_var.uid, 0.0) if ic_var is not None else 0.0 neutral_inst = -(ia_inst + ib_inst + ic_inst) self._temp_init_guess[neutral_var.uid] = float(neutral_inst) self._temp_post_init_guess[neutral_var.uid] = float(neutral_inst) else: pass self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.P, value=p_total) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Q, value=q_total) # The Thevenin generator templates reconstruct an internal balanced emf # from the PF seed. Those internal variables are not externally mapped, # so they must be preserved explicitly across the later native-init pass. if injection.device_type == DeviceType.GeneratorDevice and generator_count_same_bus > 1: self._seed_thevenin_internal_emf_from_pf( injection=injection, mdl=mdl, phase_power=phase_power, voltage_vector=voltage_vector, ) else: pass omega_base: float = 2.0 * np.pi * self.grid.fBase voltage_derivative_keys: List[VarPowerFlowReferenceType] = list([ VarPowerFlowReferenceType.d_v_N, VarPowerFlowReferenceType.d_v_A, VarPowerFlowReferenceType.d_v_B, VarPowerFlowReferenceType.d_v_C, ]) phase_index = 0 while phase_index < 4: derivative_key: VarPowerFlowReferenceType = voltage_derivative_keys[phase_index] derivative_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=derivative_key) if derivative_var is not None: phase_voltage = complex(voltage_vector[phase_index]) derivative_value: float = omega_base * np.sqrt(2.0) * float(np.real(phase_voltage)) self.set_external_param(mdl, derivative_key, derivative_value) else: pass phase_index += 1 def _seed_thevenin_internal_emf_from_pf( self, injection: Any, mdl: Block, phase_power: np.ndarray, voltage_vector: np.ndarray, ) -> None: """ Preserve the PF-consistent internal emf initialization of Thevenin generators. The Thevenin templates expose ``theta`` and ``e_A/e_B/e_C`` only as internal symbolic variables. The generic native EMT initialization can overwrite those exact explicit-init values with weaker guesses because they are not part of the external PF mapping. This helper reconstructs the same balanced emf from the already available PF seed and stores it as a post-native-init value. The helper is intentionally no-op for non-Thevenin generator models because the required internal variable names will simply be absent. :param injection: Generator device under initialization. :param mdl: Working EMT model of the generator. :param phase_power: Complex power seed in NABC order. :param voltage_vector: Complex bus-voltage seed in NABC order. :return: None. """ c_sqrt_2: float = float(np.sqrt(2.0)) phase_a: complex = complex(voltage_vector[1]) phase_b: complex = complex(voltage_vector[2]) phase_c: complex = complex(voltage_vector[3]) power_a: complex = complex(phase_power[1]) power_b: complex = complex(phase_power[2]) power_c: complex = complex(phase_power[3]) a_operator: complex = np.exp(1j * 2.0 * np.pi / 3.0) current_a: complex current_b: complex current_c: complex if abs(phase_a) > 1.0e-12: current_a = np.conj(power_a / phase_a) else: current_a = 0.0 + 0.0j if abs(phase_b) > 1.0e-12: current_b = np.conj(power_b / phase_b) else: current_b = 0.0 + 0.0j if abs(phase_c) > 1.0e-12: current_c = np.conj(power_c / phase_c) else: current_c = 0.0 + 0.0j # The positive-sequence phasors define the same operating point used by the # explicit-init equations inside the Thevenin template. voltage_positive_sequence: complex = ( phase_a + a_operator * phase_b + (a_operator * a_operator) * phase_c ) / 3.0 current_positive_sequence: complex = ( current_a + a_operator * current_b + (a_operator * a_operator) * current_c ) / 3.0 phi_v: float = float(np.angle(voltage_positive_sequence)) phi_i: float = float(np.angle(current_positive_sequence)) phi_rel_raw: float = float(phi_i - phi_v) phi_rel: float = float(np.arctan2(np.sin(phi_rel_raw), np.cos(phi_rel_raw))) voltage_peak: float = float(c_sqrt_2 * np.abs(voltage_positive_sequence)) current_peak: float = float(c_sqrt_2 * np.abs(current_positive_sequence)) resistance_s: float = float(injection.R1) reactance_s: float = float(injection.X1) # The internal emf is built in the local frame first and then rotated back # to the absolute electrical angle used by the abc sinusoidal source. e_re_rel: float = float( voltage_peak + resistance_s * current_peak * np.cos(phi_rel) - reactance_s * current_peak * np.sin(phi_rel) ) e_im_rel: float = float( resistance_s * current_peak * np.sin(phi_rel) + reactance_s * current_peak * np.cos(phi_rel) ) theta_abs: float = float(phi_v + np.arctan2(e_im_rel, e_re_rel)) emf_peak: float = float(np.sqrt(e_re_rel * e_re_rel + e_im_rel * e_im_rel)) e_a_value: float = float(emf_peak * np.sin(theta_abs)) e_b_value: float = float(emf_peak * np.sin(theta_abs - 2.0 * np.pi / 3.0)) e_c_value: float = float(emf_peak * np.sin(theta_abs + 2.0 * np.pi / 3.0)) # The recent Thevenin refactor moved the internal emf reconstruction to # algebraic equations driven by init-only runtime parameters. Explicit # initialization resolves those parameters correctly, but the later # event-group activation rebuilds the runtime-parameter vector from the # model-owned ``event_dict`` definitions. Persisting the PF-derived # positive-sequence values here keeps the first simulation step aligned # with the exact steady-state source used by explicit initialization. self.set_internal_runtime_if_exists(mdl=mdl, var_name="phi_v_" + mdl.name, value=phi_v) self.set_internal_runtime_if_exists(mdl=mdl, var_name="phi_" + mdl.name, value=phi_rel) self.set_internal_runtime_if_exists(mdl=mdl, var_name="Vpk_" + mdl.name, value=voltage_peak) self.set_internal_runtime_if_exists(mdl=mdl, var_name="Ipk_" + mdl.name, value=current_peak) theta_var: Optional[Var] = _find_internal_var_by_prefix(mdl, "theta_") e_a_var: Optional[Var] = _find_internal_var_by_prefix(mdl, "e_A_") e_b_var: Optional[Var] = _find_internal_var_by_prefix(mdl, "e_B_") e_c_var: Optional[Var] = _find_internal_var_by_prefix(mdl, "e_C_") # The preserved guesses are written directly into both init dictionaries so # the later native-init pass starts from the PF-consistent emf and the final # post-native state keeps the same values. if theta_var is not None: self._temp_init_guess[theta_var.uid] = theta_abs self._temp_post_init_guess[theta_var.uid] = theta_abs else: pass if e_a_var is not None: self._temp_init_guess[e_a_var.uid] = e_a_value self._temp_post_init_guess[e_a_var.uid] = e_a_value else: pass if e_b_var is not None: self._temp_init_guess[e_b_var.uid] = e_b_value self._temp_post_init_guess[e_b_var.uid] = e_b_value else: pass if e_c_var is not None: self._temp_init_guess[e_c_var.uid] = e_c_value self._temp_post_init_guess[e_c_var.uid] = e_c_value else: pass def _try_set_bus_pf_init(self, bus: Bus, mdl: Block, bus_index: int) -> None: """ Initialize bus voltage variables from three-phase power-flow results. Phasor results are transformed into instantaneous abc-domain values at t = 0. Only variables that exist in the model external mapping are initialized. :param bus: Bus device :param mdl: Bus symbolic block. :param bus_index: Bus index in the power-flow results. :return: None """ omega_base: float = 2.0 * np.pi * self.grid.fBase if bus.is_dc: if self.power_flow_results is not None: # DC bus: use Vdc (magnitude) -> angle is not applicable for DC self.set_init_guess_and_preserve_post_init( mdl, VarPowerFlowReferenceType.Vdc, float(np.abs(self.power_flow_results.voltage[bus_index])) ) else: raise ValueError(f"EMT simulation with DC branches requires the non unbalanced PowerFlow to be calculated.") else: if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_N) is not None: V_N = self.power_flow_results_3ph.voltage_N[bus_index] v_N: float = np.sqrt(2.0) * np.imag(V_N) # d_v_N: float = omega_base * np.sqrt(2.0) * np.real(V_N) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_N, v_N) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_N, d_v_N) else: pass if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_A) is not None: V_A = self.power_flow_results_3ph.voltage_A[bus_index] v_A: float = np.sqrt(2.0) * np.imag(V_A) # d_v_A: float = omega_base * np.sqrt(2.0) * np.real(V_A) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_A, v_A) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_A, d_v_A) else: pass if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_B) is not None: V_B = self.power_flow_results_3ph.voltage_B[bus_index] v_B: float = np.sqrt(2.0) * np.imag(V_B) # d_v_B: float = omega_base * np.sqrt(2.0) * np.real(V_B) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_B, v_B) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_B, d_v_B) else: pass if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_C) is not None: V_C = self.power_flow_results_3ph.voltage_C[bus_index] v_C: float = np.sqrt(2.0) * np.imag(V_C) # d_v_C: float = omega_base * np.sqrt(2.0) * np.real(V_C) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_C, v_C) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_C, d_v_C) else: pass def _try_set_bus_pf_init_balanced(self, bus: Bus, mdl: Block, bus_index: int) -> None: """ Initialize bus voltage variables from balanced power-flow results. The balanced power flow provides one complex bus-voltage phasor per AC bus in self.power_flow_results.voltage. This phasor is interpreted as the balanced phase-A phasor, and the remaining phases are reconstructed as: V_A = V V_B = V * exp(-j*2*pi/3) V_C = V * exp(+j*2*pi/3) V_N = 0 Phasor results are transformed into instantaneous abc-domain values at t = 0. Only variables that exist in the model external mapping are initialized. :param bus: Bus device :param mdl: Bus symbolic block. :param bus_index: Bus index in the power-flow results. :return: None """ omega_base: float = 2.0 * np.pi * self.grid.fBase if bus.is_dc: if self.power_flow_results is not None: # DC bus: use Vdc (magnitude) -> angle is not applicable for DC self.set_init_guess_and_preserve_post_init( mdl, VarPowerFlowReferenceType.Vdc, float(np.abs(self.power_flow_results.voltage[bus_index])) ) else: raise ValueError( "EMT simulation with DC branches requires the non unbalanced PowerFlow to be calculated." ) else: if self.power_flow_results is None: raise ValueError( "Balanced EMT initialization requires the balanced PowerFlow to be calculated." ) alpha: float = 2.0 * np.pi / 3.0 V_bus: CxVec = self.power_flow_results.voltage[bus_index] V_N: complex = 0.0 + 0.0j V_A: CxVec = V_bus V_B: complex = V_bus * np.exp(-1j * alpha) V_C: complex = V_bus * np.exp(+1j * alpha) if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_N) is not None: v_N: float = np.sqrt(2.0) * np.imag(V_N) # d_v_N: float = omega_base * np.sqrt(2.0) * np.real(V_N) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_N, v_N) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_N, d_v_N) if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_A) is not None: v_A: float = np.sqrt(2.0) * np.imag(V_A) # d_v_A: float = omega_base * np.sqrt(2.0) * np.real(V_A) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_A, v_A) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_A, d_v_A) if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_B) is not None: v_B: float = np.sqrt(2.0) * np.imag(V_B) # d_v_B: float = omega_base * np.sqrt(2.0) * np.real(V_B) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_B, v_B) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_B, d_v_B) if _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.v_C) is not None: v_C: float = np.sqrt(2.0) * np.imag(V_C) # d_v_C: float = omega_base * np.sqrt(2.0) * np.real(V_C) self.set_init_guess_and_preserve_post_init(mdl, VarPowerFlowReferenceType.v_C, v_C) # self.set_diff_init_guess(mdl, VarPowerFlowReferenceType.d_v_C, d_v_C) def _get_vsc_terminal_indices(self, f_bus_idx: int, t_bus_idx: int) -> Tuple[int, int, bool]: """ Return ``(ac_bus_idx, dc_bus_idx, ac_is_from)`` for a VSC branch. The preferred detection rule is based on bus types. If the branch does not present the expected AC/DC split, fall back to the legacy VSC orientation assumption used by the PF results: from side is DC and to side is AC. """ f_bus = self.grid.buses[f_bus_idx] t_bus = self.grid.buses[t_bus_idx] if not f_bus.is_dc and t_bus.is_dc: return f_bus_idx, t_bus_idx, True if f_bus.is_dc and not t_bus.is_dc: return t_bus_idx, f_bus_idx, False return t_bus_idx, f_bus_idx, False def _set_vsc_pf_positive_sequence(self, mdl: Block, VA: complex, VB: complex, VC: complex, IA: complex, IB: complex, IC: complex) -> None: """ Populate positive-sequence PF-derived quantities used by VSC templates when they expose them in the external mapping. """ a = np.exp(1j * 2.0 * np.pi / 3.0) V1 = (VA + a * VB + (a * a) * VC) / 3.0 I1 = (IA + a * IB + (a * a) * IC) / 3.0 phi_V = float(np.angle(V1)) phi_I = float(np.angle(I1)) ang = phi_I - phi_V phi = float(np.arctan2(np.sin(ang), np.cos(ang))) external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if external_mapping is None: return if VarPowerFlowReferenceType.phi_v in external_mapping: self.set_external_param(mdl, VarPowerFlowReferenceType.phi_v, phi_V) if VarPowerFlowReferenceType.phi in external_mapping: self.set_external_param(mdl, VarPowerFlowReferenceType.phi, phi) if VarPowerFlowReferenceType.Vpk in external_mapping: self.set_external_param(mdl, VarPowerFlowReferenceType.Vpk, float(np.sqrt(2.0) * np.abs(V1))) if VarPowerFlowReferenceType.Ipk in external_mapping: self.set_external_param(mdl, VarPowerFlowReferenceType.Ipk, float(np.sqrt(2.0) * np.abs(I1))) def _try_set_vsc_branch_pf_init(self, mdl: Block, f_bus_idx: int, t_bus_idx: int, sbase: float, vsc_index: int) -> None: """ Initialize a VSC branch from three-phase PF results using the converter external mapping as the source of truth. """ pf_results_3ph = self.power_flow_results_3ph if pf_results_3ph is None: return ac_bus_idx, dc_bus_idx, ac_is_from = self._get_vsc_terminal_indices( f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, ) VA = pf_results_3ph.voltage_A[ac_bus_idx] VB = pf_results_3ph.voltage_B[ac_bus_idx] VC = pf_results_3ph.voltage_C[ac_bus_idx] VN = 0.0 + 0.0j SA = pf_results_3ph.St_vsc_A[vsc_index] / sbase SB = pf_results_3ph.St_vsc_B[vsc_index] / sbase SC = pf_results_3ph.St_vsc_C[vsc_index] / sbase if abs(SA) + abs(SB) + abs(SC) < 1e-12 and self.power_flow_results is not None: # The 3-ph VSC result container may not populate per-phase converter powers yet. # In that case we fall back to the balanced PF operating point and distribute the # total converter power equally across the three phases for initialization only. S_ac_total_balanced: complex = _get_balanced_vsc_ac_power(self.power_flow_results, vsc_index) / sbase SA = S_ac_total_balanced / 3.0 SB = S_ac_total_balanced / 3.0 SC = S_ac_total_balanced / 3.0 else: pass IA = 0.0 + 0.0j if abs(VA) <= 1e-12 else np.conj(SA / VA) IB = 0.0 + 0.0j if abs(VB) <= 1e-12 else np.conj(SB / VB) IC = 0.0 + 0.0j if abs(VC) <= 1e-12 else np.conj(SC / VC) pf_results_3ph_pfn_vsc: float = float(pf_results_3ph.Pfp_vsc[vsc_index]) S_ac_total = SA + SB + SC if abs(S_ac_total) < 1e-12 and self.power_flow_results is not None: S_dc_total = _get_balanced_vsc_dc_power(self.power_flow_results, vsc_index) / sbase else: S_dc_total = complex( ( pf_results_3ph.Pfp_vsc[vsc_index] + pf_results_3ph_pfn_vsc ) / sbase, 0.0, ) phase_voltage_dict: Dict[str, complex] = { "N": VN, "A": VA, "B": VB, "C": VC, } phase_current_dict: Dict[str, complex] = { "N": 0.0 + 0.0j, "A": IA, "B": IB, "C": IC, } phase_power_dict: Dict[str, complex] = { "N": 0.0 + 0.0j, "A": SA, "B": SB, "C": SC, } ac_voltage_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.v_N, "A": VarPowerFlowReferenceType.v_A, "B": VarPowerFlowReferenceType.v_B, "C": VarPowerFlowReferenceType.v_C, } ac_current_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.i_N, "A": VarPowerFlowReferenceType.i_A, "B": VarPowerFlowReferenceType.i_B, "C": VarPowerFlowReferenceType.i_C, } if_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C, } it_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C, } sf_keys: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.Sf_A, "B": VarPowerFlowReferenceType.Sf_B, "C": VarPowerFlowReferenceType.Sf_C, } st_keys: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.St_A, "B": VarPowerFlowReferenceType.St_B, "C": VarPowerFlowReferenceType.St_C, } d_vf_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_f, "A": VarPowerFlowReferenceType.d_v_A_f, "B": VarPowerFlowReferenceType.d_v_B_f, "C": VarPowerFlowReferenceType.d_v_C_f, } d_vt_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_t, "A": VarPowerFlowReferenceType.d_v_A_t, "B": VarPowerFlowReferenceType.d_v_B_t, "C": VarPowerFlowReferenceType.d_v_C_t, } omega_base: float = 2.0 * np.pi * self.grid.fBase # ``I_ph = conj(S/V)`` already yields the branch-convention current # from bus into branch. The EMT KCL also uses branch # convention for VSC branches (``I_kcl -= i_branch``), so the # seeded value is taken directly without any sign flip, just like # with other branches for ph in ["N", "A", "B", "C"]: V_ph = phase_voltage_dict[ph] I_ph = phase_current_dict[ph] S_ph = phase_power_dict[ph] # Store it once and reuse i_branch_init: float = float(np.sqrt(2.0) * np.imag(I_ph)) self.set_if_exists(mdl=mdl, key=ac_voltage_keys[ph], value=float(np.sqrt(2.0) * np.imag(V_ph))) self.set_if_exists(mdl=mdl, key=ac_current_keys[ph], value=i_branch_init) if ac_is_from: self.set_if_exists(mdl=mdl, key=if_keys[ph], value=i_branch_init) self.set_if_exists(mdl=mdl, key=it_keys[ph], value=0.0) self.set_external_param(mdl, d_vf_keys[ph], omega_base * np.sqrt(2.0) * np.real(V_ph)) self.set_external_param(mdl, d_vt_keys[ph], 0.0) else: self.set_if_exists(mdl=mdl, key=if_keys[ph], value=0.0) self.set_if_exists(mdl=mdl, key=it_keys[ph], value=i_branch_init) self.set_external_param(mdl, d_vf_keys[ph], 0.0) self.set_external_param(mdl, d_vt_keys[ph], omega_base * np.sqrt(2.0) * np.real(V_ph)) sf_key = sf_keys[ph] st_key = st_keys[ph] if sf_key is not None: self.set_if_exists(mdl=mdl, key=sf_key, value=float(np.real(S_ph if ac_is_from else 0.0 + 0.0j))) if st_key is not None: self.set_if_exists(mdl=mdl, key=st_key, value=float(np.real(0.0 + 0.0j if ac_is_from else S_ph))) self._set_vsc_pf_positive_sequence( mdl=mdl, VA=complex(VA), VB=complex(VB), VC=complex(VC), IA=IA, IB=IB, IC=IC, ) if self.power_flow_results is not None: v_dc = float(np.abs(self.power_flow_results.voltage[dc_bus_idx])) self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Vdc, value=v_dc, persist_after_native_init=True, ) if abs(v_dc) > 1e-12: self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=float(np.real(S_dc_total) / v_dc), persist_after_native_init=True) else: self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=0.0, persist_after_native_init=True, ) self._seed_switched_converter_internal_init_balanced( mdl=mdl, VA=complex(VA), VB=complex(VB), VC=complex(VC), IA=IA, IB=IB, IC=IC, v_dc=v_dc, ) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.P, value=float(np.real(S_ac_total))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Q, value=float(np.imag(S_ac_total))) S_from = S_ac_total if ac_is_from else S_dc_total S_to = S_dc_total if ac_is_from else S_ac_total self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(S_from))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(S_from))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(S_to))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(S_to))) def _try_set_vsc_branch_pf_init_balanced(self, mdl: Block, f_bus_idx: int, t_bus_idx: int, sbase: float, vsc_index: int) -> None: """ Initialize a VSC branch from balanced PF results using the converter external mapping as the source of truth. """ pf_results = self.power_flow_results if pf_results is None: return ac_bus_idx, dc_bus_idx, ac_is_from = self._get_vsc_terminal_indices( f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, ) alpha = 2.0 * np.pi / 3.0 V_bus = pf_results.voltage[ac_bus_idx] VA = V_bus VB = V_bus * np.exp(-1j * alpha) VC = V_bus * np.exp(+1j * alpha) VN = 0.0 + 0.0j pf_results_pfn_vsc: float = float(pf_results.Pfn_vsc[vsc_index]) S_ac_total = _get_balanced_vsc_ac_power(pf_results, vsc_index) / sbase S_dc_total = _get_balanced_vsc_dc_power(pf_results, vsc_index) / sbase SA = S_ac_total / 3.0 SB = S_ac_total / 3.0 SC = S_ac_total / 3.0 IA = 0.0 + 0.0j if abs(VA) <= 1e-12 else np.conj(SA / VA) IB = 0.0 + 0.0j if abs(VB) <= 1e-12 else np.conj(SB / VB) IC = 0.0 + 0.0j if abs(VC) <= 1e-12 else np.conj(SC / VC) phase_voltage_dict: Dict[str, complex] = { "N": VN, "A": VA, "B": VB, "C": VC, } phase_current_dict: Dict[str, complex] = { "N": 0.0 + 0.0j, "A": IA, "B": IB, "C": IC, } phase_power_dict: Dict[str, complex] = { "N": 0.0 + 0.0j, "A": SA, "B": SB, "C": SC, } ac_voltage_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.v_N, "A": VarPowerFlowReferenceType.v_A, "B": VarPowerFlowReferenceType.v_B, "C": VarPowerFlowReferenceType.v_C, } ac_current_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.i_N, "A": VarPowerFlowReferenceType.i_A, "B": VarPowerFlowReferenceType.i_B, "C": VarPowerFlowReferenceType.i_C, } if_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C, } it_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C, } sf_keys: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.Sf_A, "B": VarPowerFlowReferenceType.Sf_B, "C": VarPowerFlowReferenceType.Sf_C, } st_keys: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.St_A, "B": VarPowerFlowReferenceType.St_B, "C": VarPowerFlowReferenceType.St_C, } d_vf_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_f, "A": VarPowerFlowReferenceType.d_v_A_f, "B": VarPowerFlowReferenceType.d_v_B_f, "C": VarPowerFlowReferenceType.d_v_C_f, } d_vt_keys: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_t, "A": VarPowerFlowReferenceType.d_v_A_t, "B": VarPowerFlowReferenceType.d_v_B_t, "C": VarPowerFlowReferenceType.d_v_C_t, } omega_base: float = 2.0 * np.pi * self.grid.fBase for ph in ["N", "A", "B", "C"]: V_ph = phase_voltage_dict[ph] I_ph = phase_current_dict[ph] S_ph = phase_power_dict[ph] # ``I_ph = conj(S/V)`` is already in branch convention # Hence no sign change needed i_branch_init: float = float(np.sqrt(2.0) * np.imag(I_ph)) self.set_if_exists(mdl=mdl, key=ac_voltage_keys[ph], value=float(np.sqrt(2.0) * np.imag(V_ph))) self.set_if_exists(mdl=mdl, key=ac_current_keys[ph], value=i_branch_init) if ac_is_from: self.set_if_exists(mdl=mdl, key=if_keys[ph], value=i_branch_init) self.set_if_exists(mdl=mdl, key=it_keys[ph], value=0.0) self.set_external_param(mdl, d_vf_keys[ph], omega_base * np.sqrt(2.0) * np.real(V_ph)) self.set_external_param(mdl, d_vt_keys[ph], 0.0) else: self.set_if_exists(mdl=mdl, key=if_keys[ph], value=0.0) self.set_if_exists(mdl=mdl, key=it_keys[ph], value=i_branch_init) self.set_external_param(mdl, d_vf_keys[ph], 0.0) self.set_external_param(mdl, d_vt_keys[ph], omega_base * np.sqrt(2.0) * np.real(V_ph)) sf_key = sf_keys[ph] st_key = st_keys[ph] if sf_key is not None: self.set_if_exists(mdl=mdl, key=sf_key, value=float(np.real(S_ph if ac_is_from else 0.0 + 0.0j))) if st_key is not None: self.set_if_exists(mdl=mdl, key=st_key, value=float(np.real(0.0 + 0.0j if ac_is_from else S_ph))) self._set_vsc_pf_positive_sequence( mdl=mdl, VA=complex(VA), VB=complex(VB), VC=complex(VC), IA=IA, IB=IB, IC=IC, ) v_dc = float(np.abs(pf_results.voltage[dc_bus_idx])) self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Vdc, value=v_dc, persist_after_native_init=True, ) if abs(v_dc) > 1e-12: self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=float(np.real(S_dc_total) / v_dc), persist_after_native_init=True) else: self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=0.0, persist_after_native_init=True, ) self._seed_switched_converter_internal_init_balanced( mdl=mdl, VA=complex(VA), VB=complex(VB), VC=complex(VC), IA=IA, IB=IB, IC=IC, v_dc=v_dc, ) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.P, value=float(np.real(S_ac_total))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Q, value=float(np.imag(S_ac_total))) S_from = S_ac_total if ac_is_from else S_dc_total S_to = S_dc_total if ac_is_from else S_ac_total self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(S_from))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(S_from))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(S_to))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(S_to))) def _try_set_dc_branch_pf_init( self, mdl: Block, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float, ) -> None: """ Initialize a two-terminal DC branch from the balanced PF snapshot. The DC EMT branch contract uses explicit terminal voltages ``Vf_dc`` / ``Vt_dc`` and terminal currents ``If_dc`` / ``It_dc`` so that generic branch models such as valves or DC lines can derive their initial state from both connected buses. """ pf_results = self.power_flow_results if pf_results is None: return eps: float = 1.0e-12 preserve_dc_current_seed: bool = ( _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.DcPathModeSeed) is None ) try: v_f: float = float(np.abs(pf_results.voltage[f_bus_idx])) except (AttributeError, IndexError, TypeError, ValueError): v_f = 0.0 try: v_t: float = float(np.abs(pf_results.voltage[t_bus_idx])) except (AttributeError, IndexError, TypeError, ValueError): v_t = 0.0 try: s_f: complex = complex(pf_results.Sf[branch_index]) / sbase except (AttributeError, IndexError, TypeError, ValueError): s_f = 0.0 + 0.0j try: s_t: complex = complex(pf_results.St[branch_index]) / sbase except (AttributeError, IndexError, TypeError, ValueError): s_t = 0.0 + 0.0j i_f: float = float(np.real(s_f) / v_f) if abs(v_f) > eps else 0.0 i_t: float = float(np.real(s_t) / v_t) if abs(v_t) > eps else 0.0 if abs(i_f) > eps: path_mode_seed: float = 1.0 if i_f >= 0.0 else -1.0 else: if (v_f - v_t) > eps: path_mode_seed = 1.0 else: if (v_f - v_t) < -eps: path_mode_seed = -1.0 else: path_mode_seed = 0.0 self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Vf_dc, value=v_f, persist_after_native_init=True) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Vt_dc, value=v_t, persist_after_native_init=True) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.DcPathModeSeed, value=path_mode_seed) self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.If_dc, value=i_f, persist_after_native_init=preserve_dc_current_seed, ) self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.It_dc, value=i_t, persist_after_native_init=preserve_dc_current_seed, ) self.set_if_exists( mdl=mdl, key=VarPowerFlowReferenceType.Idc, value=i_f, persist_after_native_init=preserve_dc_current_seed, ) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(s_f))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(s_f))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(s_t))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(s_t))) def _try_set_branch_pf_init( self, mdl: Block, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float, is_vsc: bool = False, vsc_index: int = -1 ) -> None: """ Initialize branch variables from three-phase power-flow results. The method initializes: - per-phase branch powers Sf_* and St_* when available, - aggregated terminal powers Pf/Qf/Pt/Qt, - per-phase instantaneous terminal currents if_* and it_* at t = 0. The neutral phase is initialized to zero for currents if the corresponding PF quantities are not available. :param mdl: Branch symbolic block. :param branch_index: Branch index in the power-flow results. :param f_bus_idx: From-bus index in the power-flow results. :param t_bus_idx: To-bus index in the power-flow results. :param sbase: Base power of the grid. :return: None """ if self.grid.buses[f_bus_idx].is_dc or self.grid.buses[t_bus_idx].is_dc: self._try_set_dc_branch_pf_init( mdl=mdl, branch_index=branch_index, f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, sbase=sbase, ) return phases: List[str] = ["N", "A", "B", "C"] sf_key_dict: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.Sf_A, "B": VarPowerFlowReferenceType.Sf_B, "C": VarPowerFlowReferenceType.Sf_C, } st_key_dict: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.St_A, "B": VarPowerFlowReferenceType.St_B, "C": VarPowerFlowReferenceType.St_C, } if_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C, } it_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C, } d_vf_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_f, "A": VarPowerFlowReferenceType.d_v_A_f, "B": VarPowerFlowReferenceType.d_v_B_f, "C": VarPowerFlowReferenceType.d_v_C_f, } d_vt_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_t, "A": VarPowerFlowReferenceType.d_v_A_t, "B": VarPowerFlowReferenceType.d_v_B_t, "C": VarPowerFlowReferenceType.d_v_C_t, } voltage_from_dict = { "N": self.power_flow_results_3ph.voltage_N, "A": self.power_flow_results_3ph.voltage_A, "B": self.power_flow_results_3ph.voltage_B, "C": self.power_flow_results_3ph.voltage_C, } voltage_to_dict = voltage_from_dict sf_array_dict = { "N": None, "A": self.power_flow_results_3ph.Sf_A, "B": self.power_flow_results_3ph.Sf_B, "C": self.power_flow_results_3ph.Sf_C, } st_array_dict = { "N": None, "A": self.power_flow_results_3ph.St_A, "B": self.power_flow_results_3ph.St_B, "C": self.power_flow_results_3ph.St_C, } sf_sum: complex = 0.0 + 0.0j st_sum: complex = 0.0 + 0.0j from_phase_currents: Dict[str, complex] = dict() to_phase_currents: Dict[str, complex] = dict() from_phase_currents_inst: Dict[str, float] = dict() to_phase_currents_inst: Dict[str, float] = dict() for ph in phases: sf_key: Optional[VarPowerFlowReferenceType] = sf_key_dict[ph] st_key: Optional[VarPowerFlowReferenceType] = st_key_dict[ph] sf_array: Optional[np.ndarray] = sf_array_dict[ph] st_array: Optional[np.ndarray] = st_array_dict[ph] if sf_key is None: pass else: sf_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=sf_key) if sf_var is not None and sf_array is not None: pf_idx = vsc_index if is_vsc and vsc_index >= 0 else branch_index sf_ph_value = _get_array_entry_if_in_range(sf_array, pf_idx) if sf_ph_value is None: pass else: sf_ph = sf_ph_value / sbase self.set_if_exists(mdl=mdl, key=sf_key, value=float(np.real(sf_ph))) sf_sum += sf_ph elif is_vsc: self.set_if_exists(mdl=mdl, key=sf_key, value=0.0) else: pass if st_key is None: pass else: st_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=st_key) if st_var is not None and st_array is not None: pf_idx = vsc_index if is_vsc and vsc_index >= 0 else branch_index st_ph_value = _get_array_entry_if_in_range(st_array, pf_idx) if st_ph_value is None: pass else: st_ph = st_ph_value / sbase self.set_if_exists(mdl=mdl, key=st_key, value=float(np.real(st_ph))) st_sum += st_ph else: pass for ph in phases: if_key: VarPowerFlowReferenceType = if_key_dict[ph] it_key: VarPowerFlowReferenceType = it_key_dict[ph] if_key_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=if_key) it_key_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=it_key) uses_currents: bool = if_key_var is not None or it_key_var is not None if uses_currents: # Missing optional current mappings are skipped exactly like explicit None entries. sf_array = sf_array_dict[ph] st_array = st_array_dict[ph] voltage_from_array = voltage_from_dict[ph] voltage_to_array = voltage_to_dict[ph] pf_idx = vsc_index if is_vsc and vsc_index >= 0 else branch_index if sf_array is None or voltage_from_array is None: i_f = 0.0 + 0.0j else: sf_ph_value = _get_array_entry_if_in_range(sf_array, pf_idx) vf_ph = _get_array_entry_if_in_range(voltage_from_array, f_bus_idx) if sf_ph_value is None or vf_ph is None or abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: sf_ph = sf_ph_value / sbase i_f = np.conj(sf_ph / vf_ph) if st_array is None or voltage_to_array is None: i_t = 0.0 + 0.0j else: st_ph_value = _get_array_entry_if_in_range(st_array, pf_idx) vt_ph = _get_array_entry_if_in_range(voltage_to_array, t_bus_idx) if st_ph_value is None or vt_ph is None or abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: st_ph = st_ph_value / sbase i_t = np.conj(st_ph / vt_ph) i_f0: float = np.sqrt(2.0) * np.imag(i_f) i_t0: float = np.sqrt(2.0) * np.imag(i_t) if ph in {"A", "B", "C"}: from_phase_currents[ph] = complex(i_f) to_phase_currents[ph] = complex(i_t) from_phase_currents_inst[ph] = float(i_f0) to_phase_currents_inst[ph] = float(i_t0) else: pass self.set_if_exists(mdl=mdl, key=if_key, value=float(i_f0), persist_after_native_init=False) self.set_if_exists(mdl=mdl, key=it_key, value=float(i_t0), persist_after_native_init=False) else: pass neutral_if_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.if_N) if neutral_if_var is not None: i_f_neutral = -(from_phase_currents.get("A", 0.0 + 0.0j) + from_phase_currents.get("B", 0.0 + 0.0j) + from_phase_currents.get("C", 0.0 + 0.0j)) neutral_value = float(np.sqrt(2.0) * np.imag(i_f_neutral)) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.if_N, value=neutral_value, persist_after_native_init=False) else: pass neutral_it_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.it_N) if neutral_it_var is not None: i_t_neutral = -(to_phase_currents.get("A", 0.0 + 0.0j) + to_phase_currents.get("B", 0.0 + 0.0j) + to_phase_currents.get("C", 0.0 + 0.0j)) neutral_value = float(np.sqrt(2.0) * np.imag(i_t_neutral)) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.it_N, value=neutral_value, persist_after_native_init=False) else: pass if find_name_in_block(f"i_f_A_{mdl.name}", mdl) is not None: self.set_internal_init_if_exists(mdl, f"i_f_A_{mdl.name}", from_phase_currents_inst.get("A", 0.0)) self.set_internal_init_if_exists(mdl, f"i_f_B_{mdl.name}", from_phase_currents_inst.get("B", 0.0)) self.set_internal_init_if_exists(mdl, f"i_f_C_{mdl.name}", from_phase_currents_inst.get("C", 0.0)) self.set_internal_init_if_exists(mdl, f"i_t_A_{mdl.name}", to_phase_currents_inst.get("A", 0.0)) self.set_internal_init_if_exists(mdl, f"i_t_B_{mdl.name}", to_phase_currents_inst.get("B", 0.0)) self.set_internal_init_if_exists(mdl, f"i_t_C_{mdl.name}", to_phase_currents_inst.get("C", 0.0)) else: if find_name_in_block(f"i_ser_A_{mdl.name}", mdl) is not None: self.set_internal_init_if_exists(mdl, f"i_ser_A_{mdl.name}", from_phase_currents_inst.get("A", 0.0)) self.set_internal_init_if_exists(mdl, f"i_ser_B_{mdl.name}", from_phase_currents_inst.get("B", 0.0)) self.set_internal_init_if_exists(mdl, f"i_ser_C_{mdl.name}", from_phase_currents_inst.get("C", 0.0)) else: pass omega_base: float = 2.0 * np.pi * self.grid.fBase for ph in phases: d_vf_key: VarPowerFlowReferenceType = d_vf_key_dict[ph] d_vt_key: VarPowerFlowReferenceType = d_vt_key_dict[ph] voltage_from_array = voltage_from_dict[ph] voltage_to_array = voltage_to_dict[ph] d_vf_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=d_vf_key) if d_vf_var is not None: if voltage_from_array is None: vf_ph = 0.0 + 0.0j else: vf_ph = _get_array_entry_if_in_range(voltage_from_array, f_bus_idx) if vf_ph is None: vf_ph = 0.0 + 0.0j else: pass d_vf: float = omega_base * np.sqrt(2.0) * np.real(vf_ph) self.set_external_param(mdl, d_vf_key, d_vf) else: pass d_vt_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=d_vt_key) if d_vt_var is not None: if voltage_to_array is None: vt_ph = 0.0 + 0.0j else: vt_ph = _get_array_entry_if_in_range(voltage_to_array, t_bus_idx) if vt_ph is None: vt_ph = 0.0 + 0.0j else: pass d_vt: float = omega_base * np.sqrt(2.0) * np.real(vt_ph) self.set_external_param(mdl, d_vt_key, d_vt) else: pass self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(sf_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(sf_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(st_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(st_sum))) def _try_set_branch_pf_init_balanced( self, mdl: Block, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float, is_vsc: bool = False, vsc_index: int = -1 ) -> None: """ Initialize branch variables from balanced power-flow results. The balanced PF provides: - one complex bus voltage phasor per bus in self.power_flow_results.voltage - one total three-phase branch power per end in self.power_flow_results.Sf/St This method reconstructs balanced phase quantities as: - V_A = V - V_B = V * exp(-j*2*pi/3) - V_C = V * exp(+j*2*pi/3) - V_N = 0 and distributes total three-phase branch powers equally among phases: - S_phase = S_3ph / 3 Then: - I_phase = conj(S_phase / V_phase) The method initializes: - per-phase branch powers Sf_* and St_*, - aggregated terminal powers Pf/Qf/Pt/Qt, - per-phase instantaneous terminal currents if_* and it_* at t = 0, - per-phase voltage derivatives d_v_* using the same EMT convention as in the original three-phase initializer. Neutral phase is initialized to zero. """ if is_vsc and vsc_index >= 0: self._try_set_vsc_branch_pf_init_balanced( mdl=mdl, f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, sbase=sbase, vsc_index=vsc_index, ) return if self.grid.buses[f_bus_idx].is_dc or self.grid.buses[t_bus_idx].is_dc: self._try_set_dc_branch_pf_init( mdl=mdl, branch_index=branch_index, f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, sbase=sbase, ) return phases: List[str] = ["N", "A", "B", "C"] sf_key_dict: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.Sf_A, "B": VarPowerFlowReferenceType.Sf_B, "C": VarPowerFlowReferenceType.Sf_C, } st_key_dict: Dict[str, Optional[VarPowerFlowReferenceType]] = { "N": None, "A": VarPowerFlowReferenceType.St_A, "B": VarPowerFlowReferenceType.St_B, "C": VarPowerFlowReferenceType.St_C, } if_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.if_N, "A": VarPowerFlowReferenceType.if_A, "B": VarPowerFlowReferenceType.if_B, "C": VarPowerFlowReferenceType.if_C, } it_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.it_N, "A": VarPowerFlowReferenceType.it_A, "B": VarPowerFlowReferenceType.it_B, "C": VarPowerFlowReferenceType.it_C, } d_vf_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_f, "A": VarPowerFlowReferenceType.d_v_A_f, "B": VarPowerFlowReferenceType.d_v_B_f, "C": VarPowerFlowReferenceType.d_v_C_f, } d_vt_key_dict: Dict[str, VarPowerFlowReferenceType] = { "N": VarPowerFlowReferenceType.d_v_N_t, "A": VarPowerFlowReferenceType.d_v_A_t, "B": VarPowerFlowReferenceType.d_v_B_t, "C": VarPowerFlowReferenceType.d_v_C_t, } pf_results = self.power_flow_results if pf_results is None: return alpha = 2.0 * np.pi / 3.0 v_f_bus: CxVec = pf_results.voltage[f_bus_idx] v_t_bus: CxVec = pf_results.voltage[t_bus_idx] if is_vsc and vsc_index >= 0: pf_results_pfn_vsc: float = float(pf_results.Pfn_vsc[vsc_index]) sf_total = complex( ( pf_results.Pfp_vsc[vsc_index] + pf_results_pfn_vsc ) / sbase, 0.0, ) st_total = pf_results.St_vsc[vsc_index] / sbase voltage_dict: Dict[str, complex] = { "N": 0.0 + 0.0j, "A": 0.0 + 0.0j, "B": 0.0 + 0.0j, "C": 0.0 + 0.0j, } voltage_to_dict: Dict[str, CxVec] = { "N": 0.0 + 0.0j, "A": v_t_bus, "B": v_t_bus * np.exp(-1j * alpha), "C": v_t_bus * np.exp(+1j * alpha), } else: sf_total = pf_results.Sf[branch_index] / sbase st_total = pf_results.St[branch_index] / sbase voltage_dict = { "N": 0.0 + 0.0j, "A": v_f_bus, "B": v_f_bus * np.exp(-1j * alpha), "C": v_f_bus * np.exp(+1j * alpha), } voltage_to_dict = { "N": 0.0 + 0.0j, "A": v_t_bus, "B": v_t_bus * np.exp(-1j * alpha), "C": v_t_bus * np.exp(+1j * alpha), } sf_phase_total = sf_total / 3.0 st_phase_total = st_total / 3.0 sf_sum: complex = 0.0 + 0.0j st_sum: complex = 0.0 + 0.0j from_phase_currents: Dict[str, complex] = dict() to_phase_currents: Dict[str, complex] = dict() for ph in phases: sf_key: Optional[VarPowerFlowReferenceType] = sf_key_dict[ph] st_key: Optional[VarPowerFlowReferenceType] = st_key_dict[ph] if ph == "N": sf_ph = 0.0 + 0.0j st_ph = 0.0 + 0.0j elif is_vsc: sf_ph = 0.0 + 0.0j st_ph = st_phase_total else: sf_ph = sf_phase_total st_ph = st_phase_total if sf_key is not None: sf_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=sf_key) if sf_var is not None: self.set_if_exists(mdl=mdl, key=sf_key, value=float(np.real(sf_ph))) sf_sum += sf_ph else: pass else: pass if st_key is not None: st_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=st_key) if st_var is not None: self.set_if_exists(mdl=mdl, key=st_key, value=float(np.real(st_ph))) st_sum += st_ph else: pass else: pass for ph in phases: if_key: VarPowerFlowReferenceType = if_key_dict[ph] it_key: VarPowerFlowReferenceType = it_key_dict[ph] if_key_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=if_key) it_key_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=it_key) uses_currents: bool = if_key_var is not None or it_key_var is not None if uses_currents: # Missing optional current mappings are skipped exactly like explicit None entries. vf_ph = voltage_dict[ph] vt_ph = voltage_to_dict[ph] if ph == "N": i_f = 0.0 + 0.0j i_t = 0.0 + 0.0j elif is_vsc: i_f = 0.0 + 0.0j if abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(st_phase_total / vt_ph) else: if abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(sf_phase_total / vf_ph) if abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(st_phase_total / vt_ph) i_f0: float = np.sqrt(2.0) * np.imag(i_f) i_t0: float = np.sqrt(2.0) * np.imag(i_t) if ph in {"A", "B", "C"}: from_phase_currents[ph] = complex(i_f) to_phase_currents[ph] = complex(i_t) else: pass self.set_if_exists(mdl=mdl, key=if_key, value=float(i_f0), persist_after_native_init=False) self.set_if_exists(mdl=mdl, key=it_key, value=float(i_t0), persist_after_native_init=False) else: pass neutral_if_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.if_N) if neutral_if_var is not None: i_f_neutral = -(from_phase_currents.get("A", 0.0 + 0.0j) + from_phase_currents.get("B", 0.0 + 0.0j) + from_phase_currents.get("C", 0.0 + 0.0j)) neutral_value = float(np.sqrt(2.0) * np.imag(i_f_neutral)) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.if_N, value=neutral_value, persist_after_native_init=False) else: pass neutral_it_var = _get_external_mapping_var_if_present(mdl=mdl, key=VarPowerFlowReferenceType.it_N) if neutral_it_var is not None: i_t_neutral = -(to_phase_currents.get("A", 0.0 + 0.0j) + to_phase_currents.get("B", 0.0 + 0.0j) + to_phase_currents.get("C", 0.0 + 0.0j)) neutral_value = float(np.sqrt(2.0) * np.imag(i_t_neutral)) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.it_N, value=neutral_value, persist_after_native_init=False) else: pass omega_base: float = 2.0 * np.pi * self.grid.fBase for ph in phases: d_vf_key: VarPowerFlowReferenceType = d_vf_key_dict[ph] d_vt_key: VarPowerFlowReferenceType = d_vt_key_dict[ph] vf_ph = voltage_dict[ph] vt_ph = voltage_to_dict[ph] d_vf_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=d_vf_key) if d_vf_var is not None: d_vf: float = omega_base * np.sqrt(2.0) * np.real(vf_ph) self.set_external_param(mdl, d_vf_key, d_vf) else: pass d_vt_var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=d_vt_key) if d_vt_var is not None: d_vt: float = omega_base * np.sqrt(2.0) * np.real(vt_ph) self.set_external_param(mdl, d_vt_key, d_vt) else: pass if is_vsc and vsc_index >= 0: self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(sf_total))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(sf_total))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(st_total))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(st_total))) else: self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pf, value=float(np.real(sf_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qf, value=float(np.imag(sf_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Pt, value=float(np.real(st_sum))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.Qt, value=float(np.imag(st_sum))) if is_vsc and vsc_index >= 0: if self.power_flow_results is not None: P_vsc = -float(np.real(_get_balanced_vsc_ac_power(self.power_flow_results, vsc_index))) self.set_if_exists(mdl=mdl, key=VarPowerFlowReferenceType.P, value=P_vsc) def _try_set_bergeron_pf_init( self, mdl: Block, rt: BergeronHistoryRuntime, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float ) -> None: """ Initialize Bergeron history terms from three-phase power-flow results. The history sources are initialized so that, at t = 0: i_f(0) = Gc * v_f(0) + Ih_f(0) i_t(0) = Gc * v_t(0) + Ih_t(0) Hence: Ih_f(0) = i_f(0) - Gc * v_f(0) Ih_t(0) = i_t(0) - Gc * v_t(0) Only active phases in the Bergeron runtime are considered. """ voltage_dict: Dict[str, Optional[np.ndarray]] = { "N": self.power_flow_results_3ph.voltage_N, "A": self.power_flow_results_3ph.voltage_A, "B": self.power_flow_results_3ph.voltage_B, "C": self.power_flow_results_3ph.voltage_C, } sf_array_dict: Dict[str, Optional[np.ndarray]] = { "N": None, "A": self.power_flow_results_3ph.Sf_A, "B": self.power_flow_results_3ph.Sf_B, "C": self.power_flow_results_3ph.Sf_C, } st_array_dict: Dict[str, Optional[np.ndarray]] = { "N": None, "A": self.power_flow_results_3ph.St_A, "B": self.power_flow_results_3ph.St_B, "C": self.power_flow_results_3ph.St_C, } v_f0_red = np.zeros(rt.m, dtype=np.float64) v_t0_red = np.zeros(rt.m, dtype=np.float64) i_f0_red = np.zeros(rt.m, dtype=np.float64) i_t0_red = np.zeros(rt.m, dtype=np.float64) for k, ph in enumerate(rt.active_ph): voltage_array = voltage_dict[ph] if voltage_array is None: vf_ph = 0.0 + 0.0j vt_ph = 0.0 + 0.0j else: vf_ph = voltage_array[f_bus_idx] vt_ph = voltage_array[t_bus_idx] # Instantaneous voltage at t = 0 with your EMT convention v_f0_red[k] = float(np.sqrt(2.0) * np.imag(vf_ph)) v_t0_red[k] = float(np.sqrt(2.0) * np.imag(vt_ph)) sf_array = sf_array_dict[ph] st_array = st_array_dict[ph] if sf_array is None or abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: sf_ph = sf_array[branch_index] / sbase i_f = np.conj(sf_ph / vf_ph) if st_array is None or abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: st_ph = st_array[branch_index] / sbase i_t = np.conj(st_ph / vt_ph) # Instantaneous current at t = 0 with your EMT convention i_f0_red[k] = float(np.sqrt(2.0) * np.imag(i_f)) i_t0_red[k] = float(np.sqrt(2.0) * np.imag(i_t)) Ih_f0_red = i_f0_red - rt.Gc_red @ v_f0_red Ih_t0_red = i_t0_red - rt.Gc_red @ v_t0_red for k in range(rt.m): # mdl.event_dict[rt.Ih_f[k]] = self.grid.var_factory.add_const(float(Ih_f0_red[k])) # mdl.event_dict[rt.Ih_t[k]] = self.grid.var_factory.add_const(float(Ih_t0_red[k])) mdl.event_dict[rt.Ih_f[k]] = Const(float(Ih_f0_red[k])) mdl.event_dict[rt.Ih_t[k]] = Const(float(Ih_t0_red[k])) rt.initialize_buffers_from_initial_point( v_f0_red=v_f0_red, v_t0_red=v_t0_red, i_f0_red=i_f0_red, i_t0_red=i_t0_red, ) def _try_set_bergeron_pf_init_balanced( self, mdl: Block, rt: BergeronHistoryRuntime, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float ) -> None: """ Initialize Bergeron history terms from balanced power-flow results. Assumptions ----------- - self.power_flow_results.voltage contains one complex bus-voltage phasor per bus (balanced positive-sequence representation). - self.power_flow_results.Sf and self.power_flow_results.St contain the total three-phase complex power at each branch end. - The balanced abc phase voltages are reconstructed from the bus phasor as: V_A = V1 V_B = V1 * exp(-j*2*pi/3) V_C = V1 * exp(+j*2*pi/3) - Per-phase power is: S_phase = S_3ph / 3 - Per-phase current is: I_phase = conj(S_phase / V_phase) EMT initialization convention kept identical to the existing 3-phase version: x(0) = sqrt(2) * imag(X_phasor) Neutral phase, if present, is initialized to zero. """ pf_results = self.power_flow_results alpha = 2.0 * np.pi / 3.0 v_f_bus = pf_results.voltage[f_bus_idx] v_t_bus = pf_results.voltage[t_bus_idx] s_f_3ph = pf_results.Sf[branch_index] / sbase s_t_3ph = pf_results.St[branch_index] / sbase s_f_phase = s_f_3ph / 3.0 s_t_phase = s_t_3ph / 3.0 v_f0_red = np.zeros(rt.m, dtype=np.float64) v_t0_red = np.zeros(rt.m, dtype=np.float64) i_f0_red = np.zeros(rt.m, dtype=np.float64) i_t0_red = np.zeros(rt.m, dtype=np.float64) for k, ph in enumerate(rt.active_ph): if ph == "N": vf_ph = 0.0 + 0.0j vt_ph = 0.0 + 0.0j i_f = 0.0 + 0.0j i_t = 0.0 + 0.0j elif ph == "A": vf_ph = v_f_bus vt_ph = v_t_bus if abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(s_f_phase / vf_ph) if abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(s_t_phase / vt_ph) elif ph == "B": vf_ph = v_f_bus * np.exp(-1j * alpha) vt_ph = v_t_bus * np.exp(-1j * alpha) if abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(s_f_phase / vf_ph) if abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(s_t_phase / vt_ph) elif ph == "C": vf_ph = v_f_bus * np.exp(+1j * alpha) vt_ph = v_t_bus * np.exp(+1j * alpha) if abs(vf_ph) <= 1e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(s_f_phase / vf_ph) if abs(vt_ph) <= 1e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(s_t_phase / vt_ph) else: raise ValueError(f"Unsupported phase label in Bergeron runtime: {ph}") # Instantaneous values at t = 0 using the same EMT convention as the original code v_f0_red[k] = float(np.sqrt(2.0) * np.imag(vf_ph)) v_t0_red[k] = float(np.sqrt(2.0) * np.imag(vt_ph)) i_f0_red[k] = float(np.sqrt(2.0) * np.imag(i_f)) i_t0_red[k] = float(np.sqrt(2.0) * np.imag(i_t)) Ih_f0_red = i_f0_red - rt.Gc_red @ v_f0_red Ih_t0_red = i_t0_red - rt.Gc_red @ v_t0_red for k in range(rt.m): # mdl.event_dict[rt.Ih_f[k]] = self.grid.var_factory.add_const(float(Ih_f0_red[k])) # mdl.event_dict[rt.Ih_t[k]] = self.grid.var_factory.add_const(float(Ih_t0_red[k])) mdl.event_dict[rt.Ih_f[k]] = Const(float(Ih_f0_red[k])) mdl.event_dict[rt.Ih_t[k]] = Const(float(Ih_t0_red[k])) rt.initialize_buffers_from_initial_point( v_f0_red=v_f0_red, v_t0_red=v_t0_red, i_f0_red=i_f0_red, i_t0_red=i_t0_red, ) def _try_set_jmarti_pf_init( self, mdl: Block, rt: JMartiHistoryRuntime, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float ) -> None: """ Initialize JMARTI history terms from three-phase power-flow results. :param mdl: JMARTI symbolic block. :param rt: JMARTI runtime companion. :param branch_index: Branch index in the power-flow results. :param f_bus_idx: From-side bus index. :param t_bus_idx: To-side bus index. :param sbase: System base power in MVA. :return: None. """ voltage_dict: Dict[str, Optional[np.ndarray]] = { "N": self.power_flow_results_3ph.voltage_N, "A": self.power_flow_results_3ph.voltage_A, "B": self.power_flow_results_3ph.voltage_B, "C": self.power_flow_results_3ph.voltage_C, } sf_array_dict: Dict[str, Optional[np.ndarray]] = { "N": None, "A": self.power_flow_results_3ph.Sf_A, "B": self.power_flow_results_3ph.Sf_B, "C": self.power_flow_results_3ph.Sf_C, } st_array_dict: Dict[str, Optional[np.ndarray]] = { "N": None, "A": self.power_flow_results_3ph.St_A, "B": self.power_flow_results_3ph.St_B, "C": self.power_flow_results_3ph.St_C, } v_f0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) v_t0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) i_f0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) i_t0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) phase_index: int = 0 ph: str voltage_array: Optional[np.ndarray] sf_array: Optional[np.ndarray] st_array: Optional[np.ndarray] vf_ph: complex vt_ph: complex i_f: complex i_t: complex while phase_index < rt.m: ph = rt.active_ph[phase_index] voltage_array = voltage_dict[ph] if voltage_array is None: vf_ph = 0.0 + 0.0j vt_ph = 0.0 + 0.0j else: vf_ph = complex(voltage_array[f_bus_idx]) vt_ph = complex(voltage_array[t_bus_idx]) v_f0_phasor_red[phase_index] = vf_ph v_t0_phasor_red[phase_index] = vt_ph sf_array = sf_array_dict[ph] st_array = st_array_dict[ph] if sf_array is None or abs(vf_ph) <= 1.0e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(complex(sf_array[branch_index]) / sbase / vf_ph) if st_array is None or abs(vt_ph) <= 1.0e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(complex(st_array[branch_index]) / sbase / vt_ph) i_f0_phasor_red[phase_index] = i_f i_t0_phasor_red[phase_index] = i_t phase_index += 1 ih_f_phase, ih_t_phase = rt.initialize_from_fundamental_phasors( v_f0_phasor_red=v_f0_phasor_red, v_t0_phasor_red=v_t0_phasor_red, i_f0_phasor_red=i_f0_phasor_red, i_t0_phasor_red=i_t0_phasor_red, system_frequency_hz=float(self.grid.fBase), ) phase_index = 0 while phase_index < rt.m: mdl.event_dict[rt.Ih_f[phase_index]] = Const(float(np.real(ih_f_phase[phase_index]))) mdl.event_dict[rt.Ih_t[phase_index]] = Const(float(np.real(ih_t_phase[phase_index]))) phase_index += 1 def _try_set_jmarti_pf_init_balanced( self, mdl: Block, rt: JMartiHistoryRuntime, branch_index: int, f_bus_idx: int, t_bus_idx: int, sbase: float ) -> None: """ Initialize JMARTI history terms from balanced power-flow results. :param mdl: JMARTI symbolic block. :param rt: JMARTI runtime companion. :param branch_index: Branch index in the power-flow results. :param f_bus_idx: From-side bus index. :param t_bus_idx: To-side bus index. :param sbase: System base power in MVA. :return: None. """ pf_results = self.power_flow_results alpha: float = 2.0 * np.pi / 3.0 v_f_bus: complex = complex(pf_results.voltage[f_bus_idx]) v_t_bus: complex = complex(pf_results.voltage[t_bus_idx]) s_f_3ph: complex = complex(pf_results.Sf[branch_index]) / sbase s_t_3ph: complex = complex(pf_results.St[branch_index]) / sbase s_f_phase: complex = s_f_3ph / 3.0 s_t_phase: complex = s_t_3ph / 3.0 v_f0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) v_t0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) i_f0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) i_t0_phasor_red: np.ndarray = np.zeros(rt.m, dtype=np.complex128) phase_index: int = 0 ph: str vf_ph: complex vt_ph: complex i_f: complex i_t: complex while phase_index < rt.m: ph = rt.active_ph[phase_index] if ph == "N": vf_ph = 0.0 + 0.0j vt_ph = 0.0 + 0.0j i_f = 0.0 + 0.0j i_t = 0.0 + 0.0j else: if ph == "A": vf_ph = v_f_bus vt_ph = v_t_bus else: if ph == "B": vf_ph = v_f_bus * np.exp(-1j * alpha) vt_ph = v_t_bus * np.exp(-1j * alpha) else: if ph == "C": vf_ph = v_f_bus * np.exp(+1j * alpha) vt_ph = v_t_bus * np.exp(+1j * alpha) else: raise ValueError(f"Unsupported phase label in JMARTI runtime: {ph}") if abs(vf_ph) <= 1.0e-12: i_f = 0.0 + 0.0j else: i_f = np.conj(s_f_phase / vf_ph) if abs(vt_ph) <= 1.0e-12: i_t = 0.0 + 0.0j else: i_t = np.conj(s_t_phase / vt_ph) v_f0_phasor_red[phase_index] = vf_ph v_t0_phasor_red[phase_index] = vt_ph i_f0_phasor_red[phase_index] = i_f i_t0_phasor_red[phase_index] = i_t phase_index += 1 ih_f_phase, ih_t_phase = rt.initialize_from_fundamental_phasors( v_f0_phasor_red=v_f0_phasor_red, v_t0_phasor_red=v_t0_phasor_red, i_f0_phasor_red=i_f0_phasor_red, i_t0_phasor_red=i_t0_phasor_red, system_frequency_hz=float(self.grid.fBase), ) phase_index = 0 while phase_index < rt.m: mdl.event_dict[rt.Ih_f[phase_index]] = self.grid.var_factory.add_const(float(np.real(ih_f_phase[phase_index]))) mdl.event_dict[rt.Ih_t[phase_index]] = self.grid.var_factory.add_const(float(np.real(ih_t_phase[phase_index]))) phase_index += 1 # --------------------------------------------------------------------- # UTILS # ---------------------------------------------------------------------
[docs] def set_if_exists(self, mdl: Block, key: VarPowerFlowReferenceType, value: float, persist_after_native_init: bool = False) -> None: """ Set an initialization value only if the external mapping contains the key. :param mdl: Model block containing the external mapping. :param key: External mapping key. :param value: Initialization value. :param persist_after_native_init: Preserve the mapped guess after native initialization. :return: None """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) owner_block: Optional[Block] if external_mapping is None: pass else: mapped_var: Optional[Var] = external_mapping.get(key, None) if mapped_var is None: pass else: # GUI wrapper roots may expose one PF reference whose mutable # runtime storage actually belongs to a nested child block. # The assignment must therefore target the block that owns the # ``event_dict`` entry instead of assuming root ownership. owner_block = _find_event_var_owner(mdl, mapped_var) if owner_block is None: value_float: float = float(value) self._temp_init_guess[mapped_var.uid] = value_float if persist_after_native_init: self._temp_post_init_guess[mapped_var.uid] = value_float else: pass else: owner_block.event_dict[mapped_var] = Const(float(value)) if persist_after_native_init: self._temp_post_init_guess[mapped_var.uid] = float(value) else: pass
[docs] def set_internal_init_if_exists(self, mdl: Block, var_name: str, value: float) -> None: """ Set one temporary initial guess for an internal variable if it exists. :param mdl: Root model block. :param var_name: Internal variable name. :param value: Initialization value. :return: None. """ internal_var: Optional[Var] = _find_internal_var_for_init(mdl, var_name) if internal_var is None: pass else: self._temp_init_guess[internal_var.uid] = float(value) self._temp_post_init_guess[internal_var.uid] = float(value)
[docs] def set_internal_mode_if_exists(self, mdl: Block, var_name: str, value: float) -> None: """ Set one retained runtime mode default if the mode variable exists. :param mdl: Root model block. :param var_name: Retained mode variable name. :param value: Runtime mode value. :return: None. """ internal_var: Optional[Var] owner_block: Optional[Block] internal_var, owner_block = _find_mode_var_owner(mdl, var_name) if internal_var is None or owner_block is None: pass else: if internal_var in owner_block.mode_dict: owner_block.mode_dict[internal_var] = Const(float(value)) if "_uid2idx_event_params" in self.__dict__ and "_event_params_values" in self.__dict__ and "_event_parameters_eqs" in self.__dict__: runtime_idx: int | None = self._uid2idx_event_params.get(internal_var.uid, None) if runtime_idx is None: pass else: self._event_params_values[runtime_idx] = float(value) self._event_parameters_eqs[runtime_idx] = Const(float(value)) if "_runtime_all_eqs_source" in self.__dict__: self._runtime_all_eqs_source[runtime_idx] = Const(float(value)) else: pass if "_runtime_parameter_eqs0" in self.__dict__: self._runtime_parameter_eqs0[runtime_idx] = Const(float(value)) else: pass else: pass else: pass
[docs] def set_internal_runtime_if_exists(self, mdl: Block, var_name: str, value: float) -> None: """ Set one internal runtime parameter default if the variable exists in ``event_dict``. :param mdl: Root model block. :param var_name: Runtime parameter variable name. :param value: Runtime parameter value. :return: None. """ internal_var: Optional[Var] owner_block: Optional[Block] runtime_idx: int | None internal_var, owner_block = _find_internal_runtime_var_owner(mdl, var_name) if internal_var is None or owner_block is None: pass else: if internal_var in owner_block.event_dict: owner_block.event_dict[internal_var] = Const(float(value)) if "_uid2idx_event_params" in self.__dict__ and "_event_params_values" in self.__dict__ and "_event_parameters_eqs" in self.__dict__: runtime_idx = self._uid2idx_event_params.get(internal_var.uid, None) if runtime_idx is None: pass else: self._event_params_values[runtime_idx] = float(value) self._event_parameters_eqs[runtime_idx] = Const(float(value)) if "_runtime_all_eqs_source" in self.__dict__: self._runtime_all_eqs_source[runtime_idx] = Const(float(value)) else: pass if "_runtime_parameter_eqs0" in self.__dict__: self._runtime_parameter_eqs0[runtime_idx] = Const(float(value)) else: pass else: pass else: pass
[docs] def set_internal_diff_init_if_exists(self, mdl: Block, var_name: str, value: float) -> None: """ Set one temporary differential initial guess for an internal variable if it exists. :param mdl: Root model block. :param var_name: Differential variable name. :param value: Initialization value. :return: None. """ internal_var: Optional[Var] = _find_internal_var_for_init(mdl, var_name) if internal_var is None: pass else: self._temp_diff_init_guess[internal_var.uid] = float(value) self._temp_post_diff_init_guess[internal_var.uid] = float(value)
def _seed_switched_converter_internal_init_balanced( self, mdl: Block, VA: complex, VB: complex, VC: complex, IA: complex, IB: complex, IC: complex, v_dc: float, ) -> None: """ Seed the internal variables of the switched converter from balanced PF data. The switched converter contains additional bridge variables that are not part of the external mapping. Seeding them from the PF operating point avoids a poor first Newton guess where all bridge voltages start at zero. :param mdl: Root converter block. :param VA: Phase-A bus voltage phasor. :param VB: Phase-B bus voltage phasor. :param VC: Phase-C bus voltage phasor. :param IA: Phase-A current phasor. :param IB: Phase-B current phasor. :param IC: Phase-C current phasor. :param v_dc: DC-bus magnitude in p.u. :return: None. """ model_name: str = str(mdl.name) a_operator: complex = np.exp(1j * 2.0 * np.pi / 3.0) V1: complex = (VA + a_operator * VB + (a_operator * a_operator) * VC) / 3.0 phi_v: float = float(np.angle(V1)) shift: float = 2.0 * np.pi / 3.0 theta_b: float = phi_v - shift theta_c: float = phi_v + shift c13: float = 1.0 / 3.0 c23: float = 2.0 / 3.0 omega_base: float = 2.0 * np.pi * self.grid.fBase i_A0: float = float(np.sqrt(2.0) * np.imag(IA)) i_B0: float = float(np.sqrt(2.0) * np.imag(IB)) i_C0: float = float(np.sqrt(2.0) * np.imag(IC)) i_d0: float = c23 * (np.sin(phi_v) * i_A0 + np.sin(theta_b) * i_B0 + np.sin(theta_c) * i_C0) i_q0: float = -c23 * (np.cos(phi_v) * i_A0 + np.cos(theta_b) * i_B0 + np.cos(theta_c) * i_C0) i_00: float = c13 * (i_A0 + i_B0 + i_C0) Vpk: float = float(np.sqrt(2.0) * np.abs(V1)) R_eq: float = _get_internal_runtime_default(mdl, f"R_eq_{model_name}", 0.02) L_eq: float = _get_internal_runtime_default(mdl, f"L_eq_{model_name}", 0.08) m_max: float = _get_internal_runtime_default(mdl, f"m_max_{model_name}", 0.95) vdc_floor: float = _get_internal_runtime_default(mdl, f"vdc_floor_{model_name}", 0.05) sbase0: float = _get_internal_runtime_default(mdl, f"sbase_{model_name}", self.grid.Sbase) control1_code: float = _get_internal_runtime_default(mdl, f"control1_{model_name}", 0.0) control2_code: float = _get_internal_runtime_default(mdl, f"control2_{model_name}", 0.0) control1_val0: float = _get_internal_runtime_default(mdl, f"control1_val_{model_name}", 0.0) control2_val0: float = _get_internal_runtime_default(mdl, f"control2_val_{model_name}", 0.0) i_kp0: float = _get_internal_runtime_default(mdl, f"i_kp_{model_name}", 0.0) vdc_kp0: float = _get_internal_runtime_default(mdl, f"vdc_kp_{model_name}", 0.0) q_kp0: float = _get_internal_runtime_default(mdl, f"q_kp_{model_name}", 0.0) eps: float = 1.0e-10 v_dc_eff: float = max(float(v_dc), vdc_floor) P_meas0_pu: float = float(np.real(VA * np.conj(IA) + VB * np.conj(IB) + VC * np.conj(IC))) Q_meas0_pu: float = float(np.imag(VA * np.conj(IA) + VB * np.conj(IB) + VC * np.conj(IC))) P_meas0: float = P_meas0_pu * sbase0 i_dc0_conv: float = -P_meas0_pu / max(v_dc_eff, eps) configured_p0_value: float = _get_internal_runtime_default(mdl, f"P0_{model_name}", P_meas0) P_ref0: float Q_ref0: float Vdc_ref0: float self.set_internal_runtime_if_exists(mdl, f"P0_{model_name}", configured_p0_value) P_ref0, Q_ref0, Vdc_ref0 = _resolve_converter_control_reference_values( control1_code=control1_code, control2_code=control2_code, control1_val=control1_val0, control2_val=control2_val0, p0_value=configured_p0_value, ) P_ref_pu0: float = P_ref0 / max(sbase0, eps) Q_ref_pu0: float = Q_ref0 / max(sbase0, eps) i_d_ref0: float = (2.0 / 3.0) * P_ref_pu0 / max(Vpk, eps) i_q_ref0: float = (2.0 / 3.0) * Q_ref_pu0 / max(Vpk, eps) i_0_ref0: float = 0.0 P_f0: float = P_meas0_pu Q_f0: float = Q_meas0_pu v_cmd_d0: float = Vpk - R_eq * i_d0 + L_eq * i_q0 v_cmd_q0: float = -R_eq * i_q0 - L_eq * i_d0 v_cmd_00: float = -R_eq * i_00 k_v_conv0: float = float(np.sqrt(v_cmd_d0 * v_cmd_d0 + v_cmd_q0 * v_cmd_q0 + eps) / max(m_max * max(Vdc_ref0, vdc_floor), eps)) v_ref_a0: float = v_cmd_d0 * np.sin(phi_v) - v_cmd_q0 * np.cos(phi_v) + v_cmd_00 v_ref_b0: float = v_cmd_d0 * np.sin(theta_b) - v_cmd_q0 * np.cos(theta_b) + v_cmd_00 v_ref_c0: float = v_cmd_d0 * np.sin(theta_c) - v_cmd_q0 * np.cos(theta_c) + v_cmd_00 v_mod_scale0: float = max(k_v_conv0 * v_dc_eff, eps) m_a_u0: float = v_ref_a0 / v_mod_scale0 m_b_u0: float = v_ref_b0 / v_mod_scale0 m_c_u0: float = v_ref_c0 / v_mod_scale0 m_a0: float = float(np.clip(m_a_u0, -m_max, m_max)) m_b0: float = float(np.clip(m_b_u0, -m_max, m_max)) m_c0: float = float(np.clip(m_c_u0, -m_max, m_max)) desired_v_abc: np.ndarray = np.array([v_ref_a0, v_ref_b0, v_ref_c0], dtype=float) best_error: Optional[float] = None best_state: np.ndarray = np.zeros(3, dtype=float) state_index_a: int = 0 state_index_b: int state_index_c: int # The bridge starts from the closest discrete switching state to the desired phase-voltage vector. while state_index_a < 2: state_index_b = 0 while state_index_b < 2: state_index_c = 0 while state_index_c < 2: candidate_gate: np.ndarray = np.array([float(state_index_a), float(state_index_b), float(state_index_c)], dtype=float) candidate_leg: np.ndarray = (2.0 * candidate_gate - 1.0) * 0.5 * v_dc_eff candidate_common_mode: float = float(np.sum(candidate_leg) / 3.0) candidate_conv: np.ndarray = candidate_leg - candidate_common_mode candidate_error: float = float(np.sum((candidate_conv - desired_v_abc) ** 2)) if best_error is None or candidate_error < best_error: best_error = candidate_error best_state = candidate_gate.copy() else: pass state_index_c += 1 state_index_b += 1 state_index_a += 1 gate_a0 = float(best_state[0]) gate_b0 = float(best_state[1]) gate_c0 = float(best_state[2]) v_leg_a0: float = (2.0 * gate_a0 - 1.0) * 0.5 * v_dc_eff v_leg_b0: float = (2.0 * gate_b0 - 1.0) * 0.5 * v_dc_eff v_leg_c0: float = (2.0 * gate_c0 - 1.0) * 0.5 * v_dc_eff v_common_mode0: float = (v_leg_a0 + v_leg_b0 + v_leg_c0) / 3.0 v_conv_a0: float = v_leg_a0 - v_common_mode0 v_conv_b0: float = v_leg_b0 - v_common_mode0 v_conv_c0: float = v_leg_c0 - v_common_mode0 v_conv_d0: float = c23 * (np.sin(phi_v) * v_conv_a0 + np.sin(theta_b) * v_conv_b0 + np.sin(theta_c) * v_conv_c0) v_conv_q0: float = -c23 * (np.cos(phi_v) * v_conv_a0 + np.cos(theta_b) * v_conv_b0 + np.cos(theta_c) * v_conv_c0) v_conv_00: float = c13 * (v_conv_a0 + v_conv_b0 + v_conv_c0) switching_enabled0: float = _get_internal_mode_default(mdl, f"switching_enabled_mode_{model_name}", 0.0) v_conv_a_applied0: float = (1.0 - switching_enabled0) * v_ref_a0 + switching_enabled0 * v_conv_a0 v_conv_b_applied0: float = (1.0 - switching_enabled0) * v_ref_b0 + switching_enabled0 * v_conv_b0 v_conv_c_applied0: float = (1.0 - switching_enabled0) * v_ref_c0 + switching_enabled0 * v_conv_c0 v_conv_d_applied0: float = (1.0 - switching_enabled0) * v_cmd_d0 + switching_enabled0 * v_conv_d0 v_conv_q_applied0: float = (1.0 - switching_enabled0) * v_cmd_q0 + switching_enabled0 * v_conv_q0 v_conv_0_applied0: float = (1.0 - switching_enabled0) * v_cmd_00 + switching_enabled0 * v_conv_00 v_A0: float = float(np.sqrt(2.0) * np.imag(VA)) v_B0: float = float(np.sqrt(2.0) * np.imag(VB)) v_C0: float = float(np.sqrt(2.0) * np.imag(VC)) i_A0_plant: float = i_A0 i_B0_plant: float = i_B0 i_C0_plant: float = i_C0 i_d0_plant: float = i_d0 i_q0_plant: float = i_q0 i_00_plant: float = i_00 P_loss00: float = _get_internal_runtime_default(mdl, f"P_loss0_{model_name}", 0.0) k_v_conv_nom0: float = Vpk / max(m_max * max(Vdc_ref0, vdc_floor), eps) i_d_ff0: float = (2.0 / 3.0) * ((P_ref0 + P_loss00) / max(sbase0, eps)) / max(Vpk, eps) i_q_ff0: float = (2.0 / 3.0) * (Q_ref0 / max(sbase0, eps)) / max(Vpk, eps) xi_vdc0: float = i_d_ref0 - i_d_ff0 - vdc_kp0 * (Vdc_ref0 - v_dc) xi_q0: float = i_q_ref0 - i_q_ff0 - q_kp0 * ((Q_ref0 / max(sbase0, eps)) - (Q_f0 / max(sbase0, eps))) v_pi_d_u0: float = (Vpk - R_eq * i_d0_plant + L_eq * i_q0_plant) - v_cmd_d0 v_pi_q_u0: float = (0.0 - R_eq * i_q0_plant - L_eq * i_d0_plant) - v_cmd_q0 v_pi_00_u0: float = (0.0 - R_eq * i_00_plant) - v_cmd_00 xi_id0: float = v_pi_d_u0 - i_kp0 * (i_d_ref0 - i_d0_plant) xi_iq0: float = v_pi_q_u0 - i_kp0 * (i_q_ref0 - i_q0_plant) xi_i00: float = v_pi_00_u0 - i_kp0 * (i_0_ref0 - i_00_plant) v_lim0: float = k_v_conv0 * m_max * v_dc_eff d_i_d0: float = omega_base * (Vpk - v_conv_d_applied0 - R_eq * i_d0 + L_eq * i_q0) / max(L_eq, eps) d_i_q0: float = omega_base * (0.0 - v_conv_q_applied0 - R_eq * i_q0 - L_eq * i_d0) / max(L_eq, eps) d_i_00: float = omega_base * (0.0 - v_conv_0_applied0 - R_eq * i_00) / max(L_eq, eps) d_i_A0_plant: float = omega_base * (v_conv_a_applied0 - v_A0 - R_eq * i_A0_plant) / max(L_eq, eps) d_i_B0_plant: float = omega_base * (v_conv_b_applied0 - v_B0 - R_eq * i_B0_plant) / max(L_eq, eps) d_i_C0_plant: float = omega_base * (v_conv_c_applied0 - v_C0 - R_eq * i_C0_plant) / max(L_eq, eps) d_xi_id0: float = _get_internal_runtime_default(mdl, f"i_ki_{model_name}", 0.0) * ((i_d_ref0 - i_d0_plant) + _get_internal_runtime_default(mdl, f"aw_gain_{model_name}", 0.0) * (v_cmd_d0 - v_cmd_d0)) d_xi_iq0: float = _get_internal_runtime_default(mdl, f"i_ki_{model_name}", 0.0) * ((i_q_ref0 - i_q0_plant) + _get_internal_runtime_default(mdl, f"aw_gain_{model_name}", 0.0) * (v_cmd_q0 - v_cmd_q0)) d_xi_i00: float = _get_internal_runtime_default(mdl, f"i_ki_{model_name}", 0.0) * ((i_0_ref0 - i_00_plant) + _get_internal_runtime_default(mdl, f"aw_gain_{model_name}", 0.0) * (v_cmd_00 - v_cmd_00)) # The switched bridge and its carrier logic need a coherent first operating point before the first accepted EMT step. self.set_internal_init_if_exists(mdl, f"v_dc_{model_name}", v_dc) self.set_internal_init_if_exists(mdl, f"i_dc_{model_name}", i_dc0_conv) self.set_internal_init_if_exists(mdl, f"i_dc_conv_{model_name}", i_dc0_conv) self.set_internal_init_if_exists(mdl, f"P_ref_{model_name}", P_ref0) self.set_internal_init_if_exists(mdl, f"Q_ref_{model_name}", Q_ref0) self.set_internal_init_if_exists(mdl, f"Vdc_ref_{model_name}", Vdc_ref0) self.set_internal_init_if_exists(mdl, f"P_{model_name}", P_meas0_pu) self.set_internal_init_if_exists(mdl, f"Q_{model_name}", Q_meas0_pu) self.set_internal_init_if_exists(mdl, f"P_f_{model_name}", P_f0) self.set_internal_init_if_exists(mdl, f"Q_f_{model_name}", Q_f0) self.set_internal_init_if_exists(mdl, f"i_d_ref_{model_name}", i_d_ref0) self.set_internal_init_if_exists(mdl, f"i_q_ref_{model_name}", i_q_ref0) self.set_internal_init_if_exists(mdl, f"i_0_ref_{model_name}", i_0_ref0) self.set_internal_init_if_exists(mdl, f"i_d_ref_u_{model_name}", i_d_ref0) self.set_internal_init_if_exists(mdl, f"i_q_ref_u_{model_name}", i_q_ref0) self.set_internal_init_if_exists(mdl, f"i_0_ref_u_{model_name}", i_0_ref0) self.set_internal_init_if_exists(mdl, f"i_d_ff_{model_name}", i_d_ff0) self.set_internal_init_if_exists(mdl, f"i_q_ff_{model_name}", i_q_ff0) self.set_internal_init_if_exists(mdl, f"v_mag_{model_name}", Vpk) self.set_internal_init_if_exists(mdl, f"k_v_conv_nom_{model_name}", k_v_conv_nom0) self.set_internal_init_if_exists(mdl, f"xi_vdc_{model_name}", xi_vdc0) self.set_internal_init_if_exists(mdl, f"xi_q_{model_name}", xi_q0) self.set_internal_init_if_exists(mdl, f"v_pi_d_u_{model_name}", v_pi_d_u0) self.set_internal_init_if_exists(mdl, f"v_pi_q_u_{model_name}", v_pi_q_u0) self.set_internal_init_if_exists(mdl, f"v_pi_0_u_{model_name}", v_pi_00_u0) self.set_internal_init_if_exists(mdl, f"v_cmd_d_u_{model_name}", v_cmd_d0) self.set_internal_init_if_exists(mdl, f"v_cmd_q_u_{model_name}", v_cmd_q0) self.set_internal_init_if_exists(mdl, f"v_cmd_0_u_{model_name}", v_cmd_00) self.set_internal_init_if_exists(mdl, f"v_lim_{model_name}", v_lim0) self.set_internal_init_if_exists(mdl, f"i_d_{model_name}", i_d0) self.set_internal_init_if_exists(mdl, f"i_q_{model_name}", i_q0) self.set_internal_init_if_exists(mdl, f"i_0_{model_name}", i_00) self.set_internal_init_if_exists(mdl, f"v_d_{model_name}", Vpk) self.set_internal_init_if_exists(mdl, f"v_q_{model_name}", 0.0) self.set_internal_init_if_exists(mdl, f"v_0_{model_name}", 0.0) self.set_internal_init_if_exists(mdl, f"v_cmd_d_{model_name}", v_cmd_d0) self.set_internal_init_if_exists(mdl, f"v_cmd_q_{model_name}", v_cmd_q0) self.set_internal_init_if_exists(mdl, f"v_cmd_0_{model_name}", v_cmd_00) self.set_internal_init_if_exists(mdl, f"k_v_conv_{model_name}", k_v_conv0) self.set_internal_init_if_exists(mdl, f"v_ref_a_{model_name}", v_ref_a0) self.set_internal_init_if_exists(mdl, f"v_ref_b_{model_name}", v_ref_b0) self.set_internal_init_if_exists(mdl, f"v_ref_c_{model_name}", v_ref_c0) self.set_internal_init_if_exists(mdl, f"m_a_u_{model_name}", m_a_u0) self.set_internal_init_if_exists(mdl, f"m_b_u_{model_name}", m_b_u0) self.set_internal_init_if_exists(mdl, f"m_c_u_{model_name}", m_c_u0) self.set_internal_init_if_exists(mdl, f"m_a_{model_name}", m_a0) self.set_internal_init_if_exists(mdl, f"m_b_{model_name}", m_b0) self.set_internal_init_if_exists(mdl, f"m_c_{model_name}", m_c0) self.set_internal_init_if_exists(mdl, f"i_A_{model_name}", i_A0_plant) self.set_internal_init_if_exists(mdl, f"i_B_{model_name}", i_B0_plant) self.set_internal_init_if_exists(mdl, f"i_C_{model_name}", i_C0_plant) self.set_internal_init_if_exists(mdl, f"gate_a_{model_name}", gate_a0) self.set_internal_init_if_exists(mdl, f"gate_b_{model_name}", gate_b0) self.set_internal_init_if_exists(mdl, f"gate_c_{model_name}", gate_c0) self.set_internal_init_if_exists(mdl, f"v_leg_a_{model_name}", v_leg_a0) self.set_internal_init_if_exists(mdl, f"v_leg_b_{model_name}", v_leg_b0) self.set_internal_init_if_exists(mdl, f"v_leg_c_{model_name}", v_leg_c0) self.set_internal_init_if_exists(mdl, f"v_common_mode_{model_name}", v_common_mode0) self.set_internal_init_if_exists(mdl, f"v_conv_a_{model_name}", v_conv_a_applied0) self.set_internal_init_if_exists(mdl, f"v_conv_b_{model_name}", v_conv_b_applied0) self.set_internal_init_if_exists(mdl, f"v_conv_c_{model_name}", v_conv_c_applied0) self.set_internal_init_if_exists(mdl, f"v_conv_d_{model_name}", v_conv_d_applied0) self.set_internal_init_if_exists(mdl, f"v_conv_q_{model_name}", v_conv_q_applied0) self.set_internal_init_if_exists(mdl, f"v_conv_0_{model_name}", v_conv_0_applied0) self.set_internal_init_if_exists(mdl, f"xi_id_{model_name}", xi_id0) self.set_internal_init_if_exists(mdl, f"xi_iq_{model_name}", xi_iq0) self.set_internal_init_if_exists(mdl, f"xi_i0_{model_name}", xi_i00) self.set_internal_mode_if_exists(mdl, f"switching_enabled_mode_{model_name}", switching_enabled0) self.set_internal_mode_if_exists(mdl, f"gate_a_mode_{model_name}", gate_a0) self.set_internal_mode_if_exists(mdl, f"gate_b_mode_{model_name}", gate_b0) self.set_internal_mode_if_exists(mdl, f"gate_c_mode_{model_name}", gate_c0) self.set_internal_diff_init_if_exists(mdl, f"d_i_d_{model_name}", d_i_d0) self.set_internal_diff_init_if_exists(mdl, f"d_i_q_{model_name}", d_i_q0) self.set_internal_diff_init_if_exists(mdl, f"d_i_0_{model_name}", d_i_00) self.set_internal_diff_init_if_exists(mdl, f"d_theta_pll_{model_name}", omega_base) # The incremental switched-converter path uses a nested ``plant`` and ``plant_bridge`` hierarchy. self.set_internal_init_if_exists(mdl, f"i_A_{model_name}_plant", i_A0_plant) self.set_internal_init_if_exists(mdl, f"i_B_{model_name}_plant", i_B0_plant) self.set_internal_init_if_exists(mdl, f"i_C_{model_name}_plant", i_C0_plant) self.set_internal_init_if_exists(mdl, f"i_d_{model_name}_plant", i_d0_plant) self.set_internal_init_if_exists(mdl, f"i_q_{model_name}_plant", i_q0_plant) self.set_internal_init_if_exists(mdl, f"i_0_{model_name}_plant", i_00_plant) self.set_internal_init_if_exists(mdl, f"v_d_{model_name}_plant", Vpk) self.set_internal_init_if_exists(mdl, f"v_q_{model_name}_plant", 0.0) self.set_internal_init_if_exists(mdl, f"v_0_{model_name}_plant", 0.0) self.set_internal_init_if_exists(mdl, f"theta_pll_{model_name}", phi_v) self.set_internal_init_if_exists(mdl, f"omega_pll_{model_name}", omega_base) self.set_internal_init_if_exists(mdl, f"v_cmd_d_{model_name}", v_cmd_d0) self.set_internal_init_if_exists(mdl, f"v_cmd_q_{model_name}", v_cmd_q0) self.set_internal_init_if_exists(mdl, f"v_cmd_0_{model_name}", v_cmd_00) self.set_internal_diff_init_if_exists(mdl, f"d_i_A_{model_name}_plant", d_i_A0_plant) self.set_internal_diff_init_if_exists(mdl, f"d_i_B_{model_name}_plant", d_i_B0_plant) self.set_internal_diff_init_if_exists(mdl, f"d_i_C_{model_name}_plant", d_i_C0_plant) self.set_internal_diff_init_if_exists(mdl, f"d_xi_id_{model_name}", d_xi_id0) self.set_internal_diff_init_if_exists(mdl, f"d_xi_iq_{model_name}", d_xi_iq0) self.set_internal_diff_init_if_exists(mdl, f"d_xi_i0_{model_name}", d_xi_i00) self.set_internal_init_if_exists(mdl, f"m_a_{model_name}_plant_bridge", m_a0) self.set_internal_init_if_exists(mdl, f"m_b_{model_name}_plant_bridge", m_b0) self.set_internal_init_if_exists(mdl, f"m_c_{model_name}_plant_bridge", m_c0) self.set_internal_init_if_exists(mdl, f"m_a_u_{model_name}_plant_bridge", m_a_u0) self.set_internal_init_if_exists(mdl, f"m_b_u_{model_name}_plant_bridge", m_b_u0) self.set_internal_init_if_exists(mdl, f"m_c_u_{model_name}_plant_bridge", m_c_u0) self.set_internal_init_if_exists(mdl, f"v_ref_a_{model_name}_plant_bridge", v_ref_a0) self.set_internal_init_if_exists(mdl, f"v_ref_b_{model_name}_plant_bridge", v_ref_b0) self.set_internal_init_if_exists(mdl, f"v_ref_c_{model_name}_plant_bridge", v_ref_c0) self.set_internal_init_if_exists(mdl, f"gate_a_{model_name}_plant_bridge", gate_a0) self.set_internal_init_if_exists(mdl, f"gate_b_{model_name}_plant_bridge", gate_b0) self.set_internal_init_if_exists(mdl, f"gate_c_{model_name}_plant_bridge", gate_c0) self.set_internal_init_if_exists(mdl, f"v_leg_a_{model_name}_plant_bridge", v_leg_a0) self.set_internal_init_if_exists(mdl, f"v_leg_b_{model_name}_plant_bridge", v_leg_b0) self.set_internal_init_if_exists(mdl, f"v_leg_c_{model_name}_plant_bridge", v_leg_c0) self.set_internal_init_if_exists(mdl, f"v_common_mode_{model_name}_plant_bridge", v_common_mode0) self.set_internal_init_if_exists(mdl, f"v_conv_a_{model_name}_plant_bridge", v_conv_a0) self.set_internal_init_if_exists(mdl, f"v_conv_b_{model_name}_plant_bridge", v_conv_b0) self.set_internal_init_if_exists(mdl, f"v_conv_c_{model_name}_plant_bridge", v_conv_c0) self.set_internal_mode_if_exists(mdl, f"gate_a_mode_{model_name}_plant_bridge", gate_a0) self.set_internal_mode_if_exists(mdl, f"gate_b_mode_{model_name}_plant_bridge", gate_b0) self.set_internal_mode_if_exists(mdl, f"gate_c_mode_{model_name}_plant_bridge", gate_c0) def _seed_all_switched_vsc_models(self) -> None: """ Seed all switched VSC EMT models explicitly from PF results. The generic branch initialization path is sufficient for the averaged VSC, but the switched converter contains many additional internal bridge/filter variables. This explicit pass guarantees that all switched VSC instances receive their PF-consistent seeds after the native EMT initialization. :return: None. """ bus_idx_dict: Dict[Any, int] = self.grid.get_bus_index_dict() vsc_index: int = 0 vsc: Any f_bus_idx: int t_bus_idx: int while vsc_index < len(self.grid.vsc_devices): vsc = self.grid.vsc_devices[vsc_index] f_bus_idx = bus_idx_dict[vsc.bus_from] t_bus_idx = bus_idx_dict[vsc.bus_to] if self.power_flow_results_3ph is not None: self._try_set_vsc_branch_pf_init( mdl=self._working_emt_models.get(str(vsc.idtag), vsc.emt_model), f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, sbase=self.grid.Sbase, vsc_index=vsc_index, ) else: self._try_set_vsc_branch_pf_init_balanced( mdl= self._working_emt_models.get(str(vsc.idtag), vsc.emt_model), f_bus_idx=f_bus_idx, t_bus_idx=t_bus_idx, sbase=self.grid.Sbase, vsc_index=vsc_index, ) vsc_index += 1 def _seed_all_switch_models(self) -> None: """ Seed all EMT switch models from the static switch active state when requested. :return: None. """ branch_obj: Any for branch_obj in self.grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True): if branch_obj.device_type == DeviceType.SwitchDevice: mdl = self._working_emt_models.get(str(branch_obj.idtag), branch_obj.emt_model) if mdl is None: pass else: model_name: str = _get_block_name(mdl) seed_from_pf: float = _get_internal_runtime_default( mdl, f"switch_seed_from_pf_{model_name}", 1.0, ) if seed_from_pf > 0.5: initial_closed: float = 1.0 if bool(branch_obj.active) else 0.0 else: initial_closed = _get_internal_mode_default( mdl, f"switch_closed_mode_{model_name}", 1.0, ) self.set_internal_mode_if_exists(mdl, f"switch_closed_mode_{model_name}", initial_closed) else: pass def _initialize_mode_event_state(self) -> None: """ Initialize the mode event cursor state. :return: None """ for uid, event_list in self._scheduled_mode_events.items(): event_list.sort(key=_get_mode_event_sort_key) self._mode_event_cursor[uid] = 0 def _apply_scheduled_mode_events(self, t_curr: float, full_params: np.ndarray) -> None: """ Apply scheduled retained mode events to the flat full parameter vector. :param t_curr: Current simulation time. :param full_params: Flat full parameter vector. :return: None """ for uid, event_list in self._scheduled_mode_events.items(): event_idx: int = self._mode_event_cursor.get(uid, 0) while event_idx < len(event_list) and t_curr >= event_list[event_idx][0]: event_time: float = event_list[event_idx][0] event_value: float = event_list[event_idx][1] force_step_alignment: bool = event_list[event_idx][2] runtime_idx: Optional[int] = self.uid2idx_event_params.get(uid, None) is_time_aligned: bool = _is_time_aligned(t_curr, event_time) if force_step_alignment: if is_time_aligned: if runtime_idx is not None: full_params[runtime_idx] = event_value else: pass else: raise RuntimeError( f"Scheduled EMT mode event at t={event_time} requires exact step alignment, " f"but current solver time is t={t_curr}. Adjust the EMT time step or enable step splitting." ) else: if runtime_idx is not None: full_params[runtime_idx] = event_value else: pass event_idx += 1 self._mode_event_cursor[uid] = event_idx def _collect_runtime_mode_parameters(self) -> List[Var]: """ Collect runtime parameters that must be retained between steps. This includes: - scheduled discrete mode parameters, - Bergeron line history parameters, which are updated explicitly by the runtime. """ mode_parameters: List[Var] = list() seen: Set[int] = set() # 1) Scheduled mode parameters already registered through mode_dict for parameter in self.get_runtime_mode_parameters(): if parameter.uid not in seen: seen.add(parameter.uid) mode_parameters.append(parameter) # 2) Bergeron history parameters must also be retained for rt in self.history_models: for parameter in rt.Ih_f: if parameter.uid not in seen: seen.add(parameter.uid) mode_parameters.append(parameter) for parameter in rt.Ih_t: if parameter.uid not in seen: seen.add(parameter.uid) mode_parameters.append(parameter) return mode_parameters
[docs] def add_device_var(self, dev: ALL_DEV_TYPES, var: Var)->None: """ Registers a variable belonging to a specific device. """ var_list = self._vars_info.get(dev, None) if var_list is None: self._vars_info[dev] = [var] else: var_list.append(var)
[docs] def set_init_guess(self, mdl: Block, reference_powerflow: Any, val: float) -> None: """ Set the temporary initial guess for a mapped variable during the parsing phase. :param mdl: Model block containing the external mapping. :param reference_powerflow: Reference key used to locate the mapped variable. :param val: Initialization value. :return: None """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if external_mapping is None: pass else: var: Optional[Var] = external_mapping.get(reference_powerflow, None) if var is None: pass else: self._temp_init_guess[var.uid] = float(val)
[docs] def set_init_guess_and_preserve_post_init(self, mdl: Block, reference_powerflow: Any, val: float) -> None: """ Set one mapped initialization guess and preserve it after native initialization. :param mdl: Model block containing the external mapping. :param reference_powerflow: Reference key used to locate the mapped variable. :param val: Initialization value. :return: None. """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if external_mapping is None: pass else: var: Optional[Var] = external_mapping.get(reference_powerflow, None) if var is None: pass else: value_float: float = float(val) self._temp_init_guess[var.uid] = value_float self._temp_post_init_guess[var.uid] = value_float
[docs] def set_diff_init_guess(self, mdl: Block, reference_powerflow: Any, val: float) -> None: """ Set the temporary initial guess for a mapped differential variable during the parsing phase. :param mdl: Model block containing the external mapping. :param reference_powerflow: Reference key used to locate the mapped differential variable. :param val: Initialization value. :return: None """ external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl) if external_mapping is None: pass else: diff_var: Optional[Var] = external_mapping.get(reference_powerflow, None) if diff_var is None: pass else: self._temp_diff_init_guess[diff_var.uid] = float(val)
[docs] def set_external_param(self, mdl: Block, key: VarPowerFlowReferenceType, value: float) -> None: """ Set a PF-derived value either as an event parameter if the mapped variable belongs to mdl.event_dict. """ var: Optional[Any] = _get_external_mapping_var_if_present(mdl=mdl, key=key) owner_block: Optional[Block] if var is None: return else: pass if isinstance(var, Var): # GUI wrapper roots often expose the PF reference while the actual # mutable runtime parameter lives in a child ``event_dict``. The # value must therefore be written to the owning child block. owner_block = _find_event_var_owner(mdl, var) if owner_block is None: pass else: owner_block.event_dict[var] = Const(float(value)) else: pass
[docs] def get_init_guess_info(self) -> pd.DataFrame: """ Return a table with the explicitly initialized variable guesses. :return: DataFrame with UID, variable name and initialization value. """ rows: List[Dict[str, Any]] = list() all_vars: Dict[int, Var] = dict() for var in self._state_vars: all_vars[var.uid] = var for var in self._algebraic_vars: all_vars[var.uid] = var for uid, value in self.init_guess.items(): if uid in all_vars: rows.append({ "uid": uid, "var_name": all_vars[uid].name, "value": value, }) else: pass return pd.DataFrame(rows)
[docs] def get_device_vars_dict(self) -> Dict[ALL_DEV_TYPES, List[Var]]: """ Returns the dictionary mapping electrical devices to their variables. """ return self._vars_info
@property def vars_glob_name2uid(self) -> Dict[str, int]: """ Returns the dictionary mapping global variable names to UIDs. """ return self._vars_glob_name2uid @property def boundary_update(self) -> EmtBoundaryUpdateProtocol | None: """ Return the endogenous EMT boundary updater consumed by the EMT solvers. """ if len(self._scheduled_mode_events) == 0 and len(self.history_models) == 0 and self._block_boundary_updater is None: return None else: pass return self
[docs] def reset_boundary_update_state(self, t0: float = 0.0) -> None: """ Reset the EMT boundary update state before a new solver run starts. :param t0: Initial simulation time. :return: None """ super().reset_boundary_update_state(t0) self.step_counter = 0 self._mode_event_cursor = dict() self._initialize_mode_event_state() from VeraGridEngine.Utils.procedural_logic import build_boundary_updater_from_block self._block_boundary_updater = build_boundary_updater_from_block(self)
[docs] def emt_boundary_update( self, t_curr: float, x_prev: np.ndarray, full_params: np.ndarray ) -> None: """ Update runtime boundaries before the Newton step. Retained mode events are applied first. Afterwards, history-dependent EMT models update their internal boundary data. :param t_curr: Current simulation time. :param x_prev: Previous accepted state vector. :param full_params: Flat full parameter vector. :return: None """ self._apply_scheduled_mode_events(t_curr, full_params) if self._block_boundary_updater is not None: # The retained procedural logic reacts after scheduled events so it sees the latest gate commands. self._block_boundary_updater.update(t_curr, x_prev, full_params) else: pass for rt in self.history_models: rt.update_history(self.step_counter, x_prev, full_params) # Persist the updated runtime parameters so the next step sees them n_runtime: int = self.get_variable_parameter_number() self._event_params_values[:] = full_params[:n_runtime] self.step_counter += 1
[docs] def get_next_forced_event_time(self, t_prev: float, t_target: float) -> Optional[float]: """ Return the earliest forced-alignment event time in the interval (t_prev, t_target]. :param t_prev: Previous solver time. :param t_target: Nominal target time. :return: Earliest forced event time or None. """ scheduled_time: Optional[float] procedural_time: Optional[float] scheduled_time = _get_next_forced_mode_event_time(self._scheduled_mode_events, t_prev, t_target) if self._block_boundary_updater is None: procedural_time = None else: procedural_time = self._block_boundary_updater.get_next_forced_event_time(t_prev, t_target) # Merge the scheduled and procedural forced-event candidates so the # solver advances only to the earliest boundary that requires explicit # alignment on the next integration step. if scheduled_time is None: return procedural_time else: pass if procedural_time is None: return scheduled_time else: pass if scheduled_time <= procedural_time: return scheduled_time else: return procedural_time
[docs] def update(self, t: float, x: np.ndarray, params: np.ndarray) -> None: """ Boundary update entry point compatible with BoundaryUpdateWrapper. :param t: Current simulation time. :param x: Previous accepted state vector. :param params: Flat full parameter vector. :return: None """ self.emt_boundary_update(t, x, params)
[docs] def get_floquet_ak_stack( self, trajectory: np.ndarray, h: float, jac_evaluator: Optional[Any] = None, static_params: Optional[np.ndarray] = None ) -> Optional[np.ndarray]: """ Return the stack of reduced transition matrices used for Floquet analysis. :param trajectory: State trajectory over one period. :param h: Time step. :param jac_evaluator: Optional Jacobian evaluator. :param static_params: Optional static parameter vector. :return: Stack of reduced transition matrices or None. """ if jac_evaluator is None: self.logger.add_warning( msg="No Jacobian evaluator was provided for Floquet Ak stack computation.", device="EmtProblemDae", value="None", expected_value="Callable Jacobian evaluator", device_class="EMT", device_property="jac_evaluator" ) return None else: pass n = self.get_states_number() steps = len(trajectory) - 1 if steps <= 0: return None stack = np.zeros((steps, n, n), order='C') for i in range(steps): x_k = trajectory[i] J = jac_evaluator(x_k, static_params, x_k, None, h, None) J11 = J[:n, :n] J12 = J[:n, n:] J21 = J[n:, :n] J22 = J[n:, n:] J22_inv_J21 = spla.spsolve(J22, J21) J_red = (J11 - J12 @ J22_inv_J21) if sp.issparse(J_red): J_red = J_red.toarray() stack[i] = la.inv(h * J_red) return stack
def _get_or_create_working_emt_model(self, dev: Any) -> Block: """ Return the EMT block used by this EmtProblemDae instance. This block is never the grid-owned dev.emt_model object. The copy must preserve Var.uid values so grid EMT events that target original Vars still match by uid. """ dev_key: str = str(dev.idtag) mdl: Block | None = self._working_emt_models.get(dev_key, None) if mdl is None: mdl = copy.deepcopy(dev.emt_model) if mdl.children: # Script-built EMT workflows call ``set_emt_model()`` which flattens the # connected hierarchy through ``unify_blocks()`` before the model ever reaches # the solver. GUI-loaded .veragrid files can preserve the pre-unified wrapper # roots instead. The EMT assembly expects the same normalized block shape in # both workflows, otherwise wrapper-only metadata can diverge from the actual # child equations and produce a structurally singular system. mdl.unify_blocks() # The unified placeholder rebuilds the block-level collections from the child # hierarchy. Re-deriving the external mapping afterwards ensures that EMT # initialization seeds the live flattened variables rather than stale wrapper # aliases that may no longer participate directly in the residual equations. mdl.external_mapping = _collect_external_mapping_from_block(mdl) mdl.api_obj_mapping = _collect_api_obj_mapping_from_block(mdl) else: pass self._working_emt_models[dev_key] = mdl else: pass return mdl
[docs] def get_init_guess_table(self, include_all_vars: bool = True, only_in_init_guess: bool = False, tol_zero: float = 1e-12) -> pd.DataFrame: """ Return a table with uid, alias/name and initial value for each variable. include_all_vars: - True: include all state+algebraic vars, even if not in init_guess (value=0). - False: include only uids present in init_guess (or only_in_init_guess=True). only_in_init_guess: - True: filter to only those explicitly initialized (uid in init_guess). - False: keep everything (depending on include_all_vars). tol_zero: threshold to flag values close to zero. """ # Build x0 and dx0 from current guesses x0 = self.get_x0() dx0 = self.get_dx0() uid2idx_vars = self.uid2idx_vars uid2idx_diff = self.uid2idx_diff # Define the sets we want to process: (Variables, Guess_Source, Index_Map, Vector, Label) data_sets = [ (list(self.get_state_vars()) + list(self.get_algebraic_vars()), self.init_guess, uid2idx_vars, x0, "Var"), (list(self.get_diff_vars()), self.diff_init_guess, uid2idx_diff, dx0, "DiffVar") ] rows: List[Dict[str, Any]] = list() for vars_list, guess_dict, idx_map, vector, var_type in data_sets: for v in vars_list: uid = v.uid idx = idx_map.get(uid, None) val = float(vector[idx]) if idx is not None else 0.0 in_guess = uid in guess_dict keep_row = True if only_in_init_guess and (not in_guess): keep_row = False if (not include_all_vars) and (not in_guess): keep_row = False if keep_row: alias: Optional[str] = self._alias_names_dict.get(uid, None) rows.append({ "uid": uid, "type": var_type, "idx": idx, # add this "alias": alias, "name": v.name, "value": val, "in_init_guess": in_guess, "is_zero": abs(val) <= tol_zero, }) else: pass df = pd.DataFrame(rows) # Nice ordering: Group by type, then initialization status if not df.empty: df = df.sort_values(["type", "idx"], ascending=[False, True]) return df
def _assign_generator_sharing_targets_from_seed( self, inj: Any, mdl: Block, phase_power: np.ndarray, generator_count_same_bus: int, static_parameter_values_mapping: Dict[Var, Const], ) -> None: """ Assign optional sharing references to one generator from its EMT seed. The Thevenin generator template accepts total active and reactive power references. These are derived from the already computed per-device EMT seed so the dynamic controller starts from the same sharing target used during the PF-to-EMT initialization. :param inj: Generator device. :param phase_power: Complex power seed in NABC order. :param generator_count_same_bus: Number of generators connected to the same bus. :return: None. """ p_ref_value: float = float(np.real(np.sum(phase_power))) q_ref_value: float = float(np.imag(np.sum(phase_power))) if self.options.verbose: print(f"p_ref_value = {p_ref_value} generator {inj}") print(f"q_ref_value = {q_ref_value} generator {inj}") if generator_count_same_bus > 1: share_enable_value: float = 1.0 else: share_enable_value = 0.0 self._assign_api_mapping_value_if_present( mdl=mdl, key=ParamPowerFlowReferenceType.generator_share_enable, value=share_enable_value, static_parameter_values_mapping=static_parameter_values_mapping, ) self._assign_api_mapping_value_if_present( mdl=mdl, key=ParamPowerFlowReferenceType.generator_share_p_ref, value=p_ref_value, static_parameter_values_mapping=static_parameter_values_mapping, ) self._assign_event_mapping_value_if_present( mdl=mdl, key=ParamPowerFlowReferenceType.generator_share_p_ref, value=p_ref_value, ) self._assign_api_mapping_value_if_present( mdl=mdl, key=ParamPowerFlowReferenceType.generator_share_q_ref, value=q_ref_value, static_parameter_values_mapping=static_parameter_values_mapping, ) self._assign_event_mapping_value_if_present( mdl=mdl, key=ParamPowerFlowReferenceType.generator_share_q_ref, value=q_ref_value, )