from __future__ import annotations
from typing import Callable
import numpy as np
import scipy.sparse as sp
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicDerivative, SymbolicJacobian, SymbolicVector
from VeraGridEngine.Utils.Symbolic.symbolic import Const, Expr, Var
def _unique_keep_order(vars_list: list[Var]) -> list[Var]:
seen: set[int] = set()
out: list[Var] = []
for var in vars_list:
if var.uid in seen:
continue
seen.add(var.uid)
out.append(var)
return out
[docs]
class SingleBlockDaeProblem:
"""
Generic single-block DAE problem adapter.
This class is shaped like the problem API consumed by implicit RMS solvers,
but focuses on one symbolic block and optional externally-driven inputs.
"""
VARS_NAME = "vrs"
VARIABLE_PARAMS_NAME = "vprms"
CONSTANT_PARAMS_NAME = "cprms"
DIFF_NAME = "diff"
def __init__(self, block: Block):
self.block = block
self._promote_differential_eqs_to_algebraic_all_blocks()
self.block.unify_blocks()
self._state_vars = list(self.block.state_vars)
self._algebraic_vars = list(self.block.algebraic_vars)
self._diff_vars = list(self.block.diff_vars)
self._state_eqs = list(self.block.state_eqs)
self._algebraic_eqs = list(self.block.algebraic_eqs)
self._all_vars = _unique_keep_order(self._state_vars + self._algebraic_vars)
self.uid2idx_vars = {v.uid: i for i, v in enumerate(self._all_vars)}
self._uid2idx_diff = {v.uid: i for i, v in enumerate(self._diff_vars)}
self._compiler_names_dict: dict[int, str] = {}
self._alias_names_dict: dict[int, str] = {}
for uid, i in self.uid2idx_vars.items():
self._compiler_names_dict[uid] = f"{self.VARS_NAME}[{i}]"
self._alias_names_dict[uid] = f"{self.VARS_NAME}_{i}"
for uid, i in self._uid2idx_diff.items():
self._compiler_names_dict[uid] = f"{self.DIFF_NAME}[{i}]"
self._alias_names_dict[uid] = f"{self.DIFF_NAME}_{i}"
self._input_var_by_name: dict[str, Var] = {}
self._input_param_by_name: dict[str, Var] = {}
self._input_profile_by_name: dict[str, Callable[[float], float] | float] = {}
self._variable_parameters: list[Var] = []
self._variable_parameters_eqs: list[Expr | Const] = []
self._variable_parameters_values = np.zeros(0, dtype=float)
self._constant_params = np.zeros(0, dtype=float)
self._collect_block_runtime_parameters()
self._wire_in_vars_as_runtime_inputs()
self._compile_rhs_and_jacobian()
self.init_guess: dict[int, float] = {}
self._populate_init_guess()
self._x0 = self._build_x0_from_init_data()
def _collect_block_runtime_parameters(self) -> None:
for var, eq in self.block.event_dict.items():
if not isinstance(var, Var):
continue
self._variable_parameters.append(var)
self._variable_parameters_eqs.append(eq)
def _promote_differential_eqs_to_algebraic_all_blocks(self) -> None:
for blk in self.block.get_all_blocks():
if len(blk.differential_eqs):
blk.algebraic_eqs.extend(blk.differential_eqs)
def _wire_in_vars_as_runtime_inputs(self) -> None:
for in_var in list(self.block.in_vars):
if in_var.uid not in self.uid2idx_vars:
self._algebraic_vars.append(in_var)
self._all_vars.append(in_var)
self.uid2idx_vars[in_var.uid] = len(self._all_vars) - 1
self._compiler_names_dict[in_var.uid] = f"{self.VARS_NAME}[{self.uid2idx_vars[in_var.uid]}]"
self._alias_names_dict[in_var.uid] = f"{self.VARS_NAME}_{self.uid2idx_vars[in_var.uid]}"
inp_name = in_var.name
runtime_param = Var(f"input_{inp_name}")
self._input_var_by_name[inp_name] = in_var
self._input_param_by_name[inp_name] = runtime_param
self._input_profile_by_name[inp_name] = 0.0
self._variable_parameters.append(runtime_param)
self._variable_parameters_eqs.append(Const(0.0))
self._algebraic_eqs.append(in_var - runtime_param)
self._algebraic_vars = _unique_keep_order(self._algebraic_vars)
self._all_vars = _unique_keep_order(self._state_vars + self._algebraic_vars)
self.uid2idx_vars = {v.uid: i for i, v in enumerate(self._all_vars)}
self._variable_parameters_values = np.zeros(len(self._variable_parameters), dtype=float)
def _compile_rhs_and_jacobian(self) -> None:
all_eqs = self._state_eqs + self._algebraic_eqs
self._rhs_fn = SymbolicVector(
all_eqs,
self._compiler_names_dict,
self._alias_names_dict,
self.VARS_NAME,
self.DIFF_NAME,
self.VARIABLE_PARAMS_NAME,
self.CONSTANT_PARAMS_NAME,
use_jit=True,
) if len(all_eqs) else None
self._jacobian_fn = SymbolicJacobian(
eqs=all_eqs,
variables=self._all_vars,
compiler_names_dict=self._compiler_names_dict,
alias_names_dict=self._alias_names_dict,
VARS_NAME=self.VARS_NAME,
DIFF_NAME=self.DIFF_NAME,
EVENT_PARAMS_NAME=self.VARIABLE_PARAMS_NAME,
PARAMS_NAME=self.CONSTANT_PARAMS_NAME,
use_jit=True,
) if len(all_eqs) else None
self._derivative_fn = SymbolicDerivative(
vars=self._all_vars,
uid2idx_vars=self.uid2idx_vars,
diff_vars=self._diff_vars,
compiler_names_dict=self._compiler_names_dict,
use_jit=True,
)
def _populate_init_guess(self) -> None:
for var, value in self.block.init_values.items():
if isinstance(var, Var) and var.uid in self.uid2idx_vars:
self.init_guess[var.uid] = float(value.value if isinstance(value, Const) else value)
uid_bindings: dict[int, float] = dict(self.init_guess)
pending = list(self.block.init_eqs.items())
for _ in range(30):
changed = False
next_pending: list[tuple[Var, Expr | Const | float]] = []
for var, expr in pending:
if not isinstance(var, Var) or var.uid not in self.uid2idx_vars:
continue
try:
if isinstance(expr, Const):
val = 0.0 if expr.value is None else float(expr.value)
elif isinstance(expr, Expr):
val = float(expr.eval_uid(uid_bindings))
else:
val = float(expr)
self.init_guess[var.uid] = val
uid_bindings[var.uid] = val
changed = True
except Exception:
next_pending.append((var, expr))
pending = next_pending
if not changed:
break
for var in self._all_vars:
self.init_guess.setdefault(var.uid, 0.0)
def _build_x0_from_init_data(self) -> np.ndarray:
x0 = np.zeros(len(self._all_vars), dtype=float)
for var in self._all_vars:
x0[self.uid2idx_vars[var.uid]] = float(self.init_guess.get(var.uid, 0.0))
return x0
[docs]
def get_all_vars_number(self) -> int:
return len(self._all_vars)
[docs]
def get_states_number(self) -> int:
return len(self._state_vars)
[docs]
def get_algebraic_var_number(self) -> int:
return len(self._algebraic_vars)
[docs]
def get_diff_var_number(self) -> int:
return len(self._diff_vars)
@property
def algebraic_vars(self):
return self._algebraic_vars
@property
def _algebraic_vars_(self):
return self._algebraic_vars
[docs]
def get_x0(self) -> np.ndarray:
return self._x0.copy()
[docs]
def update_variable_params_ts(self, x_snapshot: np.ndarray, t: float) -> None:
self.update_variable_params(t=t, x_snapshot=x_snapshot)
[docs]
def update_variable_params(self, t: float, x_snapshot: np.ndarray | None = None) -> None:
_ = x_snapshot
for i, param in enumerate(self._variable_parameters):
if i < len(self._variable_parameters_eqs):
expr = self._variable_parameters_eqs[i]
else:
expr = Const(0.0)
if param.name.startswith("input_"):
input_name = param.name.replace("input_", "", 1)
profile = self._input_profile_by_name[input_name]
self._variable_parameters_values[i] = float(profile(t) if callable(profile) else profile)
elif isinstance(expr, Const):
self._variable_parameters_values[i] = float(0.0 if expr.value is None else expr.value)
elif isinstance(expr, Expr):
try:
self._variable_parameters_values[i] = float(expr.eval_uid(dict()))
except Exception:
self._variable_parameters_values[i] = 0.0
else:
self._variable_parameters_values[i] = float(expr)
[docs]
def rhs_algebraic(self, x: np.ndarray, dx: np.ndarray) -> np.ndarray:
if self._rhs_fn is None:
return np.array([])
full = self._rhs_fn(x, dx, self._variable_parameters_values, self._constant_params)
ns = self.get_states_number()
return full[ns:]
[docs]
def rhs_state(self, x: np.ndarray, dx: np.ndarray) -> np.ndarray:
if self._rhs_fn is None or self.get_states_number() == 0:
return np.array([])
full = self._rhs_fn(x, dx, self._variable_parameters_values, self._constant_params)
ns = self.get_states_number()
return full[:ns]
[docs]
def get_dx(self, x: np.ndarray, xn: np.ndarray, dx: np.ndarray, h: float) -> np.ndarray:
return self._derivative_fn(x, xn, dx, h)
def _jac_full(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
if self._jacobian_fn is None:
return sp.csc_matrix((0, 0))
return self._jacobian_fn(x, dx, self._variable_parameters_values, self._constant_params, h).tocsc()
[docs]
def get_j11(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
ns = self.get_states_number()
if ns == 0:
return sp.csc_matrix((0, 0))
J = self._jac_full(x, dx, h)
return J[:ns, :ns]
[docs]
def get_j12(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
ns = self.get_states_number()
na = self.get_algebraic_var_number()
if ns == 0:
return sp.csc_matrix((0, na))
J = self._jac_full(x, dx, h)
return J[:ns, ns:ns + na]
[docs]
def get_j21(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
ns = self.get_states_number()
na = self.get_algebraic_var_number()
if na == 0:
return sp.csc_matrix((0, ns))
J = self._jac_full(x, dx, h)
return J[ns:ns + na, :ns]
[docs]
def get_j22(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
ns = self.get_states_number()
na = self.get_algebraic_var_number()
if na == 0:
return sp.csc_matrix((0, 0))
J = self._jac_full(x, dx, h)
return J[ns:ns + na, ns:ns + na]
[docs]
def get_var_idx(self, v: Var) -> int:
return self.uid2idx_vars[v.uid]
[docs]
def report_progress2(self, step_idx: int, steps: int) -> None:
_ = (step_idx, steps)
[docs]
def update(self, t: float, x_snapshot: np.ndarray, event_values: np.ndarray) -> None:
_ = (t, x_snapshot, event_values)
[docs]
def get_next_forced_event_time(self, t_prev: float, t_target: float):
_ = (t_prev, t_target)
return None
[docs]
def initialize_fmu_cs_devices(self, x0: np.ndarray, t0: float) -> None:
_ = (x0, t0)
[docs]
def initialize_fmu_me_devices(self, x0: np.ndarray, t0: float) -> None:
_ = (x0, t0)
[docs]
def advance_fmu_cs_devices(self, t: float, x_snapshot: np.ndarray, h: float) -> None:
_ = (t, x_snapshot, h)
[docs]
def advance_fmu_me_devices(self, t: float, x_snapshot: np.ndarray, h: float) -> None:
_ = (t, x_snapshot, h)
[docs]
def close_fmu_cs_devices(self) -> None:
return None
[docs]
def close_fmu_me_devices(self) -> None:
return None