# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Dict, List
import time
import numpy as np
import pandas as pd
from VeraGridEngine.Devices import MultiCircuit
from VeraGridEngine.Simulations.driver_template import DummySignal
from VeraGridEngine.Utils.Symbolic.symbolic import (Var, Const, Expr, piecewise)
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicParamsVector, \
SymbolicDerivative
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.enumerations import VarPowerFlowReferenceType, RmsInitializationMethod
from VeraGridEngine.basic_structures import Vec, ObjVec, BoolVec, Logger
from VeraGridEngine.Simulations.PowerFlow.power_flow_driver import PowerFlowResults
from VeraGridEngine.Simulations.Rms.rms_options import RmsOptions
from VeraGridEngine.Simulations.Rms.initialization import init_explicit, init_pseudo_transient
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate
from VeraGridEngine.Devices.Events.rms_events_group import RmsEventsGroup
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
def _tic():
return time.perf_counter()
def _toc(t0):
return time.perf_counter() - t0
[docs]
def setP(P: ObjVec, P_used: BoolVec, k: int, val: object):
"""
:param P:
:param P_used:
:param k:
:param val:
:return:
"""
if not P_used[k]:
P[k] = val
P_used[k] = True
else:
P[k] += val
[docs]
def setQ(Q: ObjVec, Q_used: BoolVec, k: int, val: object):
"""
:param Q:
:param Q_used:
:param k:
:param val:
:return:
"""
if not Q_used[k]:
Q[k] = val
Q_used[k] = True
else:
Q[k] += val
[docs]
class RmsProblemTensygrid(RmsProblemTemplate):
"""
DAE (Differential-Algebraic Equation) class to store and manage.
Responsibilities:
- Store state and algebraic variables (x, y)
- Store Jacobian matrices
- Store residual equations
- Store sparsity patterns
"""
VARS_NAME = "vars"
VARIABLE_PARAMS_NAME = "vprms"
CONSTANT_PARAMS_NAME = "cprms"
DIFF_NAME = "diff"
TIME_NAME = "glob_time"
def __init__(self,
grid: MultiCircuit,
options: RmsOptions,
pf_results: PowerFlowResults | None,
progress_signal: DummySignal | None = None):
"""
:param grid:
:param options:
:param pf_results:
"""
super().__init__()
self.logger = Logger()
self.progress_signal = progress_signal
self.grid: MultiCircuit = grid
self.power_flow_results: PowerFlowResults = pf_results
self.Sf = self.power_flow_results.Sf / self.grid.Sbase
self.St = self.power_flow_results.St / self.grid.Sbase
self.options: RmsOptions = options
# this is the general init guess that will contain all the variables init value
self.init_guess: Dict[int, float] = dict()
self.sys_block: Block = Block(children=[], in_vars=[])
self._algebraic_vars: List[Var] = list()
self._algebraic_eqs: List[Expr] = list()
self._state_vars: List[Var] = list()
self._state_eqs: List[Expr] = list()
self._diff_vars: List[Var] = list()
self._variable_parameters: List[Var] = list()
self._event_parameters_eqs0: List[Expr | Const] = list()
self._event_parameters_eqs: List[Expr | Const] = list()
self._constant_parameters: List[Var] = list()
self._parameters_values: List[Const] = list()
self._reformulated_vars: List[Var] = list()
self._reformulated_lagvars: List[Var] = list()
# --------------------------------------------------------------------------------------------------------------
# Initialize the RMS problem
# --------------------------------------------------------------------------------------------------------------
######################### Initialize containers#############################
total_init_explicit_time: float = 0
t0 = time.perf_counter()
# dictionaries to store device-variable ifo
self._vars_info: Dict[ALL_DEV_TYPES, List[Var]] = dict()
self._vars_glob_name2uid: Dict[str, int] = dict()
# dictionaries for compilation names
self._compiler_names_dict: Dict[int, str] = dict()
self._alias_names_dict: Dict[int, str] = dict()
# dictionaries for variable position in the variables arrays
self._uid2idx_vars: Dict[int, int] = dict()
self._uid2idx_event_params: Dict[int, int] = dict()
self._uid2idx_params: Dict[int, int] = dict()
self._uid2idx_diff: Dict[int, int] = dict()
self._uid2idx_t: Dict[int, int] = dict()
# create time global time variable and add it to the compilation dict
self._glob_time: Var = Var(self.TIME_NAME)
self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME
self._uid2idx_t[self._glob_time.uid] = 0
# Dictionary of state and algebraic vars
self.sys_vars: Dict[int, Var] = dict()
# already computed grid power flow
bus_dict = dict()
# initialize balance equation arrays
n = len(self.grid.buses)
P: ObjVec = np.zeros(n, dtype=object)
Q: ObjVec = np.zeros(n, dtype=object)
P_used: BoolVec = np.zeros(n, dtype=bool)
Q_used: BoolVec = np.zeros(n, dtype=bool)
# general indexes for variables and parameters
self._n_vars = 0
self._n_params = 0
self._n_event_params = 0
self._n_diff = 0
######################################## Initialize devices ########################################
# initialize buses
for bus_num, elm in enumerate(self.grid.buses):
# Todo: missing default initialization for the model
bus_dict[elm] = bus_num
# flatten model
elm.rms_model.unify_blocks()
mdl = (elm.rms_model)
self.add_variables_to_compilation_dicts(elm, mdl)
# add init values from powerflow to initial guess
self.set_init_guess(mdl, VarPowerFlowReferenceType.Vm, np.abs(self.power_flow_results.voltage[bus_num]))
self.set_init_guess(mdl, VarPowerFlowReferenceType.Va, np.angle(self.power_flow_results.voltage[bus_num]))
# add model to system block
self.sys_block.add(mdl)
# initialize branches
for branch_num, elm in enumerate(self.grid.get_branches_iter(add_vsc=True, add_hvdc=True, add_switch=True)):
# Todo: missing default initialization for the model
# flatten model
elm.rms_model.unify_blocks()
mdl = (elm.rms_model)
self.add_variables_to_compilation_dicts(elm, mdl)
# add init values from powerflow to initial guess
self.set_init_guess(mdl, VarPowerFlowReferenceType.Pf, self.Sf[branch_num].real)
self.set_init_guess(mdl, VarPowerFlowReferenceType.Qf, self.Sf[branch_num].imag)
self.set_init_guess(mdl, VarPowerFlowReferenceType.Pt, self.St[branch_num].real)
self.set_init_guess(mdl, VarPowerFlowReferenceType.Qt, self.St[branch_num].imag)
# add model to system block
self.sys_block.add(mdl)
# add variable to conservation equations of the bus to which the element is connected
f = bus_dict[elm.bus_from]
t = bus_dict[elm.bus_to]
setP(P, P_used, f, -mdl.E(VarPowerFlowReferenceType.Pf))
setP(P, P_used, t, -mdl.E(VarPowerFlowReferenceType.Pt))
setQ(Q, Q_used, f, -mdl.E(VarPowerFlowReferenceType.Qf))
setQ(Q, Q_used, t, -mdl.E(VarPowerFlowReferenceType.Qt))
# initialize injections
bus_regions_dict = self.grid.get_injection_devices_grouped_by_bus()
for bus, region in bus_regions_dict.items():
bus_index = bus_dict[bus]
for dev_type, dev_list in region.items():
for elm in dev_list:
# flatten model
elm.rms_model.unify_blocks()
mdl = (elm.rms_model)
self.add_variables_to_compilation_dicts(elm, mdl)
# fill general init guess for known variables values
self.set_init_guess(mdl, VarPowerFlowReferenceType.P,
np.real(self.power_flow_results.Sbus[bus_index] / grid.Sbase))
self.set_init_guess(mdl, VarPowerFlowReferenceType.Q,
np.imag(self.power_flow_results.Sbus[bus_index] / grid.Sbase))
# add variable to conservation equations of the bus to which the element is connected
k = bus_dict[elm.bus]
setP(P, P_used, k, mdl.E(VarPowerFlowReferenceType.P))
setQ(Q, Q_used, k, mdl.E(VarPowerFlowReferenceType.Q))
# find init values for the variables of this model
if self.options.initialization_method == RmsInitializationMethod.Explicit:
init_explicit(mdl=mdl,
sys_vars=self.sys_vars,
variable_parameters=self._variable_parameters,
event_parameters_eqs=self._event_parameters_eqs,
constant_parameters=self._constant_parameters,
init_guess=self.init_guess,
uid2idx_vars=self.uid2idx_vars,
uid2idx_params=self._uid2idx_params,
uid2idx_event_params=self._uid2idx_event_params,
compiler_names_dict=self._compiler_names_dict,
alias_names_dict=self._alias_names_dict,
VARIABLE_PARAMS_NAME=self.VARIABLE_PARAMS_NAME,
TIME_NAME=self.TIME_NAME,
VARS_NAME=self.VARS_NAME,
DIFF_NAME=self.DIFF_NAME,
CONSTANT_PARAMS_NAME=self.CONSTANT_PARAMS_NAME
)
elif self.options.initialization_method == RmsInitializationMethod.PseudoTransient:
self._glob_time, sys_block_region, init_guess_region = init_pseudo_transient()
else:
raise ValueError("Not implemented initialization method")
self.sys_block.add(mdl)
total_init_explicit_time += time.perf_counter() - t0
# print(f"\nTotal time explicit initialization: {total_init_explicit_time:.6f} seconds")
self.logger.add_info("Total time explicit initialization", value=total_init_explicit_time)
if self.progress_signal is not None:
self.progress_signal.emit(10)
# add the nodal balance equations
for i, elm in enumerate(self.grid.buses):
mdl = (elm.rms_model)
if len(mdl.algebraic_eqs) == 0:
if not P_used[i] and not Q_used[i]:
self.logger.add_error("Isolated bus", value=i)
else:
self._algebraic_eqs.append(P[i])
self._algebraic_eqs.append(Q[i])
# self._variable_parameters: List[Var] = list()
# self._event_parameters_eqs: List[Expr | Const] = list()
# self._constant_parameters: List[Var] = list()
# self._parameters_values: List[Const] = list()
#
# for b in self.sys_block.get_all_blocks():
#
# for param, eq in b.event_dict.items():
# self._variable_parameters.append(param)
# self._event_parameters_eqs.append(eq)
# for param, constant in b.parameters.items():
# self._constant_parameters.append(param)
# self._parameters_values.append(constant)
# Collect reformulated_vars (tensygrid-specific)
for b in self.sys_block.get_all_blocks():
for var in b.reformulated_vars:
self._reformulated_vars.append(var)
# We define the parameter dt and delta
self._dt = Var(name='dt')
self._delta = Var(name='delta')
self._variable_parameters.append(self._dt)
self._variable_parameters.append(self._delta)
self._event_parameters_eqs.append(Const(1e-3))
self._event_parameters_eqs.append(Const(1))
# add these parameters, m is for variable parameters
self._compiler_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]"
self._alias_names_dict[self._dt.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}"
self._uid2idx_event_params[self._dt.uid] = self._n_event_params
self._n_event_params += 1
self._compiler_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]"
self._alias_names_dict[self._delta.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}"
self._uid2idx_event_params[self._delta.uid] = self._n_event_params
self._n_event_params += 1
##################### To be removed when order is preserved in the first part #############################
self._state_algeb_vars = list(self.sys_vars.values())
self._n_state = len(self._state_vars)
self._n_alg = len(self._algebraic_vars)
self._n_algebraic = len(self._algebraic_eqs)
# self._compiler_names_dict: Dict[int, str] = dict()
# self._alias_names_dict: Dict[int, str] = dict()
self._uid2idx_vars: Dict[int, int] = dict()
# self._uid2idx_event_params: Dict[int, int] = dict()
# self._uid2idx_params: Dict[int, int] = dict()
self._uid2idx_diff: Dict[int, int] = dict()
self._uid2idx_t: Dict[int, int] = dict()
i = 0
for v in self._state_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}"
self._uid2idx_vars[v.uid] = i
i += 1
for v in self._algebraic_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}"
self._uid2idx_vars[v.uid] = i
i += 1
# for j, ep in enumerate(self._constant_parameters):
# self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{j}]"
# self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{j}"
# self._uid2idx_params[ep.uid] = j
for var in self._reformulated_vars:
fixed_var = Var('ref_var' + str(i))
self._reformulated_lagvars.append(fixed_var)
self._event_parameters_eqs.append(Const(1))
self._variable_parameters.append(fixed_var)
self._compiler_names_dict[fixed_var.uid] = f"{self.VARIABLE_PARAMS_NAME}[{i}]"
self._alias_names_dict[fixed_var.uid] = f"{self.VARIABLE_PARAMS_NAME}_{i}"
self._uid2idx_event_params[fixed_var.uid] = i
i += 1
# Re-index variable parameters to ensure sequential indices after all additions
for idx, ep in enumerate(self._variable_parameters):
self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{idx}]"
self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{idx}"
self._uid2idx_event_params[ep.uid] = idx
for k, ep in enumerate(self._diff_vars):
self._compiler_names_dict[ep.uid] = f"{self.DIFF_NAME}[{k}]"
self._alias_names_dict[ep.uid] = f"{self.DIFF_NAME}_{k}"
self._uid2idx_diff[ep.uid] = k
self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME
self._uid2idx_t[self._glob_time.uid] = 0
for it, eq in enumerate(self._event_parameters_eqs):
if isinstance(eq, Const) and eq.value is None:
raise Exception(f' Event parameter {self._variable_parameters[it]} has None Value')
# Substitute in algebraic eqs
self._stability_eqs = list()
# fill stability equations by putting algebraic equations equal zero
for it, eq in enumerate(self._algebraic_eqs):
for diff_var in self._diff_vars:
eq = eq.subs({diff_var: Const(0)})
eq = eq.simplify()
self._stability_eqs.append(eq)
# Store a copy of the original event parameters equations for later use with set_events_group
self._event_parameters_eqs0 = self._event_parameters_eqs.copy()
#We populate the reformulated vars
for it, eq in enumerate(self._algebraic_eqs):
for j, var in enumerate(self._reformulated_vars):
is_diff = any(v.base_var is not None for v in eq.get_vars())
if var in eq.get_vars() and is_diff:
var_lagged= self._reformulated_lagvars[j]
eq = eq.subs({var: 0.5*(var + var_lagged)})
eq = eq.simplify()
self._algebraic_eqs[it] = eq
for it, eq in enumerate(self._state_eqs):
for j, var in enumerate(self._reformulated_vars):
is_diff = any(v.base_var is not None for v in eq.get_vars())
if var in eq.get_vars() and is_diff:
var_lagged= self._reformulated_lagvars[j]
eq = eq.subs({var: 0.5*(var + var_lagged)})
eq = eq.simplify()
self._state_eqs[it] = eq
# add events modifying values of event_parameters equations
collect_events = {
param: {"times": [], "values": []}
for param in self._variable_parameters
}
for rms_evt in self.grid.rms_events:
collect_events[rms_evt.parameter]["times"].append(rms_evt.time)
collect_events[rms_evt.parameter]["values"].append(rms_evt.value)
# TODO: implement the function in block: apply_event
for param, events_info in collect_events.items():
default_value = self._event_parameters_eqs[self._variable_parameters.index(param)]
self._event_parameters_eqs[self._variable_parameters.index(param)] = piecewise(
time_var=self._glob_time,
t_events=np.array(events_info["times"]),
new_values=np.array(events_info["values"]),
default_value=default_value
)
# --------------------------------------------------------------------------------------------------------------
# Compile RHS and Jacobian using JIT Compiler adaptation
# --------------------------------------------------------------------------------------------------------------
timings = dict()
print("Compiling RMS using JIT Native Compiler...")
t0 = _tic()
self._derivative_fn = SymbolicDerivative(
vars=self._state_algeb_vars,
uid2idx_vars=self._uid2idx_vars,
diff_vars=self._diff_vars,
compiler_names_dict=self._compiler_names_dict
)
timings["SymbolicDerivative"] = _toc(t0)
t0 = _tic()
self._event_params_fn = SymbolicParamsVector(
eqs=self._event_parameters_eqs,
compiler_names_dict=self._compiler_names_dict,
alias_names_dict=self._alias_names_dict,
EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME,
TIME_NAME=self.TIME_NAME,
)
timings["Event parameters"] = _toc(t0)
t0 = _tic()
rms_compiler = RMSCompiler(
variables=self._state_algeb_vars,
diff_vars=self._diff_vars,
v_params=self._variable_parameters,
c_params=self._constant_parameters,
dt_var=self._dt,
compiler_names_dict=self._compiler_names_dict
)
timings["Compiler Setup"] = _toc(t0)
t0 = _tic()
self._rhs_algeb_fn = rms_compiler.compile_rhs(self._algebraic_eqs, "rhs_algeb")
timings["RHS algebraic"] = _toc(t0)
if len(self._state_eqs) != 0:
t0 = _tic()
self._rhs_state_fn = rms_compiler.compile_rhs(self._state_eqs, "rhs_state")
timings["RHS state"] = _toc(t0)
t0 = _tic()
self._j11_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._state_vars, "j11")
timings["J11 (dF/dx)"] = _toc(t0)
t0 = _tic()
self._j12_fn = rms_compiler.compile_sparse_jacobian(self._state_eqs, self._algebraic_vars, "j12")
timings["J12 (dF/dy)"] = _toc(t0)
t0 = _tic()
self._j21_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._state_vars, "j21")
timings["J21 (dG/dx)"] = _toc(t0)
t0 = _tic()
self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22")
timings["J22 (dG/dy)"] = _toc(t0)
else:
t0 = _tic()
self._j22_fn = rms_compiler.compile_sparse_jacobian(self._algebraic_eqs, self._algebraic_vars, "j22")
timings["J22 only (no states)"] = _toc(t0)
if options.verbose > 0:
print(f"Model compiled with {self._n_vars} variables")
print("\nCompilation timing summary:")
for k, v in timings.items():
print(f" {k:30s}: {v:8.4f} s")
print(f"\nTotal JIT compile time: {sum(timings.values()):.4f} s")
variable_parameters_init = np.ones(self.get_variable_parameter_number())
# TODO: think about this thing of calling twice here
self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0)
self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0)
self._constant_params = np.array([const.value for const in self._parameters_values])
if options.verbose > 0:
print(f"\nTotal compile time: {sum(timings.values()):.4f} s")
[docs]
def add_variables_to_compilation_dicts(self, elm: ALL_DEV_TYPES, mdl: Block):
"""
add variables and parameters info to the system block
:param elm:
:type elm: Union[VeraGridEngine.Devices.Substation.bus.Bus, VeraGridEngine.Devices.Injections.load.Load]
:param mdl:
:type mdl: VeraGridEngine.Utils.Symbolic.block.Block
:return:
:rtype: None
"""
block_item: Block
for block_item in mdl.get_all_blocks():
# i is for variables
v: Var
for v in block_item.state_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}"
self._uid2idx_vars[v.uid] = self._n_vars
self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item)
self.add_device_var(dev=elm, var=v)
self.sys_vars[v.uid] = v
self._state_vars.append(v)
self._n_vars += 1
for v in block_item.algebraic_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{self._n_vars}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{self._n_vars}"
self._uid2idx_vars[v.uid] = self._n_vars
self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item)
self.add_device_var(dev=elm, var=v)
self.sys_vars[v.uid] = v
self._algebraic_vars.append(v)
self._n_vars += 1
# j is for parameters
ep: Var
for ep, const in block_item.parameters.items():
self._compiler_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}[{self._n_params}]"
self._alias_names_dict[ep.uid] = f"{self.CONSTANT_PARAMS_NAME}_{self._n_params}"
self._uid2idx_params[ep.uid] = self._n_params
self._constant_parameters.append(ep)
self._parameters_values.append(const)
self._n_params += 1
# m is for variable parameters
for ep, eq in block_item.event_dict.items():
self._compiler_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}[{self._n_event_params}]"
self._alias_names_dict[ep.uid] = f"{self.VARIABLE_PARAMS_NAME}_{self._n_event_params}"
self._uid2idx_event_params[ep.uid] = self._n_event_params
self._variable_parameters.append(ep)
self._event_parameters_eqs.append(eq)
self._n_event_params += 1
# l is for differential vars
for v in block_item.diff_vars:
self._compiler_names_dict[v.uid] = f"{self.DIFF_NAME}[{self._n_diff}]"
self._alias_names_dict[v.uid] = f"{self.DIFF_NAME}_{self._n_diff}"
self._uid2idx_diff[v.uid] = self._n_diff
self._register_global_var_name(name_key=v.name + elm.name, uid=v.uid, block=block_item)
self.add_device_var(dev=elm, var=v)
self._diff_vars.append(v)
self._n_diff += 1
self._state_eqs.extend(block_item.state_eqs)
self._algebraic_eqs.extend(block_item.algebraic_eqs)
if self.progress_signal is not None:
self.progress_signal.emit(20)
[docs]
def set_init_guess(self, mdl: Block, reference_powerflow: VarPowerFlowReferenceType, val: float):
"""
add values from powerflow to initial guess
:param mdl:
:type mdl:
:param reference_powerflow:
:type reference_powerflow:
:param val:
:type val:
:return:
:rtype:
"""
var = mdl.external_mapping[reference_powerflow]
self.init_guess[var.uid] = val
[docs]
def get_init_guess_info(self) -> pd.DataFrame:
"""
returns a df with uid, name, and initial value for the system variables
:return:
:rtype:
"""
vars_names = list()
for key, value in self.init_guess.items():
var_name = self.sys_vars[key].name
vars_names.append((key, var_name, value))
return pd.DataFrame(data=vars_names, columns=["key", "var_name", "value"])
[docs]
def get_device_vars_dict(self) -> Dict[ALL_DEV_TYPES, List[Var]]:
"""
:return:
:rtype:
"""
return self._vars_info
[docs]
def add_device_var(self, dev: ALL_DEV_TYPES, var: Var):
"""
Associate a variable with a device
:param dev: Device
:param var: Variable
"""
var_list = self._vars_info.get(dev, None)
if var_list is None:
self._vars_info[dev] = [var]
else:
var_list.append(var)
[docs]
def get_var_idx(self, v: Var) -> int:
"""
:param v:
:return:
"""
return self._uid2idx_vars[v.uid]
@property
def vars_glob_name2uid(self):
"""
:return:
"""
return self._vars_glob_name2uid
def _register_global_var_name(self, name_key: str, uid: int, block: Block | None = None) -> None:
prev_uid = self._vars_glob_name2uid.get(name_key)
if prev_uid is None or prev_uid == uid:
self._vars_glob_name2uid[name_key] = uid
return
block_tag = ""
if block is not None:
block_tag = f"::{block.name}#{block.uid}"
disambiguated_key = f"{name_key}{block_tag}"
if disambiguated_key == name_key:
disambiguated_key = f"{name_key} [{uid}]"
if disambiguated_key in self._vars_glob_name2uid and self._vars_glob_name2uid[disambiguated_key] != uid:
raise ValueError(
f"Global variable name collision for '{name_key}' and fallback '{disambiguated_key}': "
f"existing uid={self._vars_glob_name2uid[disambiguated_key]}, new uid={uid}."
)
self._vars_glob_name2uid[disambiguated_key] = uid
@property
def uid2idx_vars(self):
"""
:return:
"""
return self._uid2idx_vars
@property
def algebraic_vars(self):
"""
:return:
"""
return self._algebraic_vars
@property
def state_and_algebraic_vars(self) -> List[Var]:
"""
:return:
"""
variables = list()
for lst in [self._state_vars, self._algebraic_vars]:
for var in lst:
variables.append(var)
return variables
@property
def state_vars(self):
"""
:return:
"""
return self._state_vars
[docs]
def get_all_vars_number(self) -> int:
return self._n_vars
[docs]
def get_diff_var_number(self) -> int:
"""
Get the number of diff vars
:return:
"""
return len(self._diff_vars)
[docs]
def get_algebraic_var_number(self) -> int:
return len(self._algebraic_vars)
[docs]
def get_states_number(self) -> int:
return self._n_state
[docs]
def get_variable_parameter_number(self) -> int:
return len(self._variable_parameters)
[docs]
def get_x0(self) -> Vec:
"""
Helper function to build the initial vector
:return: array matching with the mapping, matching the solver ordering
"""
x = np.zeros(len(self._state_vars) + len(self._algebraic_vars))
for uid, val in self.init_guess.items():
i = self._uid2idx_vars[uid]
x[i] = val
return x
[docs]
def update_variable_params(self, t: float, x_snapshot: Vec | None = None):
"""
Update the variable parameters
:param t:
:return:
"""
self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, t)
[docs]
def update_variable_params_ts(self, x:Vec, t: float):
"""
Update the variable parameters
:param t:
:return:
"""
self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, t)
for i, var in enumerate(self._reformulated_vars):
lagged_var = self._reformulated_lagvars[i]
lag_idx = self._uid2idx_event_params[lagged_var.uid]
var_idx = self._uid2idx_vars[var.uid]
self._variable_parameters_values[lag_idx] = x[var_idx]
[docs]
def get_dx(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec:
return self._derivative_fn(x, xn, dx, h)
[docs]
def rhs_state(self, x: Vec, dx: Vec) -> Vec:
return self._rhs_state_fn(x, dx,
self._variable_parameters_values,
self._constant_params)
[docs]
def rhs_algebraic(self, x: Vec, dx: Vec) -> Vec:
return self._rhs_algeb_fn(x, dx,
self._variable_parameters_values,
self._constant_params)
[docs]
def get_j11(self, x: Vec, dx: Vec, h: float):
return self._j11_fn(x, dx,
self._variable_parameters_values,
self._constant_params,
h)
[docs]
def get_j12(self, x: Vec, dx: Vec, h: float):
return self._j12_fn(x, dx,
self._variable_parameters_values,
self._constant_params,
h)
[docs]
def get_j21(self, x: Vec, dx: Vec, h: float):
return self._j21_fn(x, dx,
self._variable_parameters_values,
self._constant_params,
h)
[docs]
def get_j22(self, x: Vec, dx: Vec, h: float):
return self._j22_fn(x, dx,
self._variable_parameters_values,
self._constant_params,
h)
[docs]
def get_dt(self):
return self._dt
[docs]
def get_dt_value(self):
dt_value = self._variable_parameters_values[-2]
return dt_value
[docs]
def set_events_group(self, rms_events_group: RmsEventsGroup):
"""
Set the events group to use for this simulation.
This filters events by group and recompiles the event parameters function.
:param rms_events_group: The RmsEventsGroup to use
"""
# Restore original event parameters equations
self._event_parameters_eqs = self._event_parameters_eqs0.copy()
# Collect events for this group only
collect_events = {
param: {"times": [], "values": []}
for param in self._variable_parameters
}
# Filter events by group
rms_evts = [evt for evt in self.grid.rms_events if evt.group.idtag == rms_events_group.idtag]
for rms_evt in rms_evts:
collect_events[rms_evt.parameter]["times"].append(rms_evt.time)
collect_events[rms_evt.parameter]["values"].append(rms_evt.value)
# Apply piecewise functions for events
for param, events_info in collect_events.items():
if events_info["times"]:
default_value = self._event_parameters_eqs[self._variable_parameters.index(param)]
self._event_parameters_eqs[self._variable_parameters.index(param)] = piecewise(
time_var=self._glob_time,
t_events=np.array(events_info["times"]),
new_values=np.array(events_info["values"]),
default_value=default_value
)
# Recompile the event parameters function
self._event_params_fn = SymbolicParamsVector(
eqs=self._event_parameters_eqs,
compiler_names_dict=self._compiler_names_dict,
alias_names_dict=self._alias_names_dict,
EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME,
TIME_NAME=self.TIME_NAME,
)
# Reinitialize variable parameters values
variable_parameters_init = np.ones(self.get_variable_parameter_number())
self._variable_parameters_values = self._event_params_fn(variable_parameters_init, 0.0)
self._variable_parameters_values = self._event_params_fn(self._variable_parameters_values, 0.0)