Source code for VeraGridEngine.Simulations.Rms.problems.rms_problem_dae

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Dict, List, Callable, Any, Tuple, Set, Optional
import time
import numpy as np
import pandas as pd
import scipy.sparse as sp

from VeraGridEngine import Logger
from VeraGridEngine.enumerations import ParamPowerFlowReferenceType, DeviceType, DynamicEventTransitionType
from VeraGridEngine.Devices import MultiCircuit
from VeraGridEngine.Simulations.driver_template import DummySignal
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, piecewise, get_expression_vars, heaviside,
                                                    hard_sat)
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicParamsVector, SymbolicDerivative, SymbolicJacobian
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic_io import block_deep_copy
from VeraGridEngine.enumerations import VarPowerFlowReferenceType, RmsInitializationMethod
from VeraGridEngine.basic_structures import Vec, ObjVec, BoolVec, Logger
from VeraGridEngine.Simulations.PowerFlow.power_flow_driver import PowerFlowResults
from VeraGridEngine.Simulations.Rms.rms_options import RmsOptions
from VeraGridEngine.Utils.Symbolic.explicit_initialization_symbolic import (init_explicit_common,
                                                                            build_rms_single_equation_compiler)
from VeraGridEngine.Simulations.Rms.initialization import init_pseudo_transient
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Devices.Events.rms_events_group import RmsEventsGroup
from VeraGridEngine.Devices.Events.rms_event import RmsEvent
from VeraGridEngine.Devices.Branches.transformer import Transformer2W
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
from VeraGridEngine.Utils.Symbolic.bus_rms_template import get_bus_rms_algebraic_vars
from VeraGridEngine.Utils.procedural_logic import build_boundary_updater_from_block
from VeraGridEngine.IO.fmu.importer.experimental_cs import (
    advance_rms_fmu_cs_devices,
    align_rms_fmu_cs_device_output_parameters,
    close_rms_fmu_cs_devices,
    initialize_rms_fmu_cs_devices,
    register_rms_fmu_cs_device,
)
from VeraGridEngine.IO.fmu.importer.experimental_me import (
    advance_rms_fmu_me_devices,
    close_rms_fmu_me_devices,
    initialize_rms_fmu_me_devices,
    register_rms_fmu_me_device,
)

from VeraGridEngine.Utils.Symbolic.static_parameter_mapping_rms import (
    assign_static_api_object_mapping_for_device,
)

from VeraGridEngine.Utils.procedural_logic import BlockProceduralLogicUpdater


