# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from abc import ABC
import numpy as np
from typing import Any, Dict, List, Optional, Protocol
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic import Const, Expr, Var, hard_sat, heaviside, piecewise
from VeraGridEngine.basic_structures import Vec
from VeraGridEngine.Simulations.driver_template import DummySignal
from VeraGridEngine.Devices.Events.emt_events_group import EmtEventsGroup
from VeraGridEngine.enumerations import DynamicEventTransitionType, VarPowerFlowReferenceType
def _get_diff_var_sort_key(diff_var: Var) -> int:
"""
Return the ordering key used to sort differential variables.
:param diff_var: Differential variable to inspect.
:return: Sorting key associated with the differential variable.
"""
return diff_var.diff_order
def _get_external_mapping(mdl: Block) -> Optional[Dict[Any, Var]]:
"""
Return the external mapping associated with a model block.
:param mdl: Model block to inspect.
:return: External mapping dictionary or None.
"""
external_mapping: Optional[Dict[Any, Var]] = mdl.external_mapping
if external_mapping is None:
return None
else:
return external_mapping
[docs]
class EmtBoundaryUpdateProtocol(Protocol):
"""
Structural protocol implemented by EMT boundary update providers.
"""
[docs]
def update(self, t: float, x: Vec, params: Vec) -> None:
"""
Update the full parameter vector in place.
"""
...
[docs]
def get_next_forced_event_time(self, t_prev: float, t_target: float) -> float | None:
"""
Return the next exact-alignment event time inside ``(t_prev, t_target]``.
"""
...
def _implements_forced_event_time_api(boundary_updater: Any) -> bool:
"""
Return whether one boundary updater type implements ``get_next_forced_event_time``.
:param boundary_updater: Boundary updater instance.
:return: ``True`` when the API is implemented.
"""
updater_type: type = type(boundary_updater)
base_type: type
for base_type in updater_type.__mro__:
if "get_next_forced_event_time" in base_type.__dict__:
return True
else:
pass
return False
def _emt_event_spec_time_sort_key(event_spec: Dict[str, float | str | None]) -> float:
"""
Return the sorting key of one EMT runtime-event specification.
:param event_spec: EMT runtime-event specification.
:return: Event start time.
"""
return float(event_spec["time"])
[docs]
def resolve_solver_boundary_updater(
problem: "EmtProblemTemplate",
boundary_updater: EmtBoundaryUpdateProtocol | None,
t0: float,
) -> EmtBoundaryUpdateProtocol | None:
"""
Resolve the boundary updater consumed by an EMT solver.
The problem-owned updater remains the default, but demos and legacy callers
may still provide external wrappers that only implement ``update()``.
"""
problem.reset_boundary_update_state(float(t0))
if boundary_updater is None:
return problem.boundary_update
return boundary_updater
[docs]
def get_solver_forced_event_time(
boundary_updater: EmtBoundaryUpdateProtocol | None,
t_prev: float,
t_target: float,
) -> float | None:
"""
Query the next forced-alignment event time if the updater exposes it.
Legacy boundary updaters that only implement ``update()`` are treated as
having no forced event alignment requirements.
"""
if boundary_updater is None:
return None
else:
pass
if _implements_forced_event_time_api(boundary_updater):
pass
else:
return None
next_time: float | None = boundary_updater.get_next_forced_event_time(float(t_prev), float(t_target))
if next_time is None:
return None
else:
return float(next_time)
[docs]
def is_problem_owned_boundary_updater(problem: "EmtProblemTemplate",
boundary_updater: EmtBoundaryUpdateProtocol | None) -> bool:
"""
Return whether the updater is owned by the EMT problem itself.
This distinguishes endogenous problem boundary logic from external wrappers
supplied by tests or demos.
"""
if boundary_updater is None:
return False
if boundary_updater is problem:
return True
problem_boundary_updater = problem.boundary_update
if problem_boundary_updater is None:
return False
return boundary_updater is problem_boundary_updater
def _freeze_runtime_expr_at_time(expr: Any, time_var: Var, sample_time: float) -> Any:
"""
Freeze one runtime expression at an explicit time value.
:param expr: Runtime expression to freeze.
:param time_var: Global symbolic time variable.
:param sample_time: Time used to evaluate the expression boundary.
:return: Time-frozen symbolic expression.
"""
if isinstance(expr, (Expr, Var, Const)):
return expr.subs(dict({time_var: Const(float(sample_time))})).simplify()
else:
return expr
def _build_ramp_runtime_expr(
time_var: Var,
start_time: float,
end_time: float,
before_expr: Any,
final_value: float,
) -> Any:
"""
Build one linear ramp transition on top of an existing runtime expression.
:param time_var: Global symbolic time variable.
:param start_time: Ramp start time.
:param end_time: Ramp end time.
:param before_expr: Expression active before the ramp starts.
:param final_value: Final runtime value after the ramp ends.
:return: Combined symbolic expression.
"""
start_expr: Any = _freeze_runtime_expr_at_time(before_expr, time_var, start_time)
duration_expr: Const = Const(float(end_time - start_time))
time_offset_expr: Any = time_var - Const(float(start_time))
progress_expr: Any = hard_sat(time_offset_expr / duration_expr, Const(0.0), Const(1.0))
ramp_expr: Any = start_expr + progress_expr * (Const(float(final_value)) - start_expr)
started_expr: Any = heaviside(time_var - Const(float(start_time)))
ended_expr: Any = heaviside(time_var - Const(float(end_time)))
return (Const(1.0) - started_expr) * before_expr + started_expr * (
(Const(1.0) - ended_expr) * ramp_expr + ended_expr * Const(float(final_value))
)
[docs]
class EmtProblemTemplate(ABC):
"""
Intermediate layer that manages DAE plumbing including indexing, variable mapping,
and event updating, regardless of whether the system comes from an electrical
circuit or a generic mathematical model.
"""
VARS_NAME = "vars"
VARIABLE_PARAMS_NAME = "vprms"
CONSTANT_PARAMS_NAME = "cprms"
DIFF_NAME = "diff"
TIME_NAME = "glob_time"
def __init__(self,
sys_block: Block,
static_parameter_values_mapping: Dict[Var, Const],
glob_time: Var,
progress_signal: DummySignal | None = None,
progress_text: DummySignal | None = None,
)->None:
"""
Initialize the EMT problem template.
:param sys_block: Root symbolic block that defines the EMT problem.
:param glob_time: Global time variable used by runtime expressions.
:return: None
"""
super().__init__()
self.sys_block: Block = sys_block
self._glob_time: Var = glob_time
self._newton_trace_collector: Optional[Any] = None
self._state_vars: List[Var] = self.sys_block.state_vars
self._algebraic_vars: List[Var] = self.sys_block.algebraic_vars
self._state_eqs: List[Any] = self.sys_block.state_eqs
self._algebraic_eqs: List[Any] = self.sys_block.algebraic_eqs
self._diff_vars: List[Var] = self.sys_block.diff_vars
self._static_parameter_values: Dict[Var, Const] = static_parameter_values_mapping
self._constant_parameters: List[Var] = list(self._static_parameter_values.keys())
self._parameters_values: List[Const] = list(self._static_parameter_values.values())
self._variable_parameters: List[Var] = list()
self._event_parameters_eqs: List[Any] = list()
self._variable_parameters: List[Var] = list()
self._event_parameters_eqs: List[Any] = list()
self._runtime_mode_uids: set[int] = set()
if self.sys_block.event_dict:
self._variable_parameters.extend(list(self.sys_block.event_dict.keys()))
self._event_parameters_eqs.extend(list(self.sys_block.event_dict.values()))
self._variable_parameters.extend(list(self.sys_block.mode_dict.keys()))
self._event_parameters_eqs.extend(list(self.sys_block.mode_dict.values()))
for v in self.sys_block.mode_dict.keys():
self._runtime_mode_uids.add(v.uid)
self._runtime_all_parameters_source: List[Var] = list(self._variable_parameters)
self._runtime_all_eqs_source: List[Any] = list(self._event_parameters_eqs)
self._runtime_continuous_parameters: List[Var] = list()
self._runtime_mode_parameters: List[Var] = list()
self._runtime_continuous_eqs: List[Any] = list()
self._runtime_mode_eqs: List[Any] = list()
self._runtime_continuous_slice: slice = slice(0, 0)
self._runtime_mode_slice: slice = slice(0, 0)
self._rebuild_runtime_parameter_partition()
self.init_guess: Dict[int, float] = dict()
self.event_params_init_dict: Dict[int, float | int | complex | None] = dict()
self.diff_init_guess: Dict[int, float] = dict()
# self._vars_info: Dict[Any, List[Var]] = dict()
self._finalize_order_and_maps()
self._event_params_values: Vec = np.zeros(0, dtype=np.float64)
self._constant_params_values: Vec = np.zeros(0, dtype=np.float64)
self._build_runtime_param_vectors()
self.progress_signal = DummySignal() if progress_signal is None else progress_signal
self.progress_text = DummySignal(str) if progress_text is None else progress_text
@property
def glob_time(self) -> Var:
"""
Return the global time symbolic variable.
:return: Global time symbolic variable.
"""
return self._glob_time
@property
def boundary_update(self) -> EmtBoundaryUpdateProtocol | None:
"""
Return the boundary update provider consumed by EMT solvers.
Problems without endogenous boundary logic return ``None``.
"""
return None
def _finalize_order_and_maps(self)->None:
"""
Build canonical ordering, index maps and internal counters.
:return: None
"""
self._diff_vars = sorted(self._diff_vars, key=_get_diff_var_sort_key)
self._n_state = len(self._state_vars)
self._n_alg = len(self._algebraic_vars)
self._n_vars = self._n_state + self._n_alg
self._n_event_params = len(self._variable_parameters)
self._n_params = len(self._constant_parameters)
self._n_diff = len(self._diff_vars)
self._n_algebraic = len(self._algebraic_eqs)
self._compiler_names_dict: Dict[int, str] = dict()
self._alias_names_dict: Dict[int, str] = dict()
self._uid2idx_vars: Dict[int, int] = dict()
self._uid2idx_event_params: Dict[int, int] = dict()
self._uid2idx_params: Dict[int, int] = dict()
self._uid2idx_diff: Dict[int, int] = dict()
self._uid2idx_t: Dict[int, int] = dict()
self._vars_glob_name2uid: Dict[str, int] = dict()
i: int = 0
for v in self._state_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}"
self._uid2idx_vars[v.uid] = i
self._vars_glob_name2uid[v.name] = v.uid
i += 1
for v in self._algebraic_vars:
self._compiler_names_dict[v.uid] = f"{self.VARS_NAME}[{i}]"
self._alias_names_dict[v.uid] = f"{self.VARS_NAME}_{i}"
self._uid2idx_vars[v.uid] = i
self._vars_glob_name2uid[v.name] = v.uid
i += 1
for j, p in enumerate(self._constant_parameters):
self._compiler_names_dict[p.uid] = f"{self.CONSTANT_PARAMS_NAME}[{j}]"
self._alias_names_dict[p.uid] = f"{self.CONSTANT_PARAMS_NAME}_{j}"
self._uid2idx_params[p.uid] = j
for k, p in enumerate(self._variable_parameters):
self._compiler_names_dict[p.uid] = f"{self.VARIABLE_PARAMS_NAME}[{k}]"
self._alias_names_dict[p.uid] = f"{self.VARIABLE_PARAMS_NAME}_{k}"
self._uid2idx_event_params[p.uid] = k
for k, d in enumerate(self._diff_vars):
self._compiler_names_dict[d.uid] = f"{self.DIFF_NAME}[{k}]"
self._alias_names_dict[d.uid] = f"{self.DIFF_NAME}_{k}"
self._uid2idx_diff[d.uid] = k
self._vars_glob_name2uid[d.name] = d.uid
self._compiler_names_dict[self._glob_time.uid] = self.TIME_NAME
self._uid2idx_t[self._glob_time.uid] = 0
def _build_runtime_param_vectors(self) -> None:
"""
Build and initialize runtime and constant parameter buffers.
Runtime parameters are kept in a single flat vector. Continuous and mode
families are only distinguished by slices and by the update rules applied
later during the simulation.
:return: None
"""
n_runtime: int = self.get_variable_parameter_number()
self._event_params_values = np.zeros(n_runtime, dtype=np.float64)
if n_runtime > 0:
self._event_params_values = self._initialize_runtime_parameter_values(0.0)
self._event_params_values = self.def_event_params_fn(self._event_params_values, 0.0)
else:
pass
self._constant_params_values = np.array(
[parameter.value for parameter in self._parameters_values],
dtype=np.float64
)
[docs]
def rebuild_runtime_param_vectors(self) -> None:
"""
Rebuild the runtime and constant parameter buffers.
:return: None
"""
self._build_runtime_param_vectors()
[docs]
def set_events_group(self, emt_events_group: EmtEventsGroup | None) -> None:
"""Apply a selected EMT events group to the runtime parameter equations.
The method is generic with respect to the specific EMT templates used in the
problem. Any runtime parameter exposed through `event_dict` / `mode_dict`
can be reassigned with a piecewise time function driven by the selected
group of `grid.emt_events`.
"""
active_runtime_eqs = list(self._runtime_all_eqs_source)
collect_events = {param.uid: list() for param in self._variable_parameters}
uid_to_parameter = {param.uid: param for param in self._variable_parameters}
emt_events = self.grid.emt_events
if emt_events_group is None:
selected_events = list()
else:
selected_events = [
evt for evt in emt_events
if evt.group is not None and evt.group.idtag == emt_events_group.idtag
]
for emt_evt in selected_events:
parameter = emt_evt.parameter
parameter_uid = parameter.uid if isinstance(parameter, Var) else None
if parameter_uid in collect_events:
transition_type: DynamicEventTransitionType = emt_evt.transition_type
end_time = emt_evt.end_time
collect_events[parameter_uid].append(
dict({
"time": float(emt_evt.time),
"value": float(emt_evt.value),
"end_time": None if end_time is None else float(end_time),
"transition_type": transition_type,
})
)
for parameter_uid, event_specs in collect_events.items():
if len(event_specs) == 0:
pass
else:
parameter = uid_to_parameter[parameter_uid]
param_index = self._variable_parameters.index(parameter)
active_expr = active_runtime_eqs[param_index]
sorted_specs = sorted(event_specs, key=_emt_event_spec_time_sort_key)
for event_spec in sorted_specs:
if event_spec["transition_type"] == DynamicEventTransitionType.Ramp:
start_time = float(event_spec["time"])
end_time_raw = event_spec["end_time"]
if end_time_raw is None:
raise ValueError("Ramp EMT events require an end_time")
else:
pass
end_time = float(end_time_raw)
if end_time > start_time:
pass
else:
raise ValueError("Ramp EMT events require end_time greater than time")
active_expr = _build_ramp_runtime_expr(
time_var=self._glob_time,
start_time=start_time,
end_time=end_time,
before_expr=active_expr,
final_value=float(event_spec["value"]),
)
else:
active_expr = piecewise(
time_var=self._glob_time,
t_events=np.asarray([float(event_spec["time"])], dtype=np.float64),
new_values=np.asarray([float(event_spec["value"] )], dtype=np.float64),
default_value=active_expr,
)
active_runtime_eqs[param_index] = active_expr
self._runtime_all_eqs_source = active_runtime_eqs
self._rebuild_runtime_parameter_partition()
self._build_runtime_param_vectors()
[docs]
def reset_boundary_update_state(self, t0: float = 0.0) -> None:
"""
Reset runtime parameter values before a new EMT simulation starts.
:param t0: Initial simulation time.
:return: None
"""
self._event_params_values = self._initialize_runtime_parameter_values(float(t0))
self._event_params_values = self.def_event_params_fn(self._event_params_values, float(t0))
[docs]
def get_compiler_names_dict(self) -> Dict[int, str]:
"""
Return the compiler-name mapping used by symbolic kernels.
:return: Compiler-name dictionary.
"""
return dict(self._compiler_names_dict)
[docs]
def get_alias_names_dict(self) -> Dict[int, str]:
"""
Return the alias-name mapping used by symbolic kernels.
:return: Alias-name dictionary.
"""
return dict(self._alias_names_dict)
[docs]
def get_event_parameter_equations(self) -> List[Any]:
"""
Return the runtime event-parameter equations.
:return: Event-parameter equations.
"""
return list(self._event_parameters_eqs)
def _rebuild_runtime_parameter_partition(self) -> None:
"""
Rebuild the runtime parameter partition.
The runtime parameters are stored in a single flat vector, but their
order is redefined so that continuous runtime inputs appear first and
retained discrete mode parameters appear afterwards.
:return: None
"""
self._runtime_continuous_parameters = list()
self._runtime_mode_parameters = list()
self._runtime_continuous_eqs = list()
self._runtime_mode_eqs = list()
n_source: int = len(self._runtime_all_parameters_source)
i: int = 0
while i < n_source:
parameter: Var = self._runtime_all_parameters_source[i]
equation: Any = self._runtime_all_eqs_source[i]
if parameter.uid in self._runtime_mode_uids:
self._runtime_mode_parameters.append(parameter)
self._runtime_mode_eqs.append(equation)
else:
self._runtime_continuous_parameters.append(parameter)
self._runtime_continuous_eqs.append(equation)
i += 1
self._variable_parameters = list()
self._event_parameters_eqs = list()
for parameter in self._runtime_continuous_parameters:
self._variable_parameters.append(parameter)
for parameter in self._runtime_mode_parameters:
self._variable_parameters.append(parameter)
for equation in self._runtime_continuous_eqs:
self._event_parameters_eqs.append(equation)
for equation in self._runtime_mode_eqs:
self._event_parameters_eqs.append(equation)
n_continuous: int = len(self._runtime_continuous_parameters)
n_mode: int = len(self._runtime_mode_parameters)
self._runtime_continuous_slice = slice(0, n_continuous)
self._runtime_mode_slice = slice(n_continuous, n_continuous + n_mode)
[docs]
def set_runtime_mode_parameters(self, mode_parameters: List[Var]) -> None:
"""
Classify a subset of runtime parameters as retained discrete mode parameters.
The physical storage remains a single flat runtime vector. This method
only changes the semantic partition and rebuilds the associated index maps
and runtime initialization buffers.
:param mode_parameters: Runtime parameters to classify as mode parameters.
:return: None
"""
valid_runtime_uids: set[int] = set()
selected_mode_uids: set[int] = set()
for parameter in self._runtime_all_parameters_source:
valid_runtime_uids.add(parameter.uid)
for parameter in mode_parameters:
if parameter.uid in valid_runtime_uids:
selected_mode_uids.add(parameter.uid)
else:
raise KeyError(
f"Runtime mode parameter uid={parameter.uid} does not belong to the runtime parameter source."
)
self._runtime_mode_uids = selected_mode_uids
self._rebuild_runtime_parameter_partition()
self._finalize_order_and_maps()
self._build_runtime_param_vectors()
def _initialize_runtime_parameter_values(self, tm: float) -> Vec:
"""
Initialize the flat runtime parameter vector at a given time.
This method evaluates the full runtime parameter list once in the final
flat order. It is used for initialization only. Continuous parameters are
then refined afterwards by def_event_params_fn().
:param tm: Initialization time.
:return: Initialized runtime parameter vector.
"""
n_runtime: int = len(self._variable_parameters)
out: Vec = np.zeros(n_runtime, dtype=np.float64)
i: int = 0
while i < n_runtime:
expression: Any = self._event_parameters_eqs[i]
out[i] = self._evaluate_runtime_expression(expression, out, tm)
i += 1
return out
def _evaluate_runtime_expression(self, expression: Any, runtime_params: Vec, tm: float) -> float:
"""
Evaluate a runtime parameter expression.
The evaluation uses UID-based bindings to avoid ambiguous name-based
dispatch and to keep the runtime parameter update logic deterministic.
:param expression: Symbolic or numeric expression.
:param runtime_params: Current flat runtime parameter vector.
:param tm: Current time.
:return: Numeric value of the expression.
"""
if isinstance(expression, Const):
if expression.value is None:
return 0.0
else:
return float(expression.value)
elif isinstance(expression, Var):
if expression.uid == self._glob_time.uid or expression.name in {"time", self.TIME_NAME}:
return float(tm)
else:
idx_runtime: int | None = self._uid2idx_event_params.get(expression.uid, None)
if idx_runtime is not None:
return float(runtime_params[idx_runtime])
else:
idx_const: int | None = self._uid2idx_params.get(expression.uid, None)
if idx_const is not None:
return float(self._parameters_values[idx_const].value)
else:
idx_var: int | None = self._uid2idx_vars.get(expression.uid, None)
if idx_var is not None:
init_value: float | int | complex | None = self.init_guess.get(expression.uid, None)
if init_value is None:
return 0.0
else:
return float(init_value)
else:
idx_diff: int | None = self._uid2idx_diff.get(expression.uid, None)
if idx_diff is not None:
diff_init_value: float | int | complex | None = self.diff_init_guess.get(expression.uid, None)
if diff_init_value is None:
return 0.0
else:
return float(diff_init_value)
else:
return 0.0
elif isinstance(expression, Expr):
uid_bindings: Dict[int, float] = dict()
uid: int
idx: int
init_value: float | int | complex | None
diff_init_value: float | int | complex | None
var: Var
for uid, idx in self._uid2idx_event_params.items():
uid_bindings[uid] = float(runtime_params[idx])
for uid, idx in self._uid2idx_params.items():
uid_bindings[uid] = float(self._parameters_values[idx].value)
# Runtime mode parameters such as delayed outputs may depend on the
# already initialized algebraic, state, and differential values of
# the EMT problem. The broader explicit-initialization algorithm has
# already assembled those guesses before this runtime initialization
# stage runs, so we expose them here as UID bindings. This lets one
# retained mode variable start from the same operating point as the
# algebraic signal it delays.
for uid, idx in self._uid2idx_vars.items():
init_value = self.init_guess.get(uid, None)
if init_value is None:
pass
else:
uid_bindings[uid] = float(init_value)
for uid, idx in self._uid2idx_diff.items():
diff_init_value = self.diff_init_guess.get(uid, None)
if diff_init_value is None:
pass
else:
uid_bindings[uid] = float(diff_init_value)
uid_bindings[self._glob_time.uid] = float(tm)
for var in expression.get_vars():
if var.name in {"time", self.TIME_NAME}:
uid_bindings[var.uid] = float(tm)
return float(expression.eval_uid(uid_bindings))
else:
return float(expression)
[docs]
def get_state_vars(self)->List[Var]:
"""
Return the ordered list of state variables.
:return: Ordered list of state variables.
"""
return self._state_vars
[docs]
def get_algebraic_vars(self)->List[Var]:
"""
Return the ordered list of algebraic variables.
:return: Ordered list of algebraic variables.
"""
return self._algebraic_vars
[docs]
def state_and_algebraic_vars(self) -> List[Var]:
"""
:return:
"""
variables = list()
for lst in [self._state_vars, self._algebraic_vars]:
for var in lst:
variables.append(var)
return variables
[docs]
def get_state_eqs(self)->List[Any]:
"""
Return the ordered list of state equations.
:return: Ordered list of state equations.
"""
return self._state_eqs
[docs]
def get_algebraic_eqs(self)->List[Any]:
"""
Return the ordered list of algebraic equations.
:return: Ordered list of algebraic equations.
"""
return self._algebraic_eqs
[docs]
def get_variable_parameters(self)-> List[Var]:
"""
Return the ordered list of runtime parameters.
:return: Ordered list of runtime parameters.
"""
return self._variable_parameters
[docs]
def get_constant_parameters(self)-> List[Var]:
"""
Return the ordered list of constant parameters.
:return: Ordered list of constant parameters.
"""
return self._constant_parameters
[docs]
def get_diff_vars(self)-> List[Var]:
"""
Return the ordered list of differential variables.
:return: Ordered list of differential variables.
"""
return self._diff_vars
[docs]
def get_parameters_values(self)-> List[Const]:
"""
Return the ordered list of constant parameter values.
:return: Ordered list of constant parameter values.
"""
return self._parameters_values
[docs]
def get_all_vars_number(self) -> int:
"""
Return the total number of state and algebraic variables.
:return: Total number of variables.
"""
return self._n_vars
[docs]
def get_diff_var_number(self) -> int:
"""
Return the number of differential variables.
:return: Number of differential variables.
"""
return self._n_diff
[docs]
def get_algebraic_var_number(self) -> int:
"""
Return the number of algebraic variables.
:return: Number of algebraic variables.
"""
return self._n_alg
[docs]
def get_states_number(self) -> int:
"""
Return the number of state variables.
:return: Number of state variables.
"""
return self._n_state
[docs]
def get_variable_parameter_number(self) -> int:
"""
Return the number of runtime parameters.
:return: Number of runtime parameters.
"""
return self._n_event_params
[docs]
def get_x0(self) -> Vec:
"""
Build the initial state vector from the stored initialization guess.
:return: Initial state vector.
"""
x = np.zeros(self._n_vars, dtype=np.float64)
for uid, val in self.init_guess.items():
idx = self._uid2idx_vars.get(uid, None)
if idx is not None:
x[idx] = float(val)
else:
pass
return x
[docs]
def get_dx0(self) -> Vec:
"""
Build the initial differential vector from the stored differential initialization guess.
:return: Initial differential vector.
"""
dx = np.zeros(self._n_diff, dtype=np.float64)
for uid, val in self.diff_init_guess.items():
idx = self._uid2idx_diff.get(uid, None)
if idx is not None:
dx[idx] = float(val)
else:
pass
return dx
[docs]
def def_event_params_fn(self, ev_param: Vec, tm: float) -> Vec:
"""
Update only the continuous runtime parameter slice.
Retained discrete mode parameters are preserved exactly as they enter
this function. They are expected to be modified explicitly by the
boundary update layer or by future event logic.
:param ev_param: Current flat runtime parameter vector.
:param tm: Current simulation time.
:return: Updated flat runtime parameter vector.
"""
n_continuous: int = len(self._runtime_continuous_eqs)
if n_continuous == 0:
return ev_param
else:
out: Vec = ev_param.copy()
i: int = 0
while i < n_continuous:
global_idx: int = self._runtime_continuous_slice.start + i
expression: Any = self._runtime_continuous_eqs[i]
out[global_idx] = self._evaluate_runtime_expression(expression, out, tm)
i += 1
return out
[docs]
def update_variable_params(self, t: float) -> None:
"""
Update the internal runtime parameter values at the given time.
:param t: Current time.
:return: None
"""
self._event_params_values = self.def_event_params_fn(self._event_params_values, float(t))
[docs]
def get_full_param_index(self, uid: int) -> int:
"""
Return the flat full-parameter index associated with the given UID.
:param uid: Parameter unique identifier.
:return: Flat index inside the full parameter vector.
"""
n_ev = len(self._variable_parameters)
if uid in self._uid2idx_event_params:
return self._uid2idx_event_params[uid]
if uid in self._uid2idx_params:
return n_ev + self._uid2idx_params[uid]
raise KeyError(f"Unknown param uid={uid}")
[docs]
def get_newton_trace_collector(self) -> Optional[Any]:
"""
Return the Newton trace collector instance.
:return: Newton trace collector instance or None.
"""
return self._newton_trace_collector
[docs]
def set_newton_trace_collector(self, collector: Any)->None:
"""
Set the Newton trace collector instance.
:param collector: Newton trace collector instance.
:return: None
"""
self._newton_trace_collector = collector
# def get_device_vars_dict(self) -> Dict[Any, List[Var]]:
# """
# Return the device-to-variable mapping dictionary.
#
# :return: Device-to-variable mapping dictionary.
# """
# return self._vars_info
[docs]
def get_var_idx(self, v: Var) -> int:
"""
Return the flat variable index associated with the given variable.
:param v: Variable to locate.
:return: Flat variable index.
"""
return self._uid2idx_vars[v.uid]
[docs]
def get_diff_var_idx(self, dv: Var) -> int:
"""
Return the flat differential index associated with the given differential variable.
:param dv: Differential variable to locate.
:return: Flat differential index.
"""
return self._uid2idx_diff[dv.uid]
@property
def vars_glob_name2uid(self) -> Dict[str, int]:
"""
:return:
"""
return self._vars_glob_name2uid
# def set_init_guess(self, mdl: Block, reference_powerflow: VarPowerFlowReferenceType, val: float) -> None:
# """
# Set the initialization guess associated with a model external mapping.
#
# :param mdl: Model block containing the external mapping.
# :param reference_powerflow: Reference key used to locate the mapped variable.
# :param val: Initialization value.
# :return: None
# """
# external_mapping: Optional[Dict[Any, Var]] = _get_external_mapping(mdl)
#
# if external_mapping is None:
# pass
# else:
# var: Optional[Var] = external_mapping.get(reference_powerflow, None)
#
# if var is None:
# pass
# else:
# self.init_guess[var.uid] = float(val)
[docs]
def get_floquet_ak_stack(
self,
trajectory: np.ndarray,
h: float,
jac_evaluator: Optional[Any] = None,
static_params: Optional[Vec] = None
) -> Optional[np.ndarray]:
"""
Return the stack of transition matrices used for Floquet analysis.
:param trajectory: State trajectory over one period.
:param h: Time step.
:param jac_evaluator: Optional Jacobian evaluator.
:param static_params: Optional static parameter vector.
:return: Stack of transition matrices or None.
"""
return None
[docs]
def get_runtime_continuous_slice(self) -> slice:
"""
Return the slice of continuous runtime inputs inside the flat runtime vector.
:return: Continuous runtime slice.
"""
return self._runtime_continuous_slice
[docs]
def get_runtime_mode_slice(self) -> slice:
"""
Return the slice of retained mode parameters inside the flat runtime vector.
:return: Mode runtime slice.
"""
return self._runtime_mode_slice
[docs]
def get_runtime_continuous_parameters(self) -> List[Var]:
"""
Return the ordered list of continuous runtime parameters.
:return: Continuous runtime parameters.
"""
return self._runtime_continuous_parameters
[docs]
def get_runtime_mode_parameters(self) -> List[Var]:
"""
Return the ordered list of retained mode runtime parameters.
:return: Mode runtime parameters.
"""
return self._runtime_mode_parameters
@property
def uid2idx_vars(self)-> Dict[int, int]:
"""
Return the UID-to-variable-index mapping.
:return: UID-to-variable-index mapping.
"""
return self._uid2idx_vars
@property
def uid2idx_params(self)-> Dict[int, int]:
"""
Return the UID-to-constant-parameter-index mapping.
:return: UID-to-constant-parameter-index mapping.
"""
return self._uid2idx_params
@property
def uid2idx_event_params(self)-> Dict[int, int]:
"""
Return the UID-to-runtime-parameter-index mapping.
:return: UID-to-runtime-parameter-index mapping.
"""
return self._uid2idx_event_params
@property
def uid2idx_diff(self)-> Dict[int, int]:
"""
Return the UID-to-differential-index mapping.
:return: UID-to-differential-index mapping.
"""
return self._uid2idx_diff
@property
def event_params_values(self)-> Vec:
"""
Return the current flat runtime parameter vector.
:return: Current flat runtime parameter vector.
"""
return self._event_params_values
@property
def event_parameters_eqs(self) -> List[Any]:
return self._event_parameters_eqs