Source code for VeraGridEngine.Simulations.Rms.problems.rms_problem_phasor

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Dict, List
import time
import numpy as np
import pandas as pd
import scipy.sparse as sp

from VeraGridEngine.enumerations import ParamPowerFlowReferenceType
from VeraGridEngine.Devices import MultiCircuit
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, piecewise)
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicJacobian
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic_io import block_deep_copy
from VeraGridEngine.enumerations import VarPowerFlowReferenceType, RmsInitializationMethod, DeviceType
from VeraGridEngine.basic_structures import Vec, ObjVec, BoolVec, Logger
from VeraGridEngine.Simulations.PowerFlow.power_flow_driver import PowerFlowResults
from VeraGridEngine.Simulations.Rms.rms_options import RmsOptions
from VeraGridEngine.Simulations.Rms.initialization import init_explicit, init_pseudo_transient
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate
from VeraGridEngine.Devices.Events.rms_events_group import RmsEventsGroup
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
from VeraGridEngine.Devices.Branches.transformer import Transformer2W
from VeraGridEngine.Simulations.driver_template import DummySignal
from VeraGridEngine.IO.fmu.importer.experimental_cs import (
    advance_rms_fmu_cs_devices,
    close_rms_fmu_cs_devices,
    initialize_rms_fmu_cs_devices,
    register_rms_fmu_cs_device,
)
from VeraGridEngine.IO.fmu.importer.experimental_me import (
    advance_rms_fmu_me_devices,
    close_rms_fmu_me_devices,
    initialize_rms_fmu_me_devices,
    register_rms_fmu_me_device,
)


def _tic():
    return time.perf_counter()


def _toc(t0):
    return time.perf_counter() - t0


