Source code for VeraGridEngine.Simulations.Rms.problems.rms_problem_complex

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Dict, List, Tuple
import time
import numpy as np
import pandas as pd

from VeraGridEngine.enumerations import ParamPowerFlowReferenceType
from VeraGridEngine.Devices import MultiCircuit
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, piecewise)
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicParamsVector, SymbolicDerivative
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic_io import block_deep_copy
from VeraGridEngine.enumerations import VarPowerFlowReferenceType, RmsInitializationMethod, DeviceType
from VeraGridEngine.basic_structures import Vec, ObjVec, BoolVec, Logger
from VeraGridEngine.Simulations.PowerFlow.power_flow_driver import PowerFlowResults
from VeraGridEngine.Simulations.Rms.rms_options import RmsOptions
from VeraGridEngine.Simulations.Rms.initialization import init_explicit, init_pseudo_transient
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
from VeraGridEngine.Templates.Rms.bus_complex_rms_template import initialize_bus_complex_rms, polar_to_complex
from VeraGridEngine.Templates.Rms.line_complex_rms_template import get_line_complex_rms_template


def _tic():
    return time.perf_counter()


def _toc(t0):
    return time.perf_counter() - t0


[docs] def setP(P: ObjVec, P_used: BoolVec, k: int, val: object): """ Set or add to P value at index k. """ if not P_used[k]: P[k] = val P_used[k] = True else: P[k] += val
[docs] def setQ(Q: ObjVec, Q_used: BoolVec, k: int, val: object): """ Set or add to Q value at index k. """ if not Q_used[k]: Q[k] = val Q_used[k] = True else: Q[k] += val
[docs] class RmsProblemComplex(RmsProblemTemplate): """ Complex Phasor-based DAE (Differential-Algebraic Equation) class with MULTILINEAR equations. This class uses complex phasor representation with a single complex voltage variable per bus. The key innovation is TRULY MULTILINEAR current equations: I = Y * (Vf - Vt) [Linear in voltage!] where Y = g + j*b is the complex admittance. When expanded: Ir = g*(Vrf - Vrt) - b*(Vif - Vit) Ii = g*(Vif - Vit) + b*(Vrf - Vrt) These are truly linear - no products of voltage variables, no squares! The current balance at each node (Ξ£I = 0) provides linear conservation equations. Responsibilities: - Store state and algebraic variables using complex phasor representation - Store Jacobian matrices - Store residual equations using multilinear current-based equations - Store sparsity patterns - Convert between polar (power flow) and complex phasor representations """ VARS_NAME = "vars" VARIABLE_PARAMS_NAME = "vprms" CONSTANT_PARAMS_NAME = "cprms" DIFF_NAME = "diff" TIME_NAME = "glob_time" def __init__(self, grid: MultiCircuit, options: RmsOptions, pf_results: PowerFlowResults | None): """ Initialize the complex phasor-based RMS problem. :param grid: MultiCircuit grid object :param options: RMS simulation options :param pf_results: Power flow results for initialization """ super().__init__() self.logger = Logger() self.grid: MultiCircuit = grid self.power_flow_results: PowerFlowResults = pf_results self.Sf = self.power_flow_results.Sf / self.grid.Sbase self.St = self.power_flow_results.St / self.grid.Sbase self.options: RmsOptions = options # General init guess containing all variable initial values self.init_guess: Dict[int, float] = dict() self.sys_block: Block = Block(children=[], in_vars=[]) self._algebraic_vars: List[Var] = list() self._algebraic_eqs: List[Expr] = list() self._state_vars: List[Var] = list() self._state_eqs: List[Expr] = list() self._diff_vars: List[Var] = list() self._variable_parameters: List[Var] = list() self._event_parameters_eqs: List[Expr | Const] = list() self._constant_parameters: List[Var] = list() self._parameters_values: List[Const] = list() # -------------------------------------------------------------------------------------------------------------- # Initialize the RMS problem # -------------------------------------------------------------------------------------------------------------- ######################### Initialize containers ############################# # Dictionaries to store device-variable info self._vars_info: Dict[ALL_DEV_TYPES, List[Var]] = dict() self._vars_glob_name2uid: Dict[str, int] = dict() # Dictionaries for compilation names self._compiler_names_dict: Dict[int, str] = dict() self._alias_names_dict: Dict[int, str] = dict() # Dictionaries for variable position in the variables arrays self._uid2idx_vars: Dict[int, int] = dict() self._uid2idx_event_params: Dict[int, int] = dict() self._uid2idx_params: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() # Create time global time variable and add it to the compilation dict self._glob_time: Var = Var(self.TIME_NAME) self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 # Dictionary of state and algebraic vars self.sys_vars: Dict[int, Var] = dict() # Initialize balance equation arrays n = len(self.grid.buses) P: ObjVec = np.zeros(n, dtype=object) Q: ObjVec = np.zeros(n, dtype=object) P_used: BoolVec = np.zeros(n, dtype=bool) Q_used: BoolVec = np.zeros(n, dtype=bool) # General indexes for variables and parameters self._n_vars = 0 self._n_params = 0 self._n_event_params = 0 self._n_diff = 0 ######################################## Initialize devices ######################################## # Initialize buses with complex phasor representation bus_dict: Dict[Bus, int] = dict() for bus_num, elm in enumerate(self.grid.buses): bus_dict[elm] = bus_num # Initialize complex phasor-based bus model initialize_bus_complex_rms(bus=elm, vf=grid.var_factory) # Flatten model elm.rms_model.unify_blocks() mdl = block_deep_copy(elm.rms_model, grid.var_factory) self.add_variables_to_compilation_dicts(elm, mdl) # Add init values from powerflow to initial guess # Convert polar (Vm, Va) to complex phasor if elm.is_dc: self.set_init_guess(mdl, VarPowerFlowReferenceType.Vdc, np.abs(self.power_flow_results.voltage[bus_num])) self.set_init_guess(mdl, VarPowerFlowReferenceType.P, float(np.real(self.power_flow_results.Sbus[bus_num] / self.grid.Sbase))) else: Vm = np.abs(self.power_flow_results.voltage[bus_num]) Va = np.angle(self.power_flow_results.voltage[bus_num]) V_complex = polar_to_complex(Vm, Va) Vr, Vi = V_complex.real, V_complex.imag self.set_init_guess(mdl, VarPowerFlowReferenceType.Vr, Vr) self.set_init_guess(mdl, VarPowerFlowReferenceType.Vi, Vi) self.set_init_guess(mdl, VarPowerFlowReferenceType.P, float(np.real(self.power_flow_results.Sbus[bus_num] / self.grid.Sbase))) self.set_init_guess(mdl, VarPowerFlowReferenceType.Q, float(np.imag(self.power_flow_results.Sbus[bus_num] / self.grid.Sbase))) # Add model to system block self.sys_block.add(mdl) # Initialize branches with complex multilinear line models line_idx = 0 for elm in self.grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True): if elm.device_type == DeviceType.LineDevice: # Create complex multilinear line model from VeraGridEngine.Devices.Dynamic.dynamic_model_host import DynamicModelHost if elm.rms_model is None: elm.rms_model = DynamicModelHost() elm.rms_model = get_line_complex_rms_template( vfactory=grid.var_factory, name=f"Line_complex_{elm.name}" ).block # Connect line inputs to bus outputs (complex voltages) bus_from_model = elm.bus_from.rms_model bus_to_model = elm.bus_to.rms_model # Connect: line.in_vars[0:3] -> [Vrf, Vif, Vrt, Vit] grid.var_factory.add_connections( [elm.rms_model.in_vars[0]], [bus_from_model.out_vars[0]] # Vrf -> bus.Vr ) grid.var_factory.add_connections( [elm.rms_model.in_vars[1]], [bus_from_model.out_vars[1]] # Vif -> bus.Vi ) grid.var_factory.add_connections( [elm.rms_model.in_vars[2]], [bus_to_model.out_vars[0]] # Vrt -> bus.Vr ) grid.var_factory.add_connections( [elm.rms_model.in_vars[3]], [bus_to_model.out_vars[1]] # Vit -> bus.Vi ) elm.rms_model.unify_blocks() mdl = elm.rms_model # Set line parameters mdl.parameters[mdl.api_obj_mapping[ParamPowerFlowReferenceType.g]] = Const(float(elm.R / (elm.R ** 2 + elm.X ** 2))) mdl.parameters[mdl.api_obj_mapping[ParamPowerFlowReferenceType.b]] = Const(float(-elm.X / (elm.R ** 2 + elm.X ** 2))) mdl.parameters[mdl.api_obj_mapping[ParamPowerFlowReferenceType.bsh]] = Const(elm.B) self.add_variables_to_compilation_dicts(elm, mdl) # Add init values from powerflow to initial guess self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, self.Sf[line_idx].real) self.set_init_guess(mdl, VarPowerFlowReferenceType.Qf, self.Sf[line_idx].imag) self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, self.St[line_idx].real) self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, self.St[line_idx].imag) line_idx += 1 # Add model to system block self.sys_block.add(mdl) # Add variable to conservation equations of the bus f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) if not elm.bus_from.is_dc: setQ(Q, Q_used, f, -mdl.E(VarPowerFlowReferenceType.Qf)) if not elm.bus_to.is_dc: setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) elif elm.rms_model is None or elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) continue else: # Non-line branches (VSC, HVDC, etc.) - use existing model elm.rms_model.unify_blocks() mdl = elm.rms_model self.add_variables_to_compilation_dicts(elm, mdl) # Add init values from powerflow to initial guess if VarPowerFlowReferenceType.Pf in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, self.Sf[line_idx].real if line_idx < len(self.Sf) else 0.0) if VarPowerFlowReferenceType.Qf in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Qf, self.Sf[line_idx].imag if line_idx < len(self.Sf) else 0.0) if VarPowerFlowReferenceType.Pt in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, self.St[line_idx].real if line_idx < len(self.St) else 0.0) if VarPowerFlowReferenceType.Qt in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, self.St[line_idx].imag if line_idx < len(self.St) else 0.0) line_idx += 1 # Add model to system block self.sys_block.add(mdl) # Add variable to conservation equations of the bus f = bus_dict[elm.bus_from] t = bus_dict[elm.bus_to] if VarPowerFlowReferenceType.Pf in mdl.external_mapping: setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf)) if VarPowerFlowReferenceType.Pt in mdl.external_mapping: setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt)) if not elm.bus_from.is_dc and VarPowerFlowReferenceType.Qf in mdl.external_mapping: setQ(Q, Q_used, f, -mdl.E(VarPowerFlowReferenceType.Qf)) if not elm.bus_to.is_dc and VarPowerFlowReferenceType.Qt in mdl.external_mapping: setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt)) # Process VSCs for i, elm in enumerate(self.grid.get_vsc()): if elm.rms_model is None or elm.rms_model is None or elm.rms_model.empty(): self.logger.add_warning(f"VSC {elm.name} has no RMS model, skipping initialization") continue mdl = elm.rms_model mdl.unify_blocks() self.add_variables_to_compilation_dicts(elm, mdl) St_vsc = self.power_flow_results.St_vsc / self.grid.Sbase Sf_vsc = (self.power_flow_results.Pfn_vsc[i] + self.power_flow_results.Pfp_vsc[i]) / self.grid.Sbase # Fill init_guess if VarPowerFlowReferenceType.Pf in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, Sf_vsc) if VarPowerFlowReferenceType.Pt in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, St_vsc[i].real) if VarPowerFlowReferenceType.Qt in mdl.external_mapping: self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, St_vsc[i].imag) # Initialize Vdc from DC bus dc_bus_model = elm.bus_from.rms_model if VarPowerFlowReferenceType.Vdc in mdl.external_mapping and VarPowerFlowReferenceType.Vdc in dc_bus_model.external_mapping: dc_bus_vdc_var = dc_bus_model.external_mapping[VarPowerFlowReferenceType.Vdc] if dc_bus_vdc_var.uid in self.init_guess: self.set_init_guess(mdl, VarPowerFlowReferenceType.Vdc, self.init_guess[dc_bus_vdc_var.uid]) self.sys_block.add(mdl) # Process injections for elm in self.grid.get_injection_devices_iter(): if elm.rms_model.empty(): self.logger.add_error("No RMS model", device_class=elm.device_type.value, device=elm.name) else: bus_index = bus_dict[elm.bus] elm.rms_model.unify_blocks() mdl = block_deep_copy(elm.rms_model, self.grid.var_factory) self.add_variables_to_compilation_dicts(elm, mdl) # Fill init guess self.set_init_guess(mdl, VarPowerFlowReferenceType.P, np.real(self.power_flow_results.Sbus[bus_index] / self.grid.Sbase)) if not elm.bus.is_dc: self.set_init_guess(mdl, VarPowerFlowReferenceType.Q, np.imag(self.power_flow_results.Sbus[bus_index] / self.grid.Sbase)) # Add to conservation equations k = bus_dict[elm.bus] setP(P, P_used, k, mdl.E(VarPowerFlowReferenceType.P)) if not elm.bus.is_dc: setQ(Q, Q_used, k, mdl.E(VarPowerFlowReferenceType.Q)) # Find init values if self.options.initialization_method == RmsInitializationMethod.Explicit: init_explicit(mdl=mdl, sys_vars=self.sys_vars, variable_parameters=self._variable_parameters, event_parameters_eqs=self._event_parameters_eqs, constant_parameters=self._constant_parameters, init_guess=self.init_guess, uid2idx_vars=self.uid2idx_vars, uid2idx_params=self._uid2idx_params, uid2idx_event_params=self._uid2idx_event_params, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, VARS_NAME=self.VARS_NAME, DIFF_NAME=self.DIFF_NAME, CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME ) elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient: self.logger.add_warning("PseudoTransient initialization not fully implemented") # Add events if mdl.event_dict is not None: collect_events = { key: {"times": [], "values": []} for key in mdl.event_dict.keys() } rms_evts = [rms_evt for rms_evt in self.grid.rms_events if rms_evt.device_idtag == elm.idtag] if len(rms_evts) != 0: for rms_evt in rms_evts: collect_events[rms_evt.parameter]["times"].append(rms_evt.time) collect_events[rms_evt.parameter]["values"].append(rms_evt.value) for param, events_info in collect_events.items(): default_value = elm.rms_model.event_dict[param] if isinstance(default_value, Const) and default_value.value is None: default_value = Const(0.0) elif default_value is None: default_value = Const(0.0) mdl.event_dict[param] = piecewise( time_var=self._glob_time, t_events=np.array(events_info["times"]), new_values=np.array(events_info["values"]), default_value=default_value ) self.sys_block.add(mdl) # Add the nodal balance equations for i, elm in enumerate(self.grid.buses): mdl = block_deep_copy(elm.rms_model, grid.var_factory) if len(mdl.algebraic_eqs) == 0: if not P_used[i] and not Q_used[i]: self.logger.add_error("Isolated bus", value=i) elif elm.is_dc: self._algebraic_eqs.append(P[i]) else: self._algebraic_eqs.append(P[i]) self._algebraic_eqs.append(Q[i]) # Rebuild compilation dicts self._rebuild_compilation_dicts() # Compile RHS and Jacobian self._compile_system()
[docs] def add_variables_to_compilation_dicts(self, elm: ALL_DEV_TYPES, mdl: Block): """ Add variables and parameters to the compilation dictionaries. :param elm: Device type to add variables to :param mdl: Block type to add variables to """ block_item: Block for block_item in mdl.get_all_blocks(): v: Var for v in block_item.state_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._state_vars.append(v) self._n_vars += 1 for v in block_item.algebraic_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}" self._uid2idx_vars[v.uid] = self._n_vars self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self.sys_vars[v.uid] = v self._algebraic_vars.append(v) self._n_vars += 1 ep: Var for ep, const in block_item.parameters.items(): self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{self._n_params}]" self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{self._n_params}" self._uid2idx_params[ep.uid] = self._n_params self._constant_parameters.append(ep) self._parameters_values.append(const) self._n_params += 1 for ep, eq in block_item.event_dict.items(): self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]" self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}" self._uid2idx_event_params[ep.uid] = self._n_event_params self._variable_parameters.append(ep) self._event_parameters_eqs.append(eq) self._n_event_params += 1 for v in block_item.diff_vars: self._compiler_names_dict[v.uid] = f"{self.DIFF_NAME}[{self._n_diff}]" self._alias_names_dict[v.uid] = f"{self.DIFF_NAME}_{self._n_diff}" self._uid2idx_diff[v.uid] = self._n_diff self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item) self.add_device_var(dev=elm, var=v) self._diff_vars.append(v) self._n_diff += 1 self._state_eqs.extend(block_item.state_eqs) self._algebraic_eqs.extend(block_item.algebraic_eqs)
def _rebuild_compilation_dicts(self): """Rebuild compilation dictionaries after all models are added.""" self._state_algeb_vars = list(self.sys_vars.values()) self._n_state = len(self._state_vars) self._n_alg = len(self._algebraic_vars) self._n_algebraic = len(self._algebraic_eqs) self._compiler_names_dict: Dict[int, str] = dict() self._alias_names_dict: Dict[int, str] = dict() self._uid2idx_vars: Dict[int, int] = dict() self._uid2idx_event_params: Dict[int, int] = dict() self._uid2idx_params: Dict[int, int] = dict() self._uid2idx_diff: Dict[int, int] = dict() self._uid2idx_t: Dict[int, int] = dict() i = 0 for v in self._state_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 for v in self._algebraic_vars: self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]" self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}" self._uid2idx_vars[v.uid] = i i += 1 for j, ep in enumerate(self._constant_parameters): self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{j}]" self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{j}" self._uid2idx_params[ep.uid] = j for k, ep in enumerate(self._variable_parameters): self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{k}]" self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{k}" self._uid2idx_event_params[ep.uid] = k for k, ep in enumerate(self._diff_vars): self._compiler_names_dict[ep.uid] = f"{self.DIFF_NAME}[{k}]" self._alias_names_dict[ep.uid] = f"{self.DIFF_NAME}_{k}" self._uid2idx_diff[ep.uid] = k self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME self._uid2idx_t[self._glob_time.uid] = 0 for it, eq in enumerate(self._event_parameters_eqs): if isinstance(eq, Const) and eq.value is None: raise Exception(f' Event parameter {self._variable_parameters[it]} has None Value') # Substitute in algebraic eqs self._stability_eqs = list() for it, eq in enumerate(self._algebraic_eqs): for diff_var in self._diff_vars: eq = eq.subs({diff_var: Const(0)}) eq = eq.simplify() self._stability_eqs.append(eq) def _compile_system(self): """Compile RHS and Jacobian using JIT Compiler.""" timings = dict() print("Compiling Complex Phasor-based RMS using JIT Native Compiler...") t0 = _tic() self._derivative_fn = SymbolicDerivative( vars=self._state_algeb_vars, uid2idx_vars=self._uid2idx_vars, diff_vars=self._diff_vars, compiler_names_dict=self._compiler_names_dict ) timings["SymbolicDerivative"] = _toc(t0) t0 = _tic() self._event_params_fn = SymbolicParamsVector( eqs=self._event_parameters_eqs, compiler_names_dict=self._compiler_names_dict, alias_names_dict=self._alias_names_dict, EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME, TIME_NAME=self.TIME_NAME, ) timings["Event parameters"] = _toc(t0) t0 = _tic() rms_compiler = RMSCompiler( variables=self._state_algeb_vars, diff_vars=self._diff_vars, v_params=self._variable_parameters, c_params=self._constant_parameters, dt_var=self._variable_parameters[-2] if len(self._variable_parameters) >= 2 else Var("dt"), compiler_names_dict=self._compiler_names_dict ) timings["Compiler Setup"] = _toc(t0) t0 = _tic() self._rhs_algeb_fn = rms_compiler.compile_rhs(self._algebraic_eqs, "rhs_algeb") timings["RHS algebraic"] = _toc(t0) if len(self._state_eqs) != 0: t0 = _tic() self._rhs_state_fn = rms_compiler.compile_rhs(self._state_eqs, "rhs_state") timings["RHS state"] = _toc(t0) t0 = _tic() self._j11_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._state_vars, "j11") timings["J11 (dF/dx)"] = _toc(t0) t0 = _tic() self._j12_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._algebraic_vars, "j12") timings["J12 (dF/dy)"] = _toc(t0) t0 = _tic() self._j21_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._state_vars, "j21") timings["J21 (dG/dx)"] = _toc(t0) t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 (dG/dy)"] = _toc(t0) else: t0 = _tic() self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22") timings["J22 only (no states)"] = _toc(t0) if self.options.verbose > 0: print(f"Complex Phasor Model compiled with {self._n_vars} variables") print("\nCompilation timing summary:") for k, v in timings.items(): print(f" {k:30s}: {v:8.4f} s") print(f"\nTotal JIT compile time: {sum(timings.values()):.4f} s") variable_parameters_init = np.ones(self.get_variable_parameter_number()) self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0) self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0) self._constant_params = np.array([const.value for const in self._parameters_values]) if self.options.verbose > 0: print(f"\nTotal compile time: {sum(timings.values()):.4f} s")
[docs] def set_init_guess(self, mdl: Block, reference_powerflow: VarPowerFlowReferenceType, val: float): """Add values from powerflow to initial guess.""" var = mdl.external_mapping[reference_powerflow] self.init_guess[var.uid] = val
[docs] def get_init_guess_info(self) -> pd.DataFrame: """Returns a df with uid, name, and initial value for the system variables.""" vars_names = list() for key, value in self.init_guess.items(): var_name = self.sys_vars[key].name vars_names.append((key, var_name, value)) return pd.DataFrame(data=vars_names, columns=["key", "var_name", "value"])
[docs] def get_device_vars_dict(self) -> Dict[ALL_DEV_TYPES, List[Var]]: """Get dictionary of device variables.""" return self._vars_info
[docs] def add_device_var(self, dev: ALL_DEV_TYPES, var: Var): """Associate a variable with a device.""" var_list = self._vars_info.get(dev, None) if var_list is None: self._vars_info[dev] = [var] else: var_list.append(var)
[docs] def get_var_idx(self, v: Var) -> int: """Get variable index.""" return self._uid2idx_vars[v.uid]
@property def vars_glob_name2uid(self): return self._vars_glob_name2uid def _register_global_var_name(self, name_key: str, uid: int, block: Block | None = None) -> None: prev_uid = self._vars_glob_name2uid.get(name_key) if prev_uid is None or prev_uid == uid: self._vars_glob_name2uid[name_key] = uid return block_tag = "" if block is not None: block_tag = f"::{block.name}#{block.uid}" disambiguated_key = f"{name_key}{block_tag}" if disambiguated_key == name_key: disambiguated_key = f"{name_key} [{uid}]" if disambiguated_key in self._vars_glob_name2uid and self._vars_glob_name2uid[disambiguated_key] != uid: raise ValueError( f"Global variable name collision for '{name_key}' and fallback '{disambiguated_key}': " f"existing uid={self._vars_glob_name2uid[disambiguated_key]}, new uid={uid}." ) self._vars_glob_name2uid[disambiguated_key] = uid @property def uid2idx_vars(self): return self._uid2idx_vars @property def algebraic_vars(self): return self._algebraic_vars @property def state_and_algebraic_vars(self) -> List[Var]: variables = list() for lst in [self._state_vars, self._algebraic_vars]: for var in lst: variables.append(var) return variables @property def state_vars(self): return self._state_vars
[docs] def get_all_vars_number(self) -> int: return self._n_vars
[docs] def get_diff_var_number(self) -> int: return len(self._diff_vars)
[docs] def get_algebraic_var_number(self) -> int: return len(self._algebraic_vars)
[docs] def get_states_number(self) -> int: return self._n_state
[docs] def get_variable_parameter_number(self) -> int: return len(self._variable_parameters)
[docs] def get_x0(self) -> Vec: """Helper function to build the initial vector.""" x = np.zeros(len(self._state_vars) + len(self._algebraic_vars)) for uid, val in self.init_guess.items(): if uid in self._uid2idx_vars: i = self._uid2idx_vars[uid] x[i] = val return x
[docs] def update_variable_params(self, t: float, x_snapshot: Vec | None = None): """Update the variable parameters.""" self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, t)
[docs] def get_dx(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec: return self._derivative_fn(x, xn, dx, h)
[docs] def rhs_state(self, x: Vec, dx: Vec) -> Vec: return self._rhs_state_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def rhs_algebraic(self, x: Vec, dx: Vec) -> Vec: return self._rhs_algeb_fn(x, dx, self._variable_parameters_values, self._constant_params)
[docs] def get_j11(self, x: Vec, dx: Vec, h: float): return self._j11_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j12(self, x: Vec, dx: Vec, h: float): return self._j12_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j21(self, x: Vec, dx: Vec, h: float): return self._j21_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_j22(self, x: Vec, dx: Vec, h: float): return self._j22_fn(x, dx, self._variable_parameters_values, self._constant_params, h)
[docs] def get_dt(self): return self._variable_parameters[-2] if len(self._variable_parameters) >= 2 else Var("dt")
[docs] def get_dt_value(self): dt_value = self._variable_parameters_values[-2] if len(self._variable_parameters_values) >= 2 else 0.001 return dt_value