[docs] def assign_line_static_parameters(elm: Any, parameter_reference: ParamPowerFlowReferenceType) -> Const: if parameter_reference == ParamPowerFlowReferenceType.g: return Const(float(elm.R / (elm.R ** 2 + elm.X ** 2))) if parameter_reference == ParamPowerFlowReferenceType.b: return Const(float(-elm.X / (elm.R ** 2 + elm.X ** 2))) if parameter_reference == ParamPowerFlowReferenceType.bsh: return Const(elm.B) if parameter_reference == ParamPowerFlowReferenceType.r: return Const(elm.R) if parameter_reference == ParamPowerFlowReferenceType.l: return Const(elm.X) else: raise ValueError("parameter reference expression missing")
def _tic(): return time.perf_counter() def _toc(t0): return time.perf_counter() - t0 def _is_time_aligned(t_curr: float, event_time: float) -> bool: """ Return whether ``t_curr`` is aligned with ``event_time`` within numeric tolerance. """ machine_eps: float = float(np.finfo(np.float64).eps) time_tol: float = 10.0 * machine_eps * max(1.0, abs(event_time)) return bool(abs(t_curr - event_time) <= time_tol) def _get_mode_event_sort_key(event: Tuple[float, float, bool]) -> float: """ Return the sorting key of one mode event. :param event: Mode event tuple ``(time, value, force_step_alignment)``. :return: Event time. """ return event[0] def _get_next_forced_mode_event_time( scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]], t_prev: float, t_target: float, ) -> Optional[float]: """ Return the earliest forced-alignment mode event time in ``(t_prev, t_target]``. :param scheduled_mode_events: Mapping uid -> sorted list of (time, value, force_step_alignment). :param t_prev: Previous local solver time. :param t_target: Nominal macro target time. :return: Earliest forced event time or ``None``. """ next_time: Optional[float] = None for _, event_list in scheduled_mode_events.items(): for event_time, _, force_step_alignment in event_list: if force_step_alignment and (t_prev < event_time <= t_target): if next_time is None or event_time < next_time: next_time = event_time return next_time def _is_rms_time_strictly_after_event(t_curr: float, event_time: float) -> bool: """ Return whether the current RMS time is strictly after one event time. The RMS reference trajectories treat the event sample itself as still belonging to the pre-event branch. Floating-point time accumulation can make a nominal sample such as ``0.1`` appear as ``0.10000000000000007``. Using the same alignment tolerance already employed elsewhere in the solver keeps boundary decisions stable and prevents one-sample-early event activation. :param t_curr: Current solver time. :param event_time: Event activation time. :return: ``True`` only when the current time is beyond the aligned boundary. """ if _is_time_aligned(t_curr=t_curr, event_time=event_time): return False else: return t_curr > event_time def _build_ramp_runtime_expr(time_var: Var, start_time: float, end_time: float, before_expr: Expr | Const, final_value: float) -> Expr: """ Build one linear ramp transition on top of an existing runtime expression. The expression keeps the previous runtime behavior before ``start_time``, evolves linearly toward ``final_value`` until ``end_time``, and finally holds the final value after the ramp completes. :param time_var: Global simulation time variable. :param start_time: Ramp start time. :param end_time: Ramp end time. :param before_expr: Expression active before the ramp starts. :param final_value: Final runtime value after the ramp ends. :return: Runtime expression including the ramp segment. """ start_expr: Expr | Const = _freeze_runtime_expr_at_time(expr=before_expr, time_var=time_var, sample_time=start_time) duration_expr: Const = Const(float(end_time - start_time)) time_offset_expr: Expr = time_var - Const(float(start_time)) progress_expr: Expr = hard_sat(time_offset_expr / duration_expr, Const(0.0), Const(1.0)) # The RMS symbolic backend defines ``heaviside(0)`` as zero. A gate that is # multiplied directly by ``heaviside(t - start_time)`` therefore keeps the # pre-event branch active exactly at the ramp start and can make the first # post-event samples look like an immediate step after implicit integration. # Clamping the normalized progress and blending between the frozen pre-event # value and the final target removes that boundary ambiguity while preserving # the intended linear ramp on the open interval and the final hold after the # ramp end. return start_expr + progress_expr * (Const(float(final_value)) - start_expr) def _get_rms_event_time_sort_key(evt: RmsEvent) -> tuple[float, float]: """ Build the chronological sort key for one RMS event. :param evt: RMS event. :return: Tuple containing start and end times. Multi-event runtime replay must apply events in deterministic time order. Sorting by start time first and end time second keeps the replay stable for any combination of step and ramp events. """ end_time_value: float if evt.end_time is not None: end_time_value = float(evt.end_time) else: end_time_value = float(evt.time) return float(evt.time), end_time_value def _evaluate_rms_runtime_parameter_value_from_events(base_value: float, event_list: List[RmsEvent], t_curr: float) -> float: """ Evaluate one RMS runtime parameter by replaying its events procedurally. :param base_value: Baseline parameter value before any event happens. :param event_list: Ordered RMS events affecting the parameter. :param t_curr: Current simulation time. :return: Runtime parameter value. The symbolic nested ``piecewise`` composition is fragile when several events act on the same continuous parameter. Procedural replay is linear in the number of events and naturally supports any number and combination of step and ramp transitions. """ current_value: float = float(base_value) event_item: RmsEvent for event_item in event_list: if event_item.transition_type == DynamicEventTransitionType.Step: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=float(event_item.time)): current_value = float(event_item.value) else: pass else: if event_item.transition_type == DynamicEventTransitionType.Ramp: start_time: float = float(event_item.time) end_time: float | None = event_item.end_time ramp_start_value: float = float(current_value) if end_time is not None: resolved_end_time: float = float(end_time) if resolved_end_time > start_time: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=start_time): if t_curr >= resolved_end_time: current_value = float(event_item.value) else: ramp_progress: float = (float(t_curr) - start_time) / (resolved_end_time - start_time) if ramp_progress < 0.0: ramp_progress = 0.0 else: if ramp_progress > 1.0: ramp_progress = 1.0 else: pass current_value = ramp_start_value + ramp_progress * (float(event_item.value) - ramp_start_value) else: pass else: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=start_time): current_value = float(event_item.value) else: pass else: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=start_time): current_value = float(event_item.value) else: pass else: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=float(event_item.time)): current_value = float(event_item.value) else: pass return current_value def _build_runtime_expression_from_single_rms_event(time_var: Var, base_expression: Expr | Const, event_item: RmsEvent) -> Expr | Const: """ Build the legacy symbolic runtime expression for one RMS event. :param time_var: Global simulation time variable. :param base_expression: Baseline runtime expression before the event. :param event_item: Single RMS event affecting the parameter. :return: Runtime expression including the event. The historical RMS references were generated with a symbolic event path for single continuous events. Preserving that exact path keeps the legacy CSV tests stable while the procedural replay path is reserved for true multi-event combinations that are fragile under nested symbolic composition. """ transition_tpe: DynamicEventTransitionType = event_item.transition_type if transition_tpe == DynamicEventTransitionType.Ramp: end_time_raw: float | None = event_item.end_time if end_time_raw is not None: end_time: float = float(end_time_raw) start_time: float = float(event_item.time) if end_time > start_time: return _build_ramp_runtime_expr(time_var=time_var, start_time=start_time, end_time=end_time, before_expr=base_expression, final_value=float(event_item.value)) else: return piecewise(time_var=time_var, t_events=np.asarray([float(event_item.time)], dtype=np.float64), new_values=np.asarray([float(event_item.value)], dtype=np.float64), default_value=base_expression) else: return piecewise(time_var=time_var, t_events=np.asarray([float(event_item.time)], dtype=np.float64), new_values=np.asarray([float(event_item.value)], dtype=np.float64), default_value=base_expression) else: return piecewise(time_var=time_var, t_events=np.asarray([float(event_item.time)], dtype=np.float64), new_values=np.asarray([float(event_item.value)], dtype=np.float64), default_value=base_expression) def _is_rms_event_routed_to_scheduled_mode( transition_tpe: DynamicEventTransitionType, parameter_uid: int, discrete_parameter_uids: Set[int], continuous_parameter_uids: Set[int], ) -> bool: """ Decide whether one RMS event must use the scheduled step path. The RMS runtime parameter layer must support two behaviors for the same symbolic parameter. Step events must keep the historical scheduled update path used by existing reference trajectories, while ramp events must stay on the continuous symbolic path so the runtime parameter evolves smoothly over time. This helper therefore routes each event from its transition profile first, and only falls back to the registered parameter class when the event profile does not force a unique path. :param transition_tpe: Normalized event transition type. :param parameter_uid: Runtime parameter unique identifier. :param discrete_parameter_uids: Registered discrete/mode parameter UIDs. :param continuous_parameter_uids: Registered continuous parameter UIDs. :return: ``True`` when the event must use scheduled step handling. """ if transition_tpe == DynamicEventTransitionType.Step: return True else: if transition_tpe == DynamicEventTransitionType.Ramp: return False else: if parameter_uid in discrete_parameter_uids: return True else: if parameter_uid in continuous_parameter_uids: return False else: return True def _freeze_runtime_expr_at_time(expr: Expr | Const, time_var: Var, sample_time: float) -> Expr | Const: """ Freeze one runtime expression at a specific time sample. The ramp interpolation must start from the pre-event runtime value evaluated exactly at the ramp start. Replacing the symbolic time variable by the start time preserves that baseline even when the pre-event expression also depends on time. :param expr: Runtime expression to freeze. :param time_var: Global simulation time variable. :param sample_time: Time sample used to freeze the expression. :return: Expression evaluated at the supplied time sample. """ if isinstance(expr, Const): return expr else: time_binding: Dict[Var, Expr] = dict({time_var: Const(float(sample_time))}) return expr.subs(time_binding)
[docs] def setP(P: ObjVec, P_used: BoolVec, k: int, val: object): """ :param P: :param P_used: :param k: :param val: :return: """ if not P_used[k]: P[k] = val P_used[k] = True else: P[k] += val
[docs] def setQ(Q: ObjVec, Q_used: BoolVec, k: int, val: object): """ :param Q: :param Q_used: :param k: :param val: :return: """ if not Q_used[k]: Q[k] = val Q_used[k] = True else: Q[k] += val
[docs] class RmsProblemDae(RmsProblemTemplate): """ DAE (Differential-Algebraic Equation) class to store and manage. Responsibilities: - Store state and algebraic variables (x, y) - Store Jacobian matrices - Store residual equations - Store sparsity patterns """ VARS_NAME = "vrs" VARIABLE_PARAMS_NAME = "vprms" CONSTANT_PARAMS_NAME = "cprms" DIFF_NAME = "diff" TIME_NAME = "glob_time" def __init__(self, grid: MultiCircuit, options: RmsOptions, pf_results: PowerFlowResults, progress_signal: DummySignal | None = None, progress_text: DummySignal | None = None, ): """ :param grid: :param options: :param pf_results: """ super().__init__(progress_signal=progress_signal, progress_text=progress_text) self.logger = Logger() self.grid: MultiCircuit = grid self.power_flow_results: PowerFlowResults = pf_results self.Sf = self.power_flow_results.Sf / self.grid.Sbase self.St = self.power_flow_results.St / self.grid.Sbase self.options: RmsOptions = options # this is the general init guess that will contain all the variables init value self.init_guess: Dict[int, float | int | complex | None] = dict() self.event_params_init_dict: Dict[int, float | int | complex | None] = dict() self.sys_block: Block = Block(children=[], in_vars=[]) # when vectorizing this will be a list of lists self._algebraic_vars: List[Var] = list() self._algebraic_eqs: List[Expr] = list() # when vectorizing this will be a list of lists self._state_vars: List[Var] = list() self._state_eqs: List[Expr] = list() self._diff_vars: List[Var] = list() # when vectorizing this will be a list of lists self._variable_parameters: List[Var] = list() self._event_parameters_eqs0: List[Expr | Const] = list() self._event_parameters_eqs: List[Expr | Const] = list() self._constant_parameters: List[Var] = list() # when vectorizing this will be a list of np.array self._parameters_values: List[Const] = list() self._static_parameters_values_mapping: Dict[Var, Const] = dict() self._runtime_all_parameters_source: List[Var] = list() self._runtime_all_eqs_source: List[Expr | Const] = list() self._runtime_continuous_parameters: List[Var] = list() self._runtime_mode_parameters: List[Var] = list() self._runtime_continuous_eqs: List[Expr | Const] = list() self._runtime_mode_eqs: List[Expr | Const] = list() self._event_parameter_device_idtags: Dict[int, str] = dict() self._runtime_all_eqs_source0: List[Expr | Const] = list() self._runtime_continuous_slice: slice = slice(0, 0) self._runtime_mode_slice: slice = slice(0, 0) self._continuous_event_parameter_uids: Set[int] = set() self._discrete_event_parameter_uids: Set[int] = set() self._continuous_runtime_events: Dict[int, List[RmsEvent]] = dict() self._scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]] = dict() self._mode_event_cursor: Dict[int, int] = dict() self._active_events_group: RmsEventsGroup | None = None self._mode_runtime_expression_by_uid: Dict[int, Expr | Const] = dict() self._mode_runtime_initialized_uids: Set[int] = set() self._procedural_logic_updater: BlockProceduralLogicUpdater | None = None # function pointers self._derivative_fn: SymbolicDerivative | None = None self._event_params_fn: SymbolicParamsVector | None = None self._rhs_algeb_fn: Callable[[Vec, Vec, Vec, Vec], Vec] | None = None self._rhs_state_fn: Callable[[Vec, Vec, Vec, Vec], Vec] | None = None self._j11_fn: Callable[[Vec, Vec, Vec, Vec, float], sp.csc_matrix] | None = None self._j12_fn: Callable[[Vec, Vec, Vec, Vec, float], sp.csc_matrix] | None = None self._j21_fn: Callable[[Vec, Vec, Vec, Vec, float], sp.csc_matrix] | None = None self._j22_fn: Callable[[Vec, Vec, Vec, Vec, float], sp.csc_matrix] | None = None self._variable_parameters_values: Optional[Vec] = None self._last_variable_parameters_values: Optional[Vec] = None self._constant_params: Optional[Vec] = None self._block_boundary_updater: Any | None = None self._fmu_cs_adapters: List[object] = list() self._fmu_cs_initialized: bool = False self._fmu_me_adapters: List[object] = list() self._fmu_me_initialized: bool = False # -------------------------------------------------------------------------------------------------------------- # Initialize the RMS problem # -------------------------------------------------------------------------------------------------------------- ######################### Initialize containers############################# total_init_explicit_time: float = 0 t0 = time.perf_counter() diff_init_guess_common: Dict[int, float | int | complex | None] = dict() # dictionaries to store device-variable ifo self._vars_info: Dict[ALL_DEV_TYPES, List[Var]] = dict() self._vars_glob_name2uid: Dict[str, int] = dict() # dictionaries for compilation names self._compiler_names_dict: Dict[int, str] = dict() self._alias_names_dict: Dict[int, str] = dict() # dictionaries for variable position in the variables arrays self._uid2idx_vars: Dict[int, int] = dict() self._uid2idx_event_params: Dict[int, int] = dict() self._uid2idx_params: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() # We put algebraic_vars that are actually states first self._algebraic_vars.sort(key=lambda obj: obj.diff_var is None) diff_vars_from_states = [var.diff_var for var in self._state_vars] for i, var in enumerate(diff_vars_from_states): state_var = self._state_vars[i] if var is None: diff_vars_from_states[i] = self.grid.var_factory.add_diff_var(name='aux', base_var=state_var) diff_vars_from_algebraic = [var.diff_var for var in self._algebraic_vars if var.diff_var is not None] self._diff_vars = diff_vars_from_states + diff_vars_from_algebraic # create time global time variable and add it to the compilation dict self._glob_time: Var = Var(self.TIME_NAME) self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 # Dictionary of state and algebraic vars self.sys_vars: Dict[int, Var] = dict() # initialize balance equation arrays n = len(self.grid.buses) P: ObjVec = np.zeros(n, dtype=object) Q: ObjVec = np.zeros(n, dtype=object) P_used: BoolVec = np.zeros(n, dtype=bool) Q_used: BoolVec = np.zeros(n, dtype=bool) branch_bus_p = np.zeros(n, dtype=float) branch_bus_q = np.zeros(n, dtype=float) # general indexes for variables and parameters self._n_vars = 0 self._n_params = 0 self._n_event_params = 0 self._n_diff = 0 ######################################## Initialize devices ######################################## # initialize buses bus_dict: Dict[Bus, int] = dict() for bus_num, elm in enumerate(self.grid.buses): bus_dict[elm] = bus_num self.add_variables_to_compilation_dicts(elm, elm.rms_model) # add init values from powerflow to initial guess if elm.is_dc: # DC bus: use Vdc (magnitude) - angle is not applicable for DC self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Vdc, float(np.abs(self.power_flow_results.voltage[bus_num]))) else: # AC bus: use Vm and Va self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Vm, float(np.abs(self.power_flow_results.voltage[bus_num]))) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Va, float(np.angle(self.power_flow_results.voltage[bus_num]))) # add model to system block self.sys_block.add(elm.rms_model) # initialize branches for branch_num, elm in enumerate(self.grid.get_branches_iter(add_vsc=False, add_hvdc=False, add_switch=True)): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: assign_static_api_object_mapping_for_device(grid=self.grid, device=elm, mdl=elm.rms_model, problem_mapping=self._static_parameters_values_mapping, logger=None) Vmf, Vaf = get_bus_rms_algebraic_vars(elm.bus_from.rms_model) Vmt, Vat = get_bus_rms_algebraic_vars(elm.bus_to.rms_model) self.add_variables_to_compilation_dicts(elm, elm.rms_model) register_rms_fmu_cs_device(self, elm, elm.rms_model) register_rms_fmu_me_device(self, elm, elm.rms_model) # add init values from powerflow to initial guess self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Pf, self.Sf[branch_num].real) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Qf, self.Sf[branch_num].imag) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Pt, self.St[branch_num].real) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Qt, self.St[branch_num].imag) f_idx = bus_dict[elm.bus_from] t_idx = bus_dict[elm.bus_to] branch_bus_p[f_idx] += self.Sf[branch_num].real branch_bus_q[f_idx] += self.Sf[branch_num].imag branch_bus_p[t_idx] += self.St[branch_num].real branch_bus_q[t_idx] += self.St[branch_num].imag if VarPowerFlowReferenceType.If_dc in elm.rms_model.external_mapping and Vmf is not None: if Vmf.uid in self.uid2idx_vars: vmf_idx = self.uid2idx_vars[Vmf.uid] if vmf_idx in self.init_guess: vmf0: float = self.init_guess[vmf_idx] if abs(vmf0) > 1e-9: self.set_init_guess( elm.rms_model, VarPowerFlowReferenceType.If_dc, self.Sf[branch_num].real / vmf0, ) else: pass else: pass # Run explicit initialization for branches to solve algebraic equations if isinstance(elm, Transformer2W): if self.options.initialization_method == RmsInitializationMethod.Explicit: diff_sys_vars: Dict[int, Var] = {diff_var.uid: diff_var for diff_var in self._diff_vars} rms_compiler_init = RMSCompiler( variables=list(self.sys_vars.values()), diff_vars=list(diff_sys_vars.values()), v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=Var("dt"), compiler_names_dict=self._compiler_names_dict, ) compile_single_equation = build_rms_single_equation_compiler(rms_compiler_init) # New init_explicit_common path self.init_guess, diff_init_guess_common = init_explicit_common( mdl=elm.rms_model, sys_vars=self.sys_vars, sys_diff_vars=diff_sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, event_param_init_dict=self.event_params_init_dict, init_guess=self.init_guess, diff_init_guess=diff_init_guess_common, uid2idx_vars=self.uid2idx_vars, uid2idx_diff=self._uid2idx_diff, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, params_array=self._parameters_values, compile_single_equation=compile_single_equation, verbose=bool(self.options.verbose > 0), ) elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: self.init_guess = init_pseudo_transient( mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self._uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME, dtau0=1.0, max_iter=max(500, int(self.options.max_iter)), tol=1e-8, verbose=bool(self.options.verbose > 0), ) # add model to system block self.sys_block.add(elm.rms_model) # add variable to conservation equations of the bus to which the element is connected f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] setP(P, P_used, f, -elm.rms_model.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -elm.rms_model.E(VarPowerFlowReferenceType.Pt)) if not elm.bus_from.is_dc and VarPowerFlowReferenceType.Qf in elm.rms_model.external_mapping: setQ(Q, Q_used, f, -elm.rms_model.E(VarPowerFlowReferenceType.Qf)) else: pass if not elm.bus_to.is_dc and VarPowerFlowReferenceType.Qt in elm.rms_model.external_mapping: setQ(Q, Q_used, t, -elm.rms_model.E(VarPowerFlowReferenceType.Qt)) else: pass # Populating VSCs init guess for i, elm in enumerate(self.grid.get_vsc()): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: mdl = elm.rms_model assign_static_api_object_mapping_for_device(grid=self.grid, device=elm, mdl=mdl, problem_mapping=self._static_parameters_values_mapping, logger=self.logger ) St_vsc = self.power_flow_results.St_vsc / self.grid.Sbase Sf_vsc = (self.power_flow_results.Pfn_vsc[i] + self.power_flow_results.Pfp_vsc[i]) / self.grid.Sbase # fill init_guess self.add_variables_to_compilation_dicts(elm, mdl) f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] pt_init = St_vsc[i].real qt_init = St_vsc[i].imag vm_t = np.abs(self.power_flow_results.voltage[t]) im_init = np.sqrt(pt_init * pt_init + qt_init * qt_init) / (vm_t + 1e-12) if i < len(self.power_flow_results.It_vsc): it_mag = np.abs(self.power_flow_results.It_vsc[i]) / self.grid.Sbase if np.isfinite(it_mag) and it_mag > 0.0: im_init = it_mag self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, Sf_vsc) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, pt_init) self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, qt_init) if VarPowerFlowReferenceType.Im in mdl.external_mapping: im_init = float(np.abs(self.power_flow_results.It_vsc[i]) / self.grid.Sbase) self.set_init_guess(mdl, VarPowerFlowReferenceType.Im, im_init) else: pass setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) if VarPowerFlowReferenceType.Qt in mdl.external_mapping and not elm.bus_to.is_dc: setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) else: pass self.sys_block.add(mdl) # Populating HVDC init guess (similar to VSCs) for i, elm in enumerate(self.grid.get_hvdc()): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: mdl = elm.rms_model self.add_variables_to_compilation_dicts(elm, mdl) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf_hvdc, self.power_flow_results.Pf_hvdc[i] / self.grid.Sbase) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt_hvdc, self.power_flow_results.Pt_hvdc[i] / self.grid.Sbase) f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) setQ(Q, Q_used, f, -mdl.E(VarPowerFlowReferenceType.Qf)) setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) self.sys_block.add(mdl) # initialize injections for elm in grid.get_vsc(): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: # find init values for the variables of this model if self.options.initialization_method == RmsInitializationMethod.Explicit: # common initialization to integrate # create constant parameters array params_array: np.ndarray = np.zeros( len(self._constant_parameters)) # array with the lenght of constant params for param, const in elm.rms_model.parameters.items(): params_array[self._uid2idx_params[param.uid]] = const.value diff_sys_vars: Dict[int, Var] = {diff_var.uid: diff_var for diff_var in self._diff_vars} # dictionary uid, var for diff_vars rms_compiler_init = RMSCompiler( variables=list(self.sys_vars.values()), diff_vars=list(diff_sys_vars.values()), v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=Var("dt"), compiler_names_dict=self._compiler_names_dict, ) compile_single_equation = build_rms_single_equation_compiler( rms_compiler_init) # function to compile one equation # New init_explicit_common path self.init_guess, diff_init_guess_common = init_explicit_common( mdl=elm.rms_model, sys_vars=self.sys_vars, sys_diff_vars=diff_sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, event_param_init_dict=self.event_params_init_dict, init_guess=self.init_guess, diff_init_guess=diff_init_guess_common, uid2idx_vars=self.uid2idx_vars, uid2idx_diff=self._uid2idx_diff, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, params_array=self._parameters_values, compile_single_equation=compile_single_equation, verbose=bool(self.options.verbose > 0), ) elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: self.init_guess = init_pseudo_transient( mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self._uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME, dtau0=1e-4, max_iter=max(500, int(self.options.max_iter)), tol=1e-8, verbose=bool(self.options.verbose > 0), ) else: raise ValueError("Not implemented initialization method") # add model to system block self.sys_block.add(elm.rms_model) gen_idx_map = {elm.idtag: i for i, elm in enumerate(grid.generators)} batt_idx_map = {elm.idtag: i for i, elm in enumerate(grid.batteries)} shunt_idx_map = {elm.idtag: i for i, elm in enumerate(grid.shunts)} loads_idx_map = {elm.idtag: i for i, elm in enumerate(grid.loads)} fixed_inj_p = np.zeros(n, dtype=float) fixed_inj_q = np.zeros(n, dtype=float) slack_gen_count = np.zeros(n, dtype=int) slack_gen_snom = np.zeros(n, dtype=float) for dev in grid.get_injection_devices_iter(): bidx = bus_dict[dev.bus] if not dev.rms_model.empty() and not dev.bus.is_dc: if dev.idtag in gen_idx_map and dev.bus.is_slack: slack_gen_count[bidx] += 1 snom = dev.Snom if snom <= 0.0: snom = 1.0 slack_gen_snom[bidx] += snom else: Vbus_pf = self.power_flow_results.voltage[bidx] if dev.idtag in gen_idx_map: gidx = gen_idx_map[dev.idtag] Sdev_pf = complex(float(dev.P), float(self.power_flow_results.gen_q[gidx])) / grid.Sbase elif dev.idtag in batt_idx_map: bdid = batt_idx_map[dev.idtag] Sdev_pf = complex(float(dev.P), float(self.power_flow_results.battery_q[bdid])) / grid.Sbase elif dev.idtag in shunt_idx_map: g_sh = float(dev.G) / grid.Sbase b_sh = float(dev.B) / grid.Sbase Sdev_pf = complex(g_sh * (abs(Vbus_pf) ** 2), -b_sh * (abs(Vbus_pf) ** 2)) elif dev.idtag in loads_idx_map: Sdev_pf = complex(-float(dev.P), -float(dev.Q)) / grid.Sbase else: Sdev_pf = complex(0.0, 0.0) fixed_inj_p[bidx] += Sdev_pf.real fixed_inj_q[bidx] += Sdev_pf.imag residual_p = branch_bus_p - fixed_inj_p residual_q = branch_bus_q - fixed_inj_q remaining_slack_gen = slack_gen_count.copy() remaining_slack_gen_snom = slack_gen_snom.copy() for elm in grid.get_injection_devices_iter(): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: bus_index = bus_dict[elm.bus] self.add_variables_to_compilation_dicts(elm, elm.rms_model) register_rms_fmu_cs_device(self, elm, elm.rms_model) register_rms_fmu_me_device(self, elm, elm.rms_model) if elm.bus.is_dc: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.P, np.real(self.power_flow_results.Sbus[bus_index] / grid.Sbase)) else: Vbus = self.power_flow_results.voltage[bus_index] elm_active = bool(elm.get_active_at(None)) if elm.idtag in gen_idx_map: if not elm_active: Sdev = complex(0, 0) elif elm.bus.is_slack and remaining_slack_gen[bus_index] > 0: elm_snom = float(elm.Snom) if elm_snom <= 0.0: elm_snom = 1.0 if remaining_slack_gen[bus_index] == 1: share = 1.0 else: snom_den = remaining_slack_gen_snom[bus_index] if snom_den <= 0.0: share = 1.0 / remaining_slack_gen[bus_index] else: share = elm_snom / snom_den P_val = residual_p[bus_index] * share Q_val = residual_q[bus_index] * share remaining_slack_gen[bus_index] -= 1 remaining_slack_gen_snom[bus_index] -= elm_snom residual_p[bus_index] -= P_val residual_q[bus_index] -= Q_val Sdev = complex(P_val, Q_val) else: gidx = gen_idx_map[elm.idtag] Sdev = complex(float(elm.P), float(self.power_flow_results.gen_q[gidx])) / grid.Sbase elif elm.idtag in batt_idx_map: if not elm_active: Sdev = complex(0, 0) else: bidx = batt_idx_map[elm.idtag] Sdev = complex(float(elm.P), float(self.power_flow_results.battery_q[bidx])) / grid.Sbase elif elm.idtag in shunt_idx_map: g_sh = float(elm.G) / grid.Sbase b_sh = float(elm.B) / grid.Sbase Sdev = complex(g_sh * (abs(Vbus) ** 2), -b_sh * (abs(Vbus) ** 2)) elif elm.idtag in loads_idx_map: Sdev = complex(-float(elm.P), -float(elm.Q)) / grid.Sbase else: Sdev = self.power_flow_results.Sbus[bus_index] / grid.Sbase self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.P, Sdev.real) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Q, Sdev.imag) k = bus_dict[elm.bus] if VarPowerFlowReferenceType.P in elm.rms_model.external_mapping: setP(P, P_used, k, elm.rms_model.E(VarPowerFlowReferenceType.P)) if VarPowerFlowReferenceType.Q in elm.rms_model.external_mapping: setQ(Q, Q_used, k, elm.rms_model.E(VarPowerFlowReferenceType.Q)) if self.options.initialization_method == RmsInitializationMethod.Explicit: # else: # for common init explicit to integrate diff_sys_vars: Dict[int, Var] = {diff_var.uid: diff_var for diff_var in self._diff_vars} rms_compiler_init = RMSCompiler( variables=list(self.sys_vars.values()), diff_vars=list(diff_sys_vars.values()), v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=Var("dt"), compiler_names_dict=self._compiler_names_dict, ) compile_single_equation = build_rms_single_equation_compiler(rms_compiler_init) # New init_explicit_common path self.init_guess, diff_init_guess_common = init_explicit_common( mdl=elm.rms_model, sys_vars=self.sys_vars, sys_diff_vars=diff_sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, event_param_init_dict=self.event_params_init_dict, init_guess=self.init_guess, diff_init_guess=diff_init_guess_common, uid2idx_vars=self.uid2idx_vars, uid2idx_diff=self._uid2idx_diff, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, params_array=self._parameters_values, compile_single_equation=compile_single_equation, verbose=bool(self.options.verbose > 0), ) # initialize variables with no init equation assigned # run_rms_native_initialization(self, self.options) elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: self.init_guess = init_pseudo_transient( mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs0, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self._uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME, max_iter=100, tol=1e-5, verbose=bool(self.options.verbose > 0), ) # not implemented yet # elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: # init_pseudo_transient( # mdl=elm.rms_model, # sys_vars=self.sys_vars, # variable_parameters=self._variable_parameters, # event_parameters_eqs=self._event_parameters_eqs0, # constant_parameters=self._constant_parameters, # init_guess=self.init_guess, # uid2idx_vars=self._uid2idx_vars, # uid2idx_params=self._uid2idx_params, # uid2idx_event_params=self._uid2idx_event_params, # compiler_names_dict=self._compiler_names_dict, # alias_names_dict=self._alias_names_dict, # VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, # TIME_NAME=self.TIME_NAME, # VARS_NAME=self.VARS_NAME, # DIFF_NAME=self.DIFF_NAME, # CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME, # dtau0=1e0, # max_iter=1000, # tol=1e-6 # ) else: raise ValueError("Not implemented initialization method") self.sys_block.add(elm.rms_model) total_init_explicit_time += time.perf_counter() - t0 # print(f"\nTotal time explicit initialization: {total_init_explicit_time:.6f} seconds") self.logger.add_info("Total time explicit initialization", value=total_init_explicit_time) if self.progress_signal is not None: self.progress_signal.emit(10) event_eq_by_uid: Dict[int, Expr | Const] = { ep.uid: eq for ep, eq in zip(self._variable_parameters, self._event_parameters_eqs0) } for i, ep in enumerate(self._runtime_all_parameters_source): if ep.uid in self._discrete_event_parameter_uids: pass else: runtime_eq = self._runtime_all_eqs_source[i] if isinstance(runtime_eq, Const) and runtime_eq.value is None and ep.uid in event_eq_by_uid: self._runtime_all_eqs_source[i] = event_eq_by_uid[ep.uid] else: pass for i, eq in enumerate(self._runtime_all_eqs_source): if eq is None or (isinstance(eq, Const) and eq.value is None): raise Exception(f"Runtime event parameter {self._runtime_all_parameters_source[i]} has None Value") # Keep runtime event-parameter sources aligned with explicit initialization # results. Explicit initialization resolves scalar event parameters by # replacing entries in ``_event_parameters_eqs0`` with ``Const(value)``. # Mirror those resolved constants into runtime sources so event-group # baselines and runtime arrays start from the same initialized values. for i, eq0 in enumerate(self._event_parameters_eqs0): if isinstance(eq0, Const) and eq0.value is not None: self._runtime_all_eqs_source[i] = Const(eq0.value) else: pass # print("start creating balance equations") # add the nodal balance equations ac_virtual_buses = [elm.bus_to.idtag for elm in grid.get_vsc()] for i, elm in enumerate(self.grid.buses): if not P_used[i] and not Q_used[i]: self.logger.add_error("Isolated bus", value=i) else: if elm.is_dc: self._algebraic_eqs.append(P[i]) elif (elm.idtag in ac_virtual_buses): self._algebraic_eqs.append(P[i]) else: self._algebraic_eqs.append(Q[i]) self._algebraic_eqs.append(P[i]) # print("created balance equations") # We define the parameter dt and delta self._dt = Var(name='dt') self._delta = Var(name='delta') self._variable_parameters.append(self._dt) self._variable_parameters.append(self._delta) self._event_parameters_eqs0.append(Const(1e-3)) self._event_parameters_eqs0.append(Const(1)) self._runtime_all_parameters_source.append(self._dt) self._runtime_all_parameters_source.append(self._delta) self._runtime_all_eqs_source.append(Const(1e-3)) self._runtime_all_eqs_source.append(Const(1)) # add these parameters, m is for variable parameters self._compiler_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[self._dt.uid] = self._n_event_params self._n_event_params += 1 self._compiler_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[self._delta.uid] = self._n_event_params self._n_event_params += 1 self._runtime_all_eqs_source0 = list(self._runtime_all_eqs_source) ##################### To be removed when order is preserved in the first part ############################# self._state_algeb_vars = list(self.sys_vars.values()) self._n_state = len(self._state_vars) self._n_alg = len(self._algebraic_vars) self._n_algebraic = len(self._algebraic_eqs) self._uid2idx_vars: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() i = 0 for v in self._state_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 for v in self._algebraic_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 for k, ep in enumerate(self._diff_vars): self._compiler_names_dict[ep.uid] = f"{self.DIFF_NAME}[{k}]" self._alias_names_dict[ep.uid] = f"{self.DIFF_NAME}_{k}" self._uid2idx_diff[ep.uid] = k self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._alias_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 for it, eq in enumerate(self._event_parameters_eqs0): if isinstance(eq, Const) and eq.value is None: raise Exception(f' Event parameter {self._variable_parameters[it]} has None Value')
[docs] def set_events_group(self, rms_events_group: RmsEventsGroup): """ add events modifying values of event_parameters equations :param rms_events_group: :return: """ same_group_requested: bool if self._active_events_group is None: same_group_requested = rms_events_group is None and len(self._scheduled_mode_events) > 0 elif rms_events_group is None: same_group_requested = False else: same_group_requested = self._active_events_group.idtag == rms_events_group.idtag if same_group_requested: return active_runtime_eqs: List[Expr | Const] = list(self._runtime_all_eqs_source0) if self._continuous_event_parameter_uids: collect_continuous_events: Dict[int, List[RmsEvent]] = { uid: list() for uid in self._continuous_event_parameter_uids } else: collect_continuous_events = dict() scheduled_mode_events: Dict[int, List[Tuple[float, float, bool]]] = dict() selected_events = self._get_rms_events_for_group(rms_events_group) for rms_evt in selected_events: if not isinstance(rms_evt.parameter, Var): pass else: if not self._event_targets_registered_parameter(rms_evt, int(rms_evt.parameter.uid)): pass else: parameter_uid: int = int(rms_evt.parameter.uid) transition_tpe: DynamicEventTransitionType = rms_evt.transition_type use_scheduled_mode: bool = _is_rms_event_routed_to_scheduled_mode( transition_tpe=transition_tpe, parameter_uid=parameter_uid, discrete_parameter_uids=self._discrete_event_parameter_uids, continuous_parameter_uids=self._continuous_event_parameter_uids, ) # The event routing must preserve the legacy discrete step # path while still allowing the same symbolic parameter to # host a continuous ramp. Using the transition profile as the # primary routing decision keeps the historical step response # compatible with the reference CSVs and fixes the broken ramp # behavior without splitting model parameters into separate # symbolic registrations. if use_scheduled_mode: if parameter_uid in self._discrete_event_parameter_uids: event_list = scheduled_mode_events.setdefault(parameter_uid, list()) force_step_alignment: bool = bool(rms_evt.force_step_alignment) # Only true discrete/mode parameters should use the # scheduled latch path. Event-dict parameters such as # ``Pl0`` historically followed the symbolic piecewise # path even for step events, and the stored RMS CSV # references were generated from that behavior. event_list.append( ( float(rms_evt.time), float(rms_evt.value), force_step_alignment, ) ) else: if parameter_uid in collect_continuous_events: collect_continuous_events[parameter_uid].append(rms_evt) else: pass else: if parameter_uid in collect_continuous_events: collect_continuous_events[parameter_uid].append(rms_evt) else: pass self._continuous_runtime_events = dict() for parameter_uid, event_list in collect_continuous_events.items(): if len(event_list) == 0: pass else: sorted_events: List[RmsEvent] = sorted(event_list, key=_get_rms_event_time_sort_key) if len(sorted_events) == 1: parameter_index: int | None = None runtime_parameter: Var # The single-event path keeps the historical symbolic RMS # behavior so existing CSV references remain valid. Only true # multi-event combinations are diverted to the procedural # replay path introduced to avoid fragile nested expressions. for runtime_parameter_index, runtime_parameter in enumerate(self._runtime_all_parameters_source): if runtime_parameter.uid == parameter_uid: parameter_index = runtime_parameter_index break else: pass if parameter_index is not None: active_expr: Expr | Const = active_runtime_eqs[parameter_index] active_runtime_eqs[parameter_index] = _build_runtime_expression_from_single_rms_event( time_var=self._glob_time, base_expression=active_expr, event_item=sorted_events[0], ) else: pass else: self._continuous_runtime_events[parameter_uid] = sorted_events self._runtime_all_eqs_source = active_runtime_eqs self._scheduled_mode_events = scheduled_mode_events self._active_events_group = rms_events_group # The active runtime equations must replace the current event-parameter # equations before recompilation. Otherwise the JIT event-parameter # function is rebuilt from the original baseline expressions and ramp # transitions collapse back to the pre-event constant or step value. self._event_parameters_eqs = list(active_runtime_eqs) self._rebuild_runtime_parameter_partition() self._initialize_mode_event_state() self._initialize_procedural_logic_updater() if self.get_variable_parameter_number() > 0: self._variable_parameters_values = np.ones(self.get_variable_parameter_number(), dtype=np.float64) else: self._variable_parameters_values = np.zeros(0, dtype=np.float64) # -------------------------------------------------------------------------------------------------------------- # Compile RHS and Jacobian using JIT Compiler adaptation # -------------------------------------------------------------------------------------------------------------- timings = dict() # print("Compiling RMS using JIT Native Compiler...") t0 = _tic() rms_compiler = RMSCompiler( variables=self._state_algeb_vars, diff_vars=self._diff_vars, v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=self._dt, compiler_names_dict=self._compiler_names_dict ) timings["Compiler Setup"] = _toc(t0) t0 = _tic() self._derivative_fn = rms_compiler.compile_derivative_fn(self._uid2idx_vars) timings["SymbolicDerivative"] = _toc(t0) t0 = _tic() self._event_params_fn = rms_compiler.compile_event_params_fn( eqs=self._event_parameters_eqs, alias_names_dict=self._alias_names_dict, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, ) timings["Event parameters"] = _toc(t0) t0 = _tic() self._rhs_algeb_fn = rms_compiler.compile_rhs(self._algebraic_eqs, "rhs_algeb") timings["RHS algebraic"] = _toc(t0) if len(self._state_eqs) != 0: t0 = _tic() self._rhs_state_fn = rms_compiler.compile_rhs(self._state_eqs, "rhs_state") timings["RHS state"] = _toc(t0) t0 = _tic() self._j11_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._state_vars, "j11") timings["J11 (dF/dx)"] = _toc(t0) t0 = _tic() self._j12_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._algebraic_vars, "j12") timings["J12 (dF/dy)"] = _toc(t0) t0 = _tic() self._j21_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._state_vars, "j21") timings["J21 (dG/dx)"] = _toc(t0) t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 (dG/dy)"] = _toc(t0) else: t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 only (no states)"] = _toc(t0) if self.options.verbose > 0: print(f"Model compiled with {self._n_vars} variables") print("\nCompilation timing summary:") for k, v in timings.items(): print(f" {k:30s}: {v:8.4f} s") print(f"\nTotal JIT compile time: {sum(timings.values()):.4f} s") variable_parameters_init = np.ones(self.get_variable_parameter_number()) # TODO: think about this thing of calling twice here self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0) self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0) self._mode_runtime_initialized_uids = set() if self.get_all_vars_number() > 0 and self.get_variable_parameter_number() > 0: self._initialize_latched_mode_defaults(t=0.0, x=self.get_x0()) self._constant_params = np.array([const.value for const in self._parameters_values]) self._block_boundary_updater = build_boundary_updater_from_block(self) if self.options.verbose > 0: print(f"\nTotal compile time: {sum(timings.values()):.4f} s") # we mark the problem as ready for simulation self.set_initialize_flag()
[docs] def get_next_forced_event_time(self, t_prev: float, t_target: float) -> Optional[float]: t_mode = _get_next_forced_mode_event_time(self._scheduled_mode_events, t_prev, t_target) t_proc: Optional[float] = None if self._procedural_logic_updater is not None: t_proc = self._procedural_logic_updater.get_next_forced_event_time(t_prev, t_target) if t_mode is None: return t_proc if t_proc is None: return t_mode return min(t_mode, t_proc)
def _initialize_procedural_logic_updater(self) -> None: entries: List = list() for blk in self.sys_block.get_all_blocks(): if blk.procedural_logic: entries.extend(blk.procedural_logic) if len(entries) == 0: self._procedural_logic_updater = None return self._procedural_logic_updater = BlockProceduralLogicUpdater(self, entries) def _register_runtime_event_parameters(self, dev: ALL_DEV_TYPES, mdl: Block) -> None: """ Register runtime-updatable parameters declared by the device block. """ if not mdl.event_dict and not mdl.mode_dict: return for parameter, expression in mdl.event_dict.items(): init_eq_for_parameter: Expr | Const | None = None for init_var, init_eq in mdl.init_eqs.items(): if init_var.uid == parameter.uid: init_eq_for_parameter = init_eq break expression_for_classification: Expr | Const = expression if isinstance(expression, Const) and expression.value is None and init_eq_for_parameter is not None: expression_for_classification = init_eq_for_parameter else: pass self._event_parameter_device_idtags[parameter.uid] = dev.idtag # ``Block.event_dict`` declares parameters that the user expects to be # externally drivable during the simulation. Some models, such as the # RMS load template, initialize those parameters with expressions that # reference algebraic outputs only to copy the steady-state operating # point at ``t = 0``. Classifying them as discrete just because the # initialization expression references system variables prevents ramp # events from ever using the continuous runtime path and collapses the # transition into a latched step. The runtime partition therefore uses # the initialization expression only as the starting value, while the # event_dict parameter itself remains a continuous event target. if parameter.uid in mdl.mode_dict: self._discrete_event_parameter_uids.add(parameter.uid) else: self._continuous_event_parameter_uids.add(parameter.uid) for parameter in mdl.mode_dict.keys(): self._event_parameter_device_idtags[parameter.uid] = dev.idtag self._discrete_event_parameter_uids.add(parameter.uid) def _expression_references_system_vars(self, expression: Expr | Const) -> bool: if isinstance(expression, Const): return False try: vars_in_expr: List[Var] = get_expression_vars(expression) except Exception: return False for var in vars_in_expr: if var.uid in self._uid2idx_vars: return True return False def _event_targets_registered_parameter(self, evt: object, parameter_uid: int) -> bool: """ Return whether the event targets a runtime parameter registered for this device. """ if parameter_uid in self._event_parameter_device_idtags: registered_device_idtag: str | None = self._event_parameter_device_idtags[parameter_uid] else: registered_device_idtag = None try: event_device_idtag: str = str(evt.device_idtag) except Exception: event_device_idtag = "" if registered_device_idtag is None: return False if event_device_idtag == "": return True return registered_device_idtag == event_device_idtag @property def boundary_update(self): return self def _get_rms_events_for_group(self, rms_events_group: RmsEventsGroup | None) -> List[RmsEvent]: if rms_events_group is None: return list(self.grid.rms_events) selected_events = list() for evt in self.grid.rms_events: try: if evt.group.idtag == rms_events_group.idtag: selected_events.append(evt) else: pass except Exception: pass return selected_events def _rebuild_runtime_parameter_partition(self) -> None: self._runtime_continuous_parameters = list() self._runtime_mode_parameters = list() self._runtime_continuous_eqs = list() self._runtime_mode_eqs = list() self._uid2idx_event_params = dict() n_source: int = len(self._runtime_all_parameters_source) i: int = 0 while i < n_source: parameter: Var = self._runtime_all_parameters_source[i] equation: Expr | Const = self._runtime_all_eqs_source[i] if parameter.uid in self._discrete_event_parameter_uids: self._runtime_mode_parameters.append(parameter) self._runtime_mode_eqs.append(equation) else: self._runtime_continuous_parameters.append(parameter) self._runtime_continuous_eqs.append(equation) i += 1 self._runtime_continuous_slice = slice(0, len(self._runtime_continuous_parameters)) self._runtime_mode_slice = slice( len(self._runtime_continuous_parameters), len(self._runtime_continuous_parameters) + len(self._runtime_mode_parameters) ) self._variable_parameters = list() self._event_parameters_eqs = list() for parameter in self._runtime_continuous_parameters: self._variable_parameters.append(parameter) for parameter in self._runtime_mode_parameters: self._variable_parameters.append(parameter) for equation in self._runtime_continuous_eqs: self._event_parameters_eqs.append(equation) for equation in self._runtime_mode_eqs: self._event_parameters_eqs.append(equation) self._n_event_params = len(self._variable_parameters) for k, parameter in enumerate(self._variable_parameters): self._uid2idx_event_params[parameter.uid] = k self._compiler_names_dict[parameter.uid] = f"{self.VARIABLE_PARAMS_NAME}[{k}]" self._alias_names_dict[parameter.uid] = f"{self.VARIABLE_PARAMS_NAME}_{k}" def _initialize_mode_event_state(self) -> None: self._mode_event_cursor = dict() for uid, event_list in self._scheduled_mode_events.items(): event_list.sort(key=_get_mode_event_sort_key) self._mode_event_cursor[uid] = 0 def _apply_scheduled_mode_events(self, t_curr: float, full_params: Vec) -> None: for uid, event_list in self._scheduled_mode_events.items(): if uid in self._mode_event_cursor: event_idx: int = self._mode_event_cursor[uid] else: event_idx = 0 n_events: int = len(event_list) while event_idx < n_events: event_time: float event_value: float force_step_alignment: bool event_time, event_value, force_step_alignment = event_list[event_idx] # Scheduled RMS step events must follow the same strict-after # activation boundary as the symbolic ``piecewise`` helper used # by the legacy RMS reference path. The symbolic backend applies # a new value only once ``t`` is strictly greater than the event # time because ``heaviside(0)`` evaluates to zero. Reusing that # convention here keeps scheduled mode updates aligned with the # historical sampled trajectories. if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=event_time): pass else: break if uid in self._uid2idx_event_params: runtime_idx: Optional[int] = self._uid2idx_event_params[uid] else: runtime_idx = None if force_step_alignment: if _is_rms_time_strictly_after_event(t_curr=t_curr, event_time=event_time): if runtime_idx is not None: full_params[runtime_idx] = event_value else: pass else: raise RuntimeError( f"Scheduled RMS mode event at t={event_time} requires exact step alignment, " f"but current solver time is t={t_curr}." ) else: if runtime_idx is not None: full_params[runtime_idx] = event_value else: pass event_idx += 1 self._mode_event_cursor[uid] = event_idx def _evaluate_runtime_expression_with_state(self, expression: Expr | Const, params: Vec, x: Vec, t: float) -> float: if isinstance(expression, Const): if expression.value is None: return 0.0 else: return float(expression.value) if isinstance(expression, Var): if expression.uid == self._glob_time.uid or expression.name == self.TIME_NAME: return float(t) if expression.uid in self._uid2idx_event_params: runtime_idx = self._uid2idx_event_params[expression.uid] else: runtime_idx = None if runtime_idx is not None: return float(params[runtime_idx]) if expression.uid in self._uid2idx_params: const_idx = self._uid2idx_params[expression.uid] else: const_idx = None if const_idx is not None: return float(self._parameters_values[const_idx].value) if expression.uid in self._uid2idx_vars: var_idx = self._uid2idx_vars[expression.uid] else: var_idx = None if var_idx is not None: return float(x[var_idx]) return 0.0 uid_bindings: Dict[int, float] = dict() for uid, idx in self._uid2idx_event_params.items(): uid_bindings[uid] = float(params[idx]) for uid, idx in self._uid2idx_params.items(): uid_bindings[uid] = float(self._parameters_values[idx].value) for uid, idx in self._uid2idx_vars.items(): uid_bindings[uid] = float(x[idx]) uid_bindings[self._glob_time.uid] = float(t) try: return float(expression.eval_uid(uid_bindings)) except Exception: return 0.0 def _update_dynamic_mode_defaults(self, t: float, x: Vec, params: Vec) -> None: for uid, expression in self._mode_runtime_expression_by_uid.items(): if uid in self._scheduled_mode_events and len(self._scheduled_mode_events[uid]) > 0: pass else: if uid in self._mode_runtime_initialized_uids: pass else: if uid in self._uid2idx_event_params: runtime_idx = self._uid2idx_event_params[uid] else: runtime_idx = None if runtime_idx is None: pass else: params[runtime_idx] = self._evaluate_runtime_expression_with_state(expression, params, x, t) self._mode_runtime_initialized_uids.add(uid) def _initialize_latched_mode_defaults(self, t: float, x: Vec) -> None: if self._variable_parameters_values is None: return self._update_dynamic_mode_defaults( t=float(t), x=x, params=self._variable_parameters_values, )
[docs] def reset_boundary_update_state(self, t0: float = 0.0) -> None: if self.get_variable_parameter_number() > 0 and self._event_params_fn is not None: self._variable_parameters_values = self._event_params_fn(np.ones(self.get_variable_parameter_number()), float(t0)) self._variable_parameters_values = self.def_event_params_fn(self._variable_parameters_values, float(t0)) else: self._variable_parameters_values = np.zeros(0, dtype=np.float64) self._mode_event_cursor = dict() self._initialize_mode_event_state() self._mode_runtime_initialized_uids = set() if self._procedural_logic_updater is not None: for logic in self._procedural_logic_updater.logic_entries: logic.bind(self) if self.get_all_vars_number() > 0 and self.get_variable_parameter_number() > 0: self._initialize_latched_mode_defaults(t=float(t0), x=self.get_x0())
[docs] def def_event_params_fn(self, ev_param: Vec, t: float) -> Vec: """ Evaluate runtime event parameter expressions while preserving mode latches. :param ev_param: Current runtime parameter vector. :param t: Simulation time. :return: Updated runtime parameter vector. """ runtime_continuous_eqs: List[Expr | Const] = self._runtime_continuous_eqs runtime_mode_slice: slice = self._runtime_mode_slice if len(runtime_continuous_eqs) == 0 or self._event_params_fn is None: return ev_param else: pass mode_snapshot: Optional[Vec] = None if runtime_mode_slice.start != runtime_mode_slice.stop: mode_snapshot = ev_param[runtime_mode_slice].copy() else: pass # The symbolic event-parameter function still evaluates the baseline # runtime expressions. Continuous multi-event parameters are then replayed # procedurally below so arbitrary event combinations do not build fragile # nested symbolic piecewise trees. updated: Vec = self._event_params_fn(ev_param, t) updated = self._event_params_fn(updated, t) if mode_snapshot is not None: updated[runtime_mode_slice] = mode_snapshot else: pass parameter_uid: int event_list: List[RmsEvent] for parameter_uid, event_list in self._continuous_runtime_events.items(): runtime_idx: Optional[int] = self._uid2idx_event_params.get(parameter_uid, None) if runtime_idx is not None: base_value: float = float(updated[runtime_idx]) updated[runtime_idx] = _evaluate_rms_runtime_parameter_value_from_events( base_value=base_value, event_list=event_list, t_curr=float(t), ) else: pass return updated
[docs] def update_variable_params(self, t: float, x_snapshot: Optional[Vec] = None, scheduled_t: float | None = None) -> None: """ Update the variable parameters. Continuous runtime parameters are re-evaluated, while retained mode parameters are left untouched unless updated by boundary logic. The RMS implicit solve needs two distinct time views. Continuous ramp-like parameters must be evaluated at the current local target time so the nonlinear solve sees the interpolated value inside the step. Historical scheduled step events, however, must keep the pre-event value on the sample aligned with the event instant and only latch afterwards. The optional ``scheduled_t`` argument therefore lets the caller decouple the continuous symbolic evaluation time from the scheduled event boundary time without duplicating the update logic. :param t: Continuous symbolic evaluation time. :param x_snapshot: Current state snapshot used by boundary logic. :param scheduled_t: Optional time used by scheduled step logic. :return: None. """ scheduled_time: float if self._event_params_fn is None: raise ValueError("_event_params_fn is None") else: pass if scheduled_t is None: scheduled_time = float(t) else: scheduled_time = float(scheduled_t) self._variable_parameters_values = self.def_event_params_fn(self._variable_parameters_values, t) self._apply_scheduled_mode_events(scheduled_time, self._variable_parameters_values) if self._block_boundary_updater is not None and x_snapshot is not None: if self._constant_params is None: constant_params = np.zeros(0, dtype=float) else: constant_params = self._constant_params full_params = np.concatenate((self._variable_parameters_values.copy(), constant_params)) self._block_boundary_updater.update(float(t), x_snapshot, full_params) self._variable_parameters_values[:] = full_params[:self.get_variable_parameter_number()]
[docs] def get_static_state_matrix(self, x:Vec, dx:Vec): nx = self.get_states_number() ny = self.get_algebraic_var_number() if nx == 0: gy = self.get_j22(x, dx, 1e15).toarray() return gy fx = self.get_j11(x, dx, 1e10).toarray() fy = self.get_j12(x, dx, 1e10).toarray() gx = self.get_j21(x, dx, 1e10).toarray() gy = self.get_j22(x, dx, 1e15).toarray() return np.block([[fx, fy], [gx, gy]])
[docs] def update(self, t: float, x: Vec, params: Vec) -> None: self._update_dynamic_mode_defaults(t=t, x=x, params=params) if self._procedural_logic_updater is not None: self._procedural_logic_updater.update(t=t, x=x, params=params) self._variable_parameters_values[:] = params[: len(self._variable_parameters)]
[docs] def add_variables_to_compilation_dicts(self, elm: ALL_DEV_TYPES, mdl: Block): self.add_block_variables_to_compilation_dicts(elm, mdl) for child in mdl.children: self.add_variables_to_compilation_dicts(elm, child)
[docs] def add_block_variables_to_compilation_dicts(self, elm: ALL_DEV_TYPES, mdl: Block): """ add variables and parameters info to the system block :param elm: :type elm: Union[VeraGridEngine.Devices.Substation.bus.Bus, VeraGridEngine.Devices.Injections.load.Load] :param mdl: :type mdl: VeraGridEngine.Utils.Symbolic.block.Block :return: :rtype: None """ # i is for variables for v in mdl.state_vars: if v.uid in self._uid2idx_vars: raise ValueError(f"State variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate variable.") self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=mdl) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._state_vars.append(v) self._n_vars += 1 for v in mdl.algebraic_vars: if v.uid in self._uid2idx_vars: raise ValueError(f"Algebraic variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate variable.") self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=mdl) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._algebraic_vars.append(v) self._n_vars += 1 # j is for parameters # for ep, const in mdl.parameters.items(): # if ep.uid in self._uid2idx_params: # raise ValueError(f"Parameter '{ep.name}' (uid={ep.uid}) is already registered in the system. " # f"Previous device may have created a duplicate parameter.") # self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{self._n_params}]" # self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{self._n_params}" # self._uid2idx_params[ep.uid] = self._n_params # self._constant_parameters.append(ep) # self._parameters_values.append(const) # self._n_params += 1 for ep, const in mdl.parameters.items(): if ep.uid in self._uid2idx_params: # Nested RMS block hierarchies can legally expose the same symbolic # parameter multiple times while sharing one UID. In that case the # parameter must be registered only once in the global problem and # later occurrences should only refresh the already stored value. existing_parameter_index: int = self._uid2idx_params[ep.uid] if ep in self._static_parameters_values_mapping: self._parameters_values[existing_parameter_index] = self._static_parameters_values_mapping[ep] else: pass continue self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{self._n_params}]" self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{self._n_params}" self._uid2idx_params[ep.uid] = self._n_params self._constant_parameters.append(ep) # search value in self._static_parameters_values_mapping if ep in self._static_parameters_values_mapping: self._parameters_values.append(self._static_parameters_values_mapping[ep]) else: self._parameters_values.append(const) self._n_params += 1 # m is for variable parameters self._register_runtime_event_parameters(dev=elm, mdl=mdl) # Todo: function inside a function, refactor this! def _register_event_parameter(ep: Var, eq: Expr | Const, runtime_eq: Expr | Const | None = None) -> None: if ep.uid in self._uid2idx_event_params: raise ValueError(f"Event parameter '{ep.name}' (uid={ep.uid}) is already registered in the system. " f"Previous device may have created a duplicate event parameter.") self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[ep.uid] = self._n_event_params effective_eq: Expr | Const = eq if isinstance(eq, Const) and eq.value is None: init_eq_for_ep: Expr | Const | None = None for init_var, init_eq in mdl.init_eqs.items(): if init_var.uid == ep.uid: init_eq_for_ep = init_eq break if init_eq_for_ep is not None: effective_eq = init_eq_for_ep self._variable_parameters.append(ep) self._event_parameters_eqs0.append(effective_eq) self._runtime_all_parameters_source.append(ep) runtime_expression: Expr | Const = effective_eq if runtime_eq is None else runtime_eq if runtime_eq is None and ep.uid in self._discrete_event_parameter_uids: if isinstance(eq, Const) and eq.value is not None: runtime_expression = Const(float(eq.value)) else: runtime_expression = Const(0.0) self._mode_runtime_expression_by_uid[ep.uid] = effective_eq self._runtime_all_eqs_source.append(runtime_expression) self._runtime_all_eqs_source0.append(runtime_expression) self._n_event_params += 1 for ep, eq in mdl.event_dict.items(): _register_event_parameter(ep, eq) for ep, eq in mdl.mode_dict.items(): _register_event_parameter(ep, eq) # l is for differential vars for v in mdl.diff_vars: if v.uid in self._uid2idx_diff: raise ValueError(f"Differential variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate differential variable.") self._compiler_names_dict[v.uid] = f"{self.DIFF_NAME}[{self._n_diff}]" self._alias_names_dict[v.uid] = f"{self.DIFF_NAME}_{self._n_diff}" self._uid2idx_diff[v.uid] = self._n_diff self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=mdl) self.add_device_var(dev=elm, var=v) self._diff_vars.append(v) self._n_diff += 1 self._state_eqs.extend(mdl.state_eqs) self._algebraic_eqs.extend(mdl.algebraic_eqs) if self.progress_signal is not None: self.progress_signal.emit(20)
[docs] def set_init_guess(self, mdl: Block, reference_powerflow: VarPowerFlowReferenceType, val: float): """ add values from powerflow to initial guess :param mdl: :type mdl: :param reference_powerflow: :type reference_powerflow: :param val: :type val: :return: :rtype:self._ """ if reference_powerflow in mdl.external_mapping: var = mdl.external_mapping[reference_powerflow] self.init_guess[var.uid] = val
# print(f"DEBUG: set_init_guess {reference_powerflow.value} = {val} for var {var.name} (uid={var.uid})") # else: # print( # f"DEBUG: set_init_guess {reference_powerflow.value} NOT FOUND in external_mapping. Available: {[k.value for k in mdl.external_mapping.keys()]}")
[docs] def get_equation_at(self, i: int) -> Expr: """ Get the equation at a global position :param i: :return: """ if i < len(self._state_eqs): return self._state_eqs[i] else: i2 = i - len(self._state_eqs) return self._algebraic_eqs[i2]
[docs] def get_init_guess_info(self) -> pd.DataFrame: """ returns a df with uid, name, and initial value for the system variables :return: :rtype: """ vars_names = list() for key, value in self.init_guess.items(): var_name = self.sys_vars[key].name vars_names.append((key, var_name, value)) return pd.DataFrame(data=vars_names, columns=["key", "var_name", "value"])
[docs] def get_device_vars_dict(self) -> Dict[ALL_DEV_TYPES, List[Var]]: """ :return: :rtype: """ return self._vars_info
[docs] def add_device_var(self, dev: ALL_DEV_TYPES, var: Var): """ Associate a variable with a device :param dev: Device :param var: Variable """ if dev in self._vars_info: var_list = self._vars_info[dev] else: var_list = None if var_list is None: self._vars_info[dev] = [var] else: var_list.append(var)
[docs] def get_var_idx(self, v: Var) -> int: """ :param v: :return: """ return self._uid2idx_vars[v.uid]
@property def vars_glob_name2uid(self): """ :return: """ return self._vars_glob_name2uid def _register_global_var_name(self, name_key: str, uid: int, block: Block | None = None) -> None: prev_uid = self._vars_glob_name2uid.get(name_key) if prev_uid is None or prev_uid == uid: self._vars_glob_name2uid[name_key] = uid return block_tag = "" if block is not None: block_tag = f"::{block.name}#{block.uid}" disambiguated_key = f"{name_key}{block_tag}" if disambiguated_key == name_key: disambiguated_key = f"{name_key} [{uid}]" if disambiguated_key in self._vars_glob_name2uid and self._vars_glob_name2uid[disambiguated_key] != uid: raise ValueError( f"Global variable name collision for '{name_key}' and fallback '{disambiguated_key}': " f"existing uid={self._vars_glob_name2uid[disambiguated_key]}, new uid={uid}." ) self._vars_glob_name2uid[disambiguated_key] = uid @property def uid2idx_vars(self): """ :return: """ return self._uid2idx_vars @property def uid2idx_event_params(self): return self._uid2idx_event_params @property def uid2idx_params(self): return self._uid2idx_params @property def glob_time(self): return self._glob_time
[docs] def get_parameters_values(self) -> List[Const]: return self._parameters_values
@property def get_algebraic_vars(self): """ :return: """ return self._algebraic_vars @property def algebraic_vars(self): return self._algebraic_vars @property def algebraic_eqs(self): """ :return: """ return self._algebraic_eqs @property def variable_parameters(self): """ :return: """ return self._variable_parameters @property def event_parameters_eqs(self): """ :return: """ return self._event_parameters_eqs @property def event_parameters_eqs0(self): """ :return: """ return self._event_parameters_eqs0 @property def state_and_algebraic_vars(self) -> List[Var]: """ :return: """ variables = list() for lst in [self._state_vars, self._algebraic_vars]: for var in lst: variables.append(var) return variables @property def state_vars(self): """ :return: """ return self._state_vars @property def state_eqs(self): """ :return: """ return self._state_eqs
[docs] def get_all_vars_number(self) -> int: return self._n_vars
[docs] def get_diff_var_number(self) -> int: """ Get the number of diff vars :return: """ return len(self._diff_vars)
[docs] def get_algebraic_var_number(self) -> int: return len(self._algebraic_vars)
[docs] def get_states_number(self) -> int: return self._n_state
[docs] def get_variable_parameter_number(self) -> int: return len(self._variable_parameters)
[docs] def get_x0(self) -> Vec: """ Helper function to build the initial vector :return: array matching with the mapping, matching the solver ordering """ x = np.zeros(len(self._state_vars) + len(self._algebraic_vars)) for uid, val in self.init_guess.items(): if uid in self._uid2idx_vars: i = self._uid2idx_vars[uid] x[i] = val return x
[docs] def get_eventparams0(self) -> Vec: """ Helper function to build the initial vector :return: array matching with the mapping, matching the solver ordering """ x = np.zeros(len(self._variable_parameters)) for uid, val in self.event_params_init_dict.items(): i = self._uid2idx_event_params[uid] x[i] = val return x
[docs] def get_dx0(self) -> Vec: """ Helper function to build the initial vector :return: array matching with the mapping, matching the solver ordering """ x = np.zeros(len(self._diff_vars)) # for uid, val in self.init_guess.items(): # i = self._uid2idx_vars[uid] # x[i] = val return x
[docs] def initialize_fmu_cs_devices(self, x_snapshot: Vec, t: float = 0.0) -> None: """ Initialize imported FMU Co-Simulation devices before the RMS time loop starts. :param x_snapshot: Initial accepted state vector. :param t: Initial simulation time. :return: None. """ if len(self._fmu_cs_adapters) > 0: initialize_rms_fmu_cs_devices(problem=self, x_snapshot=x_snapshot, time_value=t) align_rms_fmu_cs_device_output_parameters(problem=self, x_snapshot=x_snapshot, time_value=t) else: self._fmu_cs_initialized = True
[docs] def advance_fmu_cs_devices(self, t: float, x_snapshot: Vec, h: float) -> None: """ Advance imported FMU Co-Simulation devices for one RMS communication step. :param t: Current simulation time. :param x_snapshot: Current accepted state vector. :param h: RMS communication step. :return: None. """ if len(self._fmu_cs_adapters) > 0: advance_rms_fmu_cs_devices(problem=self, time_value=t, x_snapshot=x_snapshot, step_size=h) else: pass
[docs] def close_fmu_cs_devices(self) -> None: """ Release imported FMU Co-Simulation devices after the RMS simulation ends. :return: None. """ if len(self._fmu_cs_adapters) > 0: close_rms_fmu_cs_devices(self) else: pass
[docs] def initialize_fmu_me_devices(self, x_snapshot: Vec, t: float = 0.0) -> None: """ Initialize imported FMU Model Exchange devices before the RMS time loop starts. :param x_snapshot: Initial accepted state vector. :param t: Initial simulation time. :return: None. """ if len(self._fmu_me_adapters) > 0: initialize_rms_fmu_me_devices(problem=self, x_snapshot=x_snapshot, time_value=t) else: self._fmu_me_initialized = True
[docs] def advance_fmu_me_devices(self, t: float, x_snapshot: Vec, h: float) -> None: """ Advance imported FMU Model Exchange devices for one RMS communication step. :param t: Current simulation time. :param x_snapshot: Current accepted state vector. :param h: RMS communication step. :return: None. """ if len(self._fmu_me_adapters) > 0: advance_rms_fmu_me_devices(problem=self, time_value=t, x_snapshot=x_snapshot, step_size=h) else: pass
[docs] def close_fmu_me_devices(self) -> None: """ Release imported FMU Model Exchange devices after the RMS simulation ends. :return: None. """ if len(self._fmu_me_adapters) > 0: close_rms_fmu_me_devices(self) else: pass
[docs] def get_dx(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec: if self._derivative_fn is None: raise ValueError("_derivative_fn is None") return self._derivative_fn(x, xn, dx, h)
[docs] def rhs_state(self, x: Vec, dx: Vec) -> Vec: if self._rhs_state_fn is None: raise ValueError("_rhs_state_fn is None") return self._rhs_state_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def rhs_algebraic(self, x: Vec, dx: Vec) -> Vec: if self._rhs_algeb_fn is None: raise ValueError("_rhs_algeb_fn is None") return self._rhs_algeb_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def get_j11(self, x: Vec, dx: Vec, h: float): if self._j11_fn is None: raise ValueError("_j11_fn is None") return self._j11_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j12(self, x: Vec, dx: Vec, h: float): if self._j12_fn is None: raise ValueError("_j12_fn is None") return self._j12_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j21(self, x: Vec, dx: Vec, h: float): if self._j21_fn is None: raise ValueError("_j21_fn is None") return self._j21_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j22(self, x: Vec, dx: Vec, h: float): if self._j22_fn is None: raise ValueError("_j22_fn is None") return self._j22_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_dt(self): return self._dt
[docs] def get_dt_value(self): dt_value = self._variable_parameters_values[-2] return dt_value
[docs] def get_compiler_names_dict(self): return self._compiler_names_dict
[docs] def get_alias_names_dict(self): return self._alias_names_dict
[docs] def get_diff_vars(self): return self._diff_vars
[docs] def get_E_matrix(self, x: Vec, dx: Vec): # We first find all diff_vars all_eqs = self._state_eqs + self._algebraic_eqs xdot = self._diff_vars E_call = SymbolicJacobian( eqs=all_eqs, variables=xdot, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, PARAMS_NAME=self.CONSTANT_PARAMS_NAME, static=True ) n_states = self.get_states_number() vp = self._variable_parameters_values cp = self._constant_params n_vars = self._n_vars E_value = np.zeros((n_vars, n_vars)) E_partial = E_call(x, dx, vp, cp, h=0).toarray() print(f"DEBUG: E_partial shape: {E_partial.shape}") print(f"DEBUG: E_partial sample values (non-zero):") nz = np.nonzero(E_partial) # for i in range(min(20, len(nz[0]))): # row, col = nz[0][i], nz[1][i] # print(f" E_partial[{row},{col}] = {E_partial[row, col]}") # Debug: show equation 70 # print(f"\nDEBUG: Equation 70:") # print(f" {all_eqs[70]}") # print(f"\nDEBUG: Diff vars involved in row 70:") # for col in range(E_partial.shape[1]): # if abs(E_partial[70, col]) > 1e-10: # print(f" diff_var[{col}] = {xdot[col]}, value = {E_partial[70, col]}") # Map each d(eq)/d(diff_var_j) column to the column of the diff_var base # variable in the global [state_vars + algebraic_vars] ordering. for j, dvar in enumerate(xdot): base_var = dvar.base_var col_idx = self._uid2idx_vars.get(base_var.uid, None) if col_idx is None: continue E_value[:, col_idx] += E_partial[:, j] E_value[:n_states, :n_states] -= np.eye(n_states, dtype=E_value.dtype) return E_value