[docs] def setP(P: ObjVec, P_used: BoolVec, k: int, val: object): """ Set or add to P value at index k. """ if not P_used[k]: P[k] = val P_used[k] = True else: P[k] += val
[docs] def setQ(Q: ObjVec, Q_used: BoolVec, k: int, val: object): """ Set or add to Q value at index k. """ if not Q_used[k]: Q[k] = val Q_used[k] = True else: Q[k] += val
[docs] def setIr(Ir: ObjVec, Ir_used: BoolVec, k: int, val: object): """ Set or add to Ir (real current) value at index k. """ if not Ir_used[k]: Ir[k] = val Ir_used[k] = True else: Ir[k] += val
[docs] def setIi(Ii: ObjVec, Ii_used: BoolVec, k: int, val: object): """ Set or add to Ii (imaginary current) value at index k. """ if not Ii_used[k]: Ii[k] = val Ii_used[k] = True else: Ii[k] += val
[docs] class RmsProblemPhasor(RmsProblemTemplate): """ Phasor-based DAE (Differential-Algebraic Equation) class. This class uses phasor representation (Vr, Vi) for voltages instead of polar coordinates (Vm, Va). The phasor representation makes current equations linear and is more suitable for certain analysis. Responsibilities: - Store state and algebraic variables (x, y) using phasor representation - Store Jacobian matrices - Store residual equations using phasor-based power flow equations - Store sparsity patterns - Convert between polar (power flow) and phasor (RMS) representations """ VARS_NAME = "vars" VARIABLE_PARAMS_NAME = "vprms" CONSTANT_PARAMS_NAME = "cprms" DIFF_NAME = "diff" TIME_NAME = "glob_time" def __init__(self, grid: MultiCircuit, options: RmsOptions, pf_results: PowerFlowResults | None, progress_signal: DummySignal | None = None): """ :param grid: :param options: :param pf_results: """ super().__init__() self.logger = Logger() self.progress_signal = progress_signal self.grid: MultiCircuit = grid self.power_flow_results: PowerFlowResults = pf_results self.Sf = self.power_flow_results.Sf / self.grid.Sbase self.St = self.power_flow_results.St / self.grid.Sbase self.options: RmsOptions = options # this is the general init guess that will contain all the variables init value self.init_guess: Dict[int, float] = dict() self.sys_block: Block = Block(children=[], in_vars=[]) self._algebraic_vars: List[Var] = list() self._algebraic_eqs: List[Expr] = list() self._state_vars: List[Var] = list() self._state_eqs: List[Expr] = list() self._diff_vars: List[Var] = list() self._variable_parameters: List[Var] = list() self._event_parameters_eqs: List[Expr | Const] = list() self._constant_parameters: List[Var] = list() self._parameters_values: List[Const] = list() self._fmu_cs_adapters: List[object] = list() self._fmu_cs_initialized: bool = False self._fmu_me_adapters: List[object] = list() self._fmu_me_initialized: bool = False # -------------------------------------------------------------------------------------------------------------- # Initialize the RMS problem # -------------------------------------------------------------------------------------------------------------- ######################### Initialize containers############################# total_init_explicit_time: float = 0 t0 = time.perf_counter() # dictionaries to store device-variable ifo self._vars_info: Dict[ALL_DEV_TYPES, List[Var]] = dict() self._vars_glob_name2uid: Dict[str, int] = dict() # dictionaries for compilation names self._compiler_names_dict: Dict[int, str] = dict() self._alias_names_dict: Dict[int, str] = dict() # dictionaries for variable position in the variables arrays self._uid2idx_vars: Dict[int, int] = dict() self._uid2idx_event_params: Dict[int, int] = dict() self._uid2idx_params: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() # create time global time variable and add it to the compilation dict self._glob_time: Var = Var(self.TIME_NAME) self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 # Dictionary of state and algebraic vars self.sys_vars: Dict[int, Var] = dict() # initialize balance equation arrays n = len(self.grid.buses) P: ObjVec = np.zeros(n, dtype=object) Q: ObjVec = np.zeros(n, dtype=object) P_used: BoolVec = np.zeros(n, dtype=bool) Q_used: BoolVec = np.zeros(n, dtype=bool) # Phasor current balance arrays (using current instead of power) Ir: ObjVec = np.zeros(n, dtype=object) Ii: ObjVec = np.zeros(n, dtype=object) Ir_used: BoolVec = np.zeros(n, dtype=bool) Ii_used: BoolVec = np.zeros(n, dtype=bool) branch_bus_r = np.zeros(n, dtype=float) branch_bus_i = np.zeros(n, dtype=float) # general indexes for variables and parameters self._n_vars = 0 self._n_params = 0 self._n_event_params = 0 self._n_diff = 0 ######################################## Initialize devices ######################################## # initialize buses bus_dict: Dict[Bus, int] = dict() for bus_num, elm in enumerate(self.grid.buses): # Todo: missing default initialization for the model bus_dict[elm] = bus_num # flatten model elm.rms_model.unify_blocks() self.add_variables_to_compilation_dicts(elm, elm.rms_model) # add init values from powerflow to initial guess if elm.is_dc: # DC bus: use Vdc (magnitude) - angle is not applicable for DC self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Vdc, np.abs(self.power_flow_results.voltage[bus_num])) else: # AC bus: use Vm and Va self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Vi, np.imag(self.power_flow_results.voltage[bus_num])) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Vr, np.real(self.power_flow_results.voltage[bus_num])) # add model to system block self.sys_block.add(elm.rms_model) # initialize branches for branch_num, elm in enumerate(self.grid.get_branches_iter(add_vsc=False, add_hvdc=False, add_switch=True)): # Todo: missing default initialization for the model # flatten model if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: elm.rms_model.unify_blocks() # if not isinstance(elm, Transformer2W): active_factor = 1.0 if elm.active else 0.0 z2 = elm.R ** 2 + elm.X ** 2 g_val = active_factor * float(elm.R / z2) b_val = active_factor * float(-elm.X / z2) bsh_val = active_factor * float(elm.B) elm.rms_model.parameters[ elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.g]] = Const(g_val) elm.rms_model.parameters[ elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.b]] = Const(b_val) elm.rms_model.parameters[ elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.bsh]] = Const(bsh_val) if ParamPowerFlowReferenceType.vtap_f in elm.rms_model.api_obj_mapping: elm.rms_model.parameters[elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.vtap_f]] = Const(1.0) if ParamPowerFlowReferenceType.vtap_t in elm.rms_model.api_obj_mapping: elm.rms_model.parameters[elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.vtap_t]] = Const( float(elm.bus_from.Vnom / elm.bus_to.Vnom) ) self.add_variables_to_compilation_dicts(elm, elm.rms_model) register_rms_fmu_cs_device(self, elm, elm.rms_model) register_rms_fmu_me_device(self, elm, elm.rms_model) # Convert power flow results (S = P + jQ) to currents (I = S* / V*) # I = (P - jQ) / (Vr - jVi) = (P*Vr + Q*Vi) / |V|^2 + j*(P*Vi - Q*Vr) / |V|^2 f_idx = bus_dict[elm.bus_from] t_idx = bus_dict[elm.bus_to] Vf = self.power_flow_results.voltage[f_idx] Vt = self.power_flow_results.voltage[t_idx] Sf = self.Sf[branch_num] St = self.St[branch_num] # From end current Vf_mag_sq = np.abs(Vf)**2 Irf = (Sf.real * Vf.real + Sf.imag * Vf.imag) / Vf_mag_sq Iif = (Sf.real * Vf.imag - Sf.imag * Vf.real) / Vf_mag_sq # To end current Vt_mag_sq = np.abs(Vt)**2 Irt = (St.real * Vt.real + St.imag * Vt.imag) / Vt_mag_sq Iit = (St.real * Vt.imag - St.imag * Vt.real) / Vt_mag_sq # add init values from powerflow to initial guess self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Irf, Irf) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Iif, Iif) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Irt, Irt) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Iit, Iit) branch_bus_r[f_idx] += Irf branch_bus_i[f_idx] += Iif branch_bus_r[t_idx] += Irt branch_bus_i[t_idx] += Iit # Run explicit initialization for branches to solve algebraic equations if isinstance(elm, Transformer2W): if self.options.initialization_method == RmsInitializationMethod.Explicit: init_explicit(mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self.uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME ) # add model to system block self.sys_block.add(elm.rms_model) # add variable to conservation equations of the bus to which the element is connected f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] # Phasor formulation: use current balance instead of power balance setIr(Ir, Ir_used, f, -elm.rms_model.E(VarPowerFlowReferenceType.Irf)) setIr(Ir, Ir_used, t, -elm.rms_model.E(VarPowerFlowReferenceType.Irt)) setIi(Ii, Ii_used, f, -elm.rms_model.E(VarPowerFlowReferenceType.Iif)) setIi(Ii, Ii_used, t, -elm.rms_model.E(VarPowerFlowReferenceType.Iit)) # Populating VSCs init guess for i, elm in enumerate(self.grid.get_vsc()): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: mdl = elm.rms_model # flatten model to collect all variables including those from child blocks mdl.unify_blocks() St_vsc = (self.power_flow_results.St_vsc)/ self.grid.Sbase Sf_vsc = (self.power_flow_results.Pfn_vsc[i] + self.power_flow_results.Pfp_vsc[i])/self.grid.Sbase # fill init_guess self.add_variables_to_compilation_dicts(elm, mdl) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, Sf_vsc) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, St_vsc[i].real) self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, St_vsc[i].imag) f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) self.sys_block.add(mdl) # Populating HVDC init guess (similar to VSCs) for i, elm in enumerate(self.grid.get_hvdc()): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: mdl = elm.rms_model Sf_hvdc = self.power_flow_results.Pf_hvdc[i] / self.grid.Sbase St_hvdc = self.power_flow_results.Pt_hvdc[i] / self.grid.Sbase self.add_variables_to_compilation_dicts(elm, mdl) # Set initialization values for HVDC # Real number only (no Q values) # self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, Pf_hvdc) # self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, Pt_hvdc) f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) setQ(Q, Q_used, f, -mdl.E(VarPowerFlowReferenceType.Qf)) setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) self.sys_block.add(mdl) # initialize injections for elm in grid.get_vsc(): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: # flatten model - variables already registered in first VSC loop above elm.rms_model.unify_blocks() # find init values for the variables of this model if self.options.initialization_method == RmsInitializationMethod.Explicit: init_explicit(mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self.uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME ) else: raise ValueError("Not implemented initialization method") # add model to system block self.sys_block.add(elm.rms_model) gen_idx_map = {elm.idtag: i for i, elm in enumerate(grid.generators)} batt_idx_map = {elm.idtag: i for i, elm in enumerate(grid.batteries)} shunt_idx_map = {elm.idtag: i for i, elm in enumerate(grid.shunts)} loads_idx_map = {elm.idtag: i for i, elm in enumerate(grid.loads)} def _s_to_i(S: complex, V: complex) -> tuple[float, float]: v2 = np.abs(V) ** 2 if v2 > 0: ir_val = (S.real * V.real + S.imag * V.imag) / v2 ii_val = (S.real * V.imag - S.imag * V.real) / v2 else: ir_val = 0.0 ii_val = 0.0 return ir_val, ii_val # Build per-bus residual current budget for load-current devices (Ir0/Ii0 templates) n_bus = len(self.grid.buses) fixed_inj_r = np.zeros(n_bus, dtype=float) fixed_inj_i = np.zeros(n_bus, dtype=float) adjustable_count = np.zeros(n_bus, dtype=int) slack_gen_count = np.zeros(n_bus, dtype=int) for dev in grid.get_injection_devices_iter(): if dev.rms_model.empty(): continue bidx = bus_dict[dev.bus] if dev.bus.is_dc: continue has_ir = VarPowerFlowReferenceType.Ir in dev.rms_model.external_mapping has_ii = VarPowerFlowReferenceType.Ii in dev.rms_model.external_mapping is_adjustable_load_current = ( has_ir and has_ii and ParamPowerFlowReferenceType.Ir0 in dev.rms_model.api_obj_mapping and ParamPowerFlowReferenceType.Ii0 in dev.rms_model.api_obj_mapping ) if is_adjustable_load_current: adjustable_count[bidx] += 1 continue if dev.idtag in gen_idx_map and dev.bus.is_slack: slack_gen_count[bidx] += 1 continue Vbus_pf = self.power_flow_results.voltage[bidx] if dev.idtag in gen_idx_map: gidx = gen_idx_map[dev.idtag] Sdev_pf = complex(float(dev.P), float(self.power_flow_results.gen_q[gidx])) / grid.Sbase elif dev.idtag in batt_idx_map: bdid = batt_idx_map[dev.idtag] P_b = float(dev.P) Sdev_pf = complex(P_b, float(self.power_flow_results.battery_q[bdid])) / grid.Sbase elif dev.idtag in shunt_idx_map: g_sh = float(dev.G) / grid.Sbase b_sh = float(dev.B) / grid.Sbase Sdev_pf = complex(g_sh * (abs(Vbus_pf) ** 2), -b_sh * (abs(Vbus_pf) ** 2)) elif dev.idtag in loads_idx_map: P_load = dev.P Q_load = dev.Q Sdev_pf = complex(-P_load, -Q_load) / grid.Sbase ir_pf, ii_pf = _s_to_i(Sdev_pf, Vbus_pf) fixed_inj_r[bidx] += ir_pf fixed_inj_i[bidx] += ii_pf residual_r = branch_bus_r - fixed_inj_r residual_i = branch_bus_i - fixed_inj_i remaining_adjustable = adjustable_count.copy() remaining_slack_gen = slack_gen_count.copy() for elm in grid.get_injection_devices_iter(): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: bus_index = bus_dict[elm.bus] if elm.device_type == DeviceType.ShuntDevice: elm.rms_model.parameters[elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.g]] = Const(float(elm.G) / grid.Sbase) elm.rms_model.parameters[elm.rms_model.api_obj_mapping[ParamPowerFlowReferenceType.b]] = Const(float(elm.B) / grid.Sbase) elm.rms_model.unify_blocks() self.add_variables_to_compilation_dicts(elm, elm.rms_model) register_rms_fmu_cs_device(self, elm, elm.rms_model) register_rms_fmu_me_device(self, elm, elm.rms_model) if elm.bus.is_dc: # DC buses: use power directly if supported, otherwise skip Sbus_dc = self.power_flow_results.Sbus[bus_index] if VarPowerFlowReferenceType.P in elm.rms_model.external_mapping: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.P, np.real(Sbus_dc)) if VarPowerFlowReferenceType.Ir in elm.rms_model.external_mapping: # Approximate: I = P/V (assuming V β‰ˆ 1) self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Ir, np.real(Sbus_dc)) else: # Phasor formulation: convert per-device power injection to current Vbus = self.power_flow_results.voltage[bus_index] has_ir = VarPowerFlowReferenceType.Ir in elm.rms_model.external_mapping has_ii = VarPowerFlowReferenceType.Ii in elm.rms_model.external_mapping is_adjustable_load_current = ( has_ir and has_ii and ParamPowerFlowReferenceType.Ir0 in elm.rms_model.api_obj_mapping and ParamPowerFlowReferenceType.Ii0 in elm.rms_model.api_obj_mapping ) if elm.idtag in gen_idx_map: if elm.bus.is_slack and remaining_slack_gen[bus_index] > 0: Ir_val = residual_r[bus_index] / remaining_slack_gen[bus_index] Ii_val = residual_i[bus_index] / remaining_slack_gen[bus_index] remaining_slack_gen[bus_index] -= 1 residual_r[bus_index] -= Ir_val residual_i[bus_index] -= Ii_val Sdev = complex( Vbus.real * Ir_val + Vbus.imag * Ii_val, Vbus.imag * Ir_val - Vbus.real * Ii_val ) else: gidx = gen_idx_map[elm.idtag] Sdev = complex(float(elm.P), float(self.power_flow_results.gen_q[gidx])) / grid.Sbase elif elm.idtag in batt_idx_map: bidx = batt_idx_map[elm.idtag] P_b = float(elm.P) Sdev = complex(P_b, float(self.power_flow_results.battery_q[bidx])) / grid.Sbase elif elm.idtag in shunt_idx_map: g_sh = float(elm.G) / grid.Sbase b_sh = float(elm.B) / grid.Sbase Sdev = complex(g_sh * (abs(Vbus) ** 2), -b_sh * (abs(Vbus) ** 2)) elif elm.idtag in loads_idx_map: P_load = elm.P Q_load = elm.Q Sdev = complex(-P_load, -Q_load) / grid.Sbase Ir_val, Ii_val = _s_to_i(Sdev, Vbus) print(f"Device {elm.name} at bus {elm.bus.name}: S={Sdev:.4f}, V={Vbus:.4f}, Ir={Ir_val:.4f}, Ii={Ii_val:.4f}") if VarPowerFlowReferenceType.Ir in elm.rms_model.external_mapping: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Ir, Ir_val) if VarPowerFlowReferenceType.Ii in elm.rms_model.external_mapping: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Ii, Ii_val) if VarPowerFlowReferenceType.P in elm.rms_model.external_mapping: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.P, Sdev.real) if VarPowerFlowReferenceType.Q in elm.rms_model.external_mapping: self.set_init_guess(elm.rms_model, VarPowerFlowReferenceType.Q, Sdev.imag) k = bus_dict[elm.bus] # Phasor formulation: use current balance if VarPowerFlowReferenceType.Ir in elm.rms_model.external_mapping: setIr(Ir, Ir_used, k, elm.rms_model.E(VarPowerFlowReferenceType.Ir)) if VarPowerFlowReferenceType.Ii in elm.rms_model.external_mapping: setIi(Ii, Ii_used, k, elm.rms_model.E(VarPowerFlowReferenceType.Ii)) if self.options.initialization_method == RmsInitializationMethod.Explicit: init_explicit( mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self.uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME ) elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: init_pseudo_transient( mdl=elm.rms_model, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self._uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME, dtau0=1e0, max_iter=1000, tol=1e-6 ) else: raise ValueError("Not implemented initialization method") self.sys_block.add(elm.rms_model) total_init_explicit_time += time.perf_counter() - t0 # print(f"\nTotal time explicit initialization: {total_init_explicit_time:.6f} seconds") self.logger.add_info("Total time explicit initialization", value=total_init_explicit_time) if self.progress_signal is not None: self.progress_signal.emit(10) # add the nodal balance equations (current balance for phasor formulation) for i, elm in enumerate(self.grid.buses): mdl = block_deep_copy(elm.rms_model, grid.var_factory) if len(mdl.algebraic_eqs) == 0: if not Ir_used[i] and not Ii_used[i]: self.logger.add_error("Isolated bus", value=i) else: # Phasor formulation uses current balance equations self._algebraic_eqs.append(Ir[i]) self._algebraic_eqs.append(Ii[i]) # self._variable_parameters: List[Var] = list() # self._event_parameters_eqs: List[Expr | Const] = list() # self._constant_parameters: List[Var] = list() # self._parameters_values: List[Const] = list() # for b in self.sys_block.get_all_blocks(): # # for param, eq in b.event_dict.items(): # self._variable_parameters.append(param) # self._event_parameters_eqs.append(eq) # for param, constant in b.parameters.items(): # self._constant_parameters.append(param) # self._parameters_values.append(constant) # We define the parameter dt and delta self._dt = Var(name='dt') self._delta = Var(name='delta') self._variable_parameters.append(self._dt) self._variable_parameters.append(self._delta) self._event_parameters_eqs.append(Const(1e-3)) self._event_parameters_eqs.append(Const(1)) # add these parameters, m is for variable parameters self._compiler_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[self._dt.uid] = self._n_event_params self._n_event_params += 1 self._compiler_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[self._delta.uid] = self._n_event_params self._n_event_params += 1 ##################### To be removed when order is preserved in the first part ############################# self._state_algeb_vars = list(self.sys_vars.values()) self._n_state = len(self._state_vars) self._n_alg = len(self._algebraic_vars) self._n_algebraic = len(self._algebraic_eqs) # self._compiler_names_dict: Dict[int, str] = dict() # self._alias_names_dict: Dict[int, str] = dict() self._uid2idx_vars: Dict[int, int] = dict() # self._uid2idx_event_params: Dict[int, int] = dict() # self._uid2idx_params: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() #We put algebraic_vars that are actually states first self._algebraic_vars.sort(key=lambda obj: obj.diff_var is None) diff_vars_from_states = [var.diff_var for var in self._state_vars] for i, var in enumerate(diff_vars_from_states): state_var = self._state_vars[i] if var is None: diff_vars_from_states[i] = self.grid.var_factory.add_diff_var(name = 'aux', base_var=state_var) diff_vars_from_algebraic = [var.diff_var for var in self._algebraic_vars if var.diff_var is not None] self._diff_vars = diff_vars_from_states + diff_vars_from_algebraic i = 0 for v in self._state_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 for v in self._algebraic_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 # for j, ep in enumerate(self._constant_parameters): # self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{j}]" # self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{j}" # self._uid2idx_params[ep.uid] = j # # for k, ep in enumerate(self._variable_parameters): # self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{k}]" # self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{k}" # self._uid2idx_event_params[ep.uid] = k for k, ep in enumerate(self._diff_vars): self._compiler_names_dict[ep.uid] = f"{self.DIFF_NAME}[{k}]" self._alias_names_dict[ep.uid] = f"{self.DIFF_NAME}_{k}" self._uid2idx_diff[ep.uid] = k self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._alias_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 for it, eq in enumerate(self._event_parameters_eqs): if isinstance(eq, Const) and eq.value is None: raise Exception(f' Event parameter {self._variable_parameters[it]} has None Value') # Store a copy of the original event parameters equations for later use with set_events_group self._event_parameters_eqs0 = self._event_parameters_eqs.copy() # add events modifying values of event_parameters equations collect_events = { param: {"times": [], "values": []} for param in self._variable_parameters } for rms_evt in self.grid.rms_events: collect_events[rms_evt.parameter]["times"].append(rms_evt.time) collect_events[rms_evt.parameter]["values"].append(rms_evt.value) # TODO: implement the function in block: apply_event for param, events_info in collect_events.items(): default_value = self._event_parameters_eqs[self._variable_parameters.index(param)] self._event_parameters_eqs[self._variable_parameters.index(param)] = piecewise( time_var=self._glob_time, t_events=np.array(events_info["times"]), new_values=np.array(events_info["values"]), default_value=default_value ) # -------------------------------------------------------------------------------------------------------------- # Compile RHS and Jacobian using JIT Compiler adaptation # -------------------------------------------------------------------------------------------------------------- timings = dict() print("Compiling RMS using JIT Native Compiler...") t0 = _tic() rms_compiler = RMSCompiler( variables=self._state_algeb_vars, diff_vars=self._diff_vars, v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=self._dt, compiler_names_dict=self._compiler_names_dict ) timings["Compiler Setup"] = _toc(t0) t0 = _tic() self._derivative_fn = rms_compiler.compile_derivative_fn(self._uid2idx_vars) timings["SymbolicDerivative"] = _toc(t0) t0 = _tic() self._event_params_fn = rms_compiler.compile_event_params_fn( eqs=self._event_parameters_eqs, alias_names_dict=self._alias_names_dict, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, ) timings["Event parameters"] = _toc(t0) t0 = _tic() self._rhs_algeb_fn = rms_compiler.compile_rhs(self._algebraic_eqs, "rhs_algeb") timings["RHS algebraic"] = _toc(t0) if len(self._state_eqs) != 0: t0 = _tic() self._rhs_state_fn = rms_compiler.compile_rhs(self._state_eqs, "rhs_state") timings["RHS state"] = _toc(t0) t0 = _tic() self._j11_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._state_vars, "j11") timings["J11 (dF/dx)"] = _toc(t0) t0 = _tic() self._j12_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._algebraic_vars, "j12") timings["J12 (dF/dy)"] = _toc(t0) t0 = _tic() self._j21_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._state_vars, "j21") timings["J21 (dG/dx)"] = _toc(t0) t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 (dG/dy)"] = _toc(t0) else: t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 only (no states)"] = _toc(t0) if options.verbose > 0: print(f"Model compiled with {self._n_vars} variables") print("\nCompilation timing summary:") for k, v in timings.items(): print(f" {k:30s}: {v:8.4f} s") print(f"\nTotal JIT compile time: {sum(timings.values()):.4f} s") variable_parameters_init = np.ones(self.get_variable_parameter_number()) # TODO: think about this thing of calling twice here self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0) self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0) self._constant_params = np.array([const.value for const in self._parameters_values]) if options.verbose > 0: print(f"\nTotal compile time: {sum(timings.values()):.4f} s")
[docs] def add_variables_to_compilation_dicts(self, elm: ALL_DEV_TYPES, mdl: Block): """ add variables and parameters info to the system block :param elm: :type elm: Union[VeraGridEngine.Devices.Substation.bus.Bus, VeraGridEngine.Devices.Injections.load.Load] :param mdl: :type mdl: VeraGridEngine.Utils.Symbolic.block.Block :return: :rtype: None """ block_item: Block for block_item in mdl.get_all_blocks(): # i is for variables for v in block_item.state_vars: if v.uid in self._uid2idx_vars: raise ValueError(f"State variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate variable.") self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._state_vars.append(v) self._n_vars += 1 for v in block_item.algebraic_vars: if v.uid in self._uid2idx_vars: raise ValueError(f"Algebraic variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate variable.") self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._algebraic_vars.append(v) self._n_vars += 1 # j is for parameters ep: Var for ep, const in block_item.parameters.items(): if ep.uid in self._uid2idx_params: raise ValueError(f"Parameter '{ep.name}' (uid={ep.uid}) is already registered in the system. " f"Previous device may have created a duplicate parameter.") self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{self._n_params}]" self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{self._n_params}" self._uid2idx_params[ep.uid] = self._n_params self._constant_parameters.append(ep) self._parameters_values.append(const) self._n_params += 1 # m is for variable parameters for ep, eq in block_item.event_dict.items(): k = len(self._variable_parameters) if ep.uid in self._uid2idx_event_params: raise ValueError(f"Event parameter '{ep.name}' (uid={ep.uid}) is already registered in the system. " f"Previous device may have created a duplicate event parameter.") self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{k}]" self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{k}" self._uid2idx_event_params[ep.uid] = k self._variable_parameters.append(ep) self._event_parameters_eqs.append(eq) self._n_event_params += 1 # l is for differential vars for v in block_item.diff_vars: if v.uid in self._uid2idx_diff: raise ValueError(f"Differential variable '{v.name}' (uid={v.uid}) is already registered in the system. " f"Previous device may have created a duplicate differential variable.") self._compiler_names_dict[v.uid] = f"{self.DIFF_NAME}[{self._n_diff}]" self._alias_names_dict[v.uid] = f"{self.DIFF_NAME}_{self._n_diff}" self._uid2idx_diff[v.uid] = self._n_diff self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self._diff_vars.append(v) self._n_diff += 1 self._state_eqs.extend(block_item.state_eqs) self._algebraic_eqs.extend(block_item.algebraic_eqs) if self.progress_signal is not None: self.progress_signal.emit(20)
[docs] def set_init_guess(self, mdl: Block, reference_powerflow: VarPowerFlowReferenceType, val: float): """ Add values from powerflow to initial guess. """ if reference_powerflow in mdl.external_mapping: var = mdl.external_mapping[reference_powerflow] self.init_guess[var.uid] = val print(f"DEBUG: set_init_guess {var.name} = {val} for var {var.name} (uid={var.uid})") else: print( f"DEBUG: set_init_guess {reference_powerflow.value} NOT FOUND in external_mapping. Available: {[k.value for k in mdl.external_mapping.keys()]}")
[docs] def get_init_guess_info(self) -> pd.DataFrame: """ Returns a df with uid, name, and initial value for the system variables. """ vars_names = list() for key, value in self.init_guess.items(): var_name = self.sys_vars[key].name vars_names.append((key, var_name, value)) return pd.DataFrame(data=vars_names, columns=["key", "var_name", "value"])
[docs] def get_E_matrix(self, x:Vec, dx:Vec): #We first find all diff_vars xdot = list() # for var in self._state_vars: # if var.diff_var is None: # diff_var = self.grid.var_factory.add_diff_var(name = '', base_var=var) # xdot.append(diff_var) # else: # xdot.append(var.diff_var) # for var in self._diff_vars: # if var not in xdot: # xdot.append(var) all_eqs = self._state_eqs + self._algebraic_eqs xdot = self._diff_vars E_call = SymbolicJacobian( eqs= all_eqs, variables=xdot, compiler_names_dict=self._compiler_names_dict, alias_names_dict= self._alias_names_dict, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, PARAMS_NAME=self.CONSTANT_PARAMS_NAME, static=True ) n_states = self.get_states_number() vp = self._variable_parameters_values cp = self._constant_params n_vars = self._n_vars E_value = sp.lil_matrix((n_vars, n_vars), dtype=np.float64) E_partial = E_call(x, dx, vp, cp, h=0).tocsc() # Map each d(eq)/d(diff_var_j) column to the column of the diff_var base # variable in the global [state_vars + algebraic_vars] ordering. for j, dvar in enumerate(xdot): base_var = dvar.base_var col_idx = self._uid2idx_vars.get(base_var.uid, None) if col_idx is not None: E_value[:, col_idx] += E_partial[:, j] else: pass E_value[:n_states, :n_states] -= sp.eye(n_states, dtype=E_value.dtype, format="lil") return E_value.tocsc()
[docs] def get_static_state_matrix(self, x:Vec, dx:Vec): nx = self.get_states_number() if nx == 0: return self.get_j22(x, dx, 1e15).tocsc() fx = self.get_j11(x, dx, 1e10).tocsc() fy = self.get_j12(x, dx, 1e10).tocsc() gx = self.get_j21(x, dx, 1e10).tocsc() gy = self.get_j22(x, dx, 1e15).tocsc() return sp.bmat([[fx, fy], [gx, gy]], format="csc")
[docs] def get_device_vars_dict(self) -> Dict[ALL_DEV_TYPES, List[Var]]: """Get dictionary of device variables.""" return self._vars_info
[docs] def add_device_var(self, dev: ALL_DEV_TYPES, var: Var): """Associate a variable with a device.""" var_list = self._vars_info.get(dev, None) if var_list is None: self._vars_info[dev] = [var] else: var_list.append(var)
[docs] def get_var_idx(self, v: Var) -> int: """Get variable index.""" return self._uid2idx_vars[v.uid]
@property def vars_glob_name2uid(self): return self._vars_glob_name2uid def _register_global_var_name(self, name_key: str, uid: int, block: Block | None = None) -> None: prev_uid = self._vars_glob_name2uid.get(name_key) if prev_uid is None or prev_uid == uid: self._vars_glob_name2uid[name_key] = uid return block_tag = "" if block is not None: block_tag = f"::{block.name}#{block.uid}" disambiguated_key = f"{name_key}{block_tag}" if disambiguated_key == name_key or disambiguated_key in self._vars_glob_name2uid: disambiguated_key = f"{name_key} [{uid}]" self._vars_glob_name2uid[disambiguated_key] = uid @property def uid2idx_vars(self): return self._uid2idx_vars @property def algebraic_vars(self): return self._algebraic_vars @property def state_and_algebraic_vars(self) -> List[Var]: variables = list() for lst in [self._state_vars, self._algebraic_vars]: for var in lst: variables.append(var) return variables @property def state_vars(self): return self._state_vars @property def get_algebraic_eqs(self): return self._algebraic_eqs @property def get_state_eqs(self): return self._state_eqs
[docs] def get_all_vars_number(self) -> int: return self._n_vars
[docs] def get_diff_var_number(self) -> int: return len(self._diff_vars)
[docs] def get_algebraic_var_number(self) -> int: return len(self._algebraic_vars)
[docs] def get_states_number(self) -> int: return self._n_state
[docs] def get_variable_parameter_number(self) -> int: return len(self._variable_parameters)
[docs] def get_x0(self) -> Vec: """ Helper function to build the initial vector. """ x = np.zeros(len(self._state_vars) + len(self._algebraic_vars)) for uid, val in self.init_guess.items(): if uid in self._uid2idx_vars: i = self._uid2idx_vars[uid] x[i] = val return x
[docs] def get_next_forced_event_time(self, t_prev: float, t_target: float): """Phasor RMS currently has no forced sub-step events.""" return None
[docs] def update_variable_params(self, t: float, x_snapshot: Vec | None = None, scheduled_t: float | None = None): """Update the variable parameters.""" self._variable_parameters_values[:self._n_event_params] = self._event_params_fn(self._variable_parameters_values, t)
[docs] def reset_boundary_update_state(self, t0: float = 0.0) -> None: """Reset runtime event-parameter state to the initial simulation time.""" variable_parameters_init = np.ones(self.get_variable_parameter_number()) self._variable_parameters_values = self._event_params_fn(variable_parameters_init, float(t0)) self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, float(t0))
[docs] def update(self, t: float, x: Vec, params: Vec) -> None: """Store runtime event parameters for the current Newton step.""" self._variable_parameters_values[:] = params[:len(self._variable_parameters)]
[docs] def initialize_fmu_cs_devices(self, x_snapshot: Vec, t: float = 0.0) -> None: """ Initialize imported FMU Co-Simulation devices before the RMS time loop starts. :param x_snapshot: Initial accepted state vector. :param t: Initial simulation time. :return: None. """ if len(self._fmu_cs_adapters) > 0: initialize_rms_fmu_cs_devices(problem=self, x_snapshot=x_snapshot, time_value=t) else: self._fmu_cs_initialized = True
[docs] def advance_fmu_cs_devices(self, t: float, x_snapshot: Vec, h: float) -> None: """ Advance imported FMU Co-Simulation devices for one RMS communication step. :param t: Current simulation time. :param x_snapshot: Current accepted state vector. :param h: RMS communication step. :return: None. """ if len(self._fmu_cs_adapters) > 0: advance_rms_fmu_cs_devices(problem=self, time_value=t, x_snapshot=x_snapshot, step_size=h)
[docs] def close_fmu_cs_devices(self) -> None: """ Release imported FMU Co-Simulation devices after the RMS simulation ends. :return: None. """ if len(self._fmu_cs_adapters) > 0: close_rms_fmu_cs_devices(self)
[docs] def initialize_fmu_me_devices(self, x_snapshot: Vec, t: float = 0.0) -> None: """ Initialize imported FMU Model Exchange devices before the RMS time loop starts. :param x_snapshot: Initial accepted state vector. :param t: Initial simulation time. :return: None. """ if len(self._fmu_me_adapters) > 0: initialize_rms_fmu_me_devices(problem=self, x_snapshot=x_snapshot, time_value=t) else: self._fmu_me_initialized = True
[docs] def advance_fmu_me_devices(self, t: float, x_snapshot: Vec, h: float) -> None: """ Advance imported FMU Model Exchange devices for one RMS communication step. :param t: Current simulation time. :param x_snapshot: Current accepted state vector. :param h: RMS communication step. :return: None. """ if len(self._fmu_me_adapters) > 0: advance_rms_fmu_me_devices(problem=self, time_value=t, x_snapshot=x_snapshot, step_size=h)
[docs] def close_fmu_me_devices(self) -> None: """ Release imported FMU Model Exchange devices after the RMS simulation ends. :return: None. """ if len(self._fmu_me_adapters) > 0: close_rms_fmu_me_devices(self)
[docs] def get_dx(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec: return self._derivative_fn(x, xn, dx, h)
[docs] def rhs_state(self, x: Vec, dx: Vec) -> Vec: return self._rhs_state_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def rhs_algebraic(self, x: Vec, dx: Vec) -> Vec: return self._rhs_algeb_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def get_j11(self, x: Vec, dx: Vec, h: float): return self._j11_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j12(self, x: Vec, dx: Vec, h: float): return self._j12_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j21(self, x: Vec, dx: Vec, h: float): return self._j21_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j22(self, x: Vec, dx: Vec, h: float): return self._j22_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_dt(self): return self._dt
[docs] def get_dt_value(self): dt_value = self._variable_parameters_values[-2] return dt_value
[docs] def set_events_group(self, rms_events_group: RmsEventsGroup): """ Set the events group to use for this simulation. This filters events by group and recompiles the event parameters function. :param rms_events_group: The RmsEventsGroup to use """ # Restore original event parameters equations self._event_parameters_eqs = self._event_parameters_eqs0.copy() # Collect events for this group only collect_events = { param: {"times": [], "values": []} for param in self._variable_parameters } # Filter events by group rms_evts = [evt for evt in self.grid.rms_events if evt.group.idtag == rms_events_group.idtag] for rms_evt in rms_evts: collect_events[rms_evt.parameter]["times"].append(rms_evt.time) collect_events[rms_evt.parameter]["values"].append(rms_evt.value) # Apply piecewise functions for events for param, events_info in collect_events.items(): if events_info["times"]: # Only if there are events for this parameter default_value = self._event_parameters_eqs[self._variable_parameters.index(param)] self._event_parameters_eqs[self._variable_parameters.index(param)] = piecewise( time_var=self._glob_time, t_events=np.array(events_info["times"]), new_values=np.array(events_info["values"]), default_value=default_value ) # Recompile the event parameters function rms_compiler = RMSCompiler( variables=self._state_algeb_vars, diff_vars=self._diff_vars, v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=self._dt, compiler_names_dict=self._compiler_names_dict ) self._event_params_fn = rms_compiler.compile_event_params_fn( eqs=self._event_parameters_eqs, alias_names_dict=self._alias_names_dict, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, ) # Reinitialize variable parameters values variable_parameters_init = np.ones(self.get_variable_parameter_number()) self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0) self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0)