Source code for VeraGridEngine.Simulations.Rms.initialization_rms

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Callable, Dict, List, Tuple, Union, Optional, Any
import numpy as np
import numba as nb
import math
import time
import hashlib
import os
import pickle
from pathlib import Path
import warnings
import scipy.sparse as sp
import scipy.sparse.linalg as spla

from collections import defaultdict, deque

# from VeraGridEngine.Simulations.Rms.problems import Any
from VeraGridEngine.Simulations.Rms.rms_options import RmsOptions
from VeraGridEngine.enumerations import EmtInitializationMethod, EmtInitializationStatus, RmsInitializationMethod
from VeraGridEngine.Simulations.EMT.emt_options import EmtOptions
from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import EmtProblemTemplate
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicJacobian, SymbolicVector
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
from VeraGridEngine.Utils.Symbolic.symbolic import expression2numba, get_expression_vars
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic import Var, Const, Expr, find_vars_order


InitializationStatus = EmtInitializationStatus


[docs] class EmtInitializationReport: """ Stores the outcome of the EMT-native initialization workflow. """ __slots__ = [ "status", "method_requested", "method_used", "initial_residual_inf", "final_residual_inf", "newton_iterations", "pseudo_transient_steps", "unknown_var_count", "unresolved_state_count", "automatic_dx0_count", "reduced_system_build_s", "runtime_param_build_s", "constant_param_build_s", "initial_residual_eval_s", "newton_elapsed_s", "pseudo_transient_elapsed_s", "dx0_completion_s", "x0_rebuild_s", "dx0_seed_build_s", "state_rhs_vector_build_s", "state_rhs_vector_cache_hit", "solution_apply_s", "missing_dx_problem_collect_s", "state_rhs_eval_s", "missing_dx_scatter_s", "persistent_cache_hit", "persistent_cache_load_s", "persistent_cache_store_s", "elapsed_s", "message", ] def __init__(self, method_requested: EmtInitializationMethod) -> None: """ Build an empty EMT initialization report. :param method_requested: Requested initialization workflow. :return: None """ self.status: InitializationStatus = InitializationStatus.PENDING self.method_requested: EmtInitializationMethod = method_requested self.method_used: EmtInitializationMethod = EmtInitializationMethod.Explicit self.initial_residual_inf: float = 0.0 self.final_residual_inf: float = 0.0 self.newton_iterations: int = 0 self.pseudo_transient_steps: int = 0 self.unknown_var_count: int = 0 self.unresolved_state_count: int = 0 self.automatic_dx0_count: int = 0 self.reduced_system_build_s: float = 0.0 self.runtime_param_build_s: float = 0.0 self.constant_param_build_s: float = 0.0 self.initial_residual_eval_s: float = 0.0 self.newton_elapsed_s: float = 0.0 self.pseudo_transient_elapsed_s: float = 0.0 self.dx0_completion_s: float = 0.0 self.x0_rebuild_s: float = 0.0 self.dx0_seed_build_s: float = 0.0 self.state_rhs_vector_build_s: float = 0.0 self.state_rhs_vector_cache_hit: bool = False self.solution_apply_s: float = 0.0 self.missing_dx_problem_collect_s: float = 0.0 self.state_rhs_eval_s: float = 0.0 self.missing_dx_scatter_s: float = 0.0 self.persistent_cache_hit: bool = False self.persistent_cache_load_s: float = 0.0 self.persistent_cache_store_s: float = 0.0 self.elapsed_s: float = 0.0 self.message: str = ""
[docs] class ReducedInitializationSystemCacheEntry: """ In-process cache entry for reduced EMT initialization systems. """ __slots__ = ["_residual_fn", "_jacobian_fn"] def __init__(self, residual_fn: SymbolicVector, jacobian_fn: SymbolicJacobian) -> None: """ Build one reduced-system cache entry. :param residual_fn: Reduced residual evaluator. :type residual_fn: SymbolicVector :param jacobian_fn: Reduced Jacobian evaluator. :type jacobian_fn: SymbolicJacobian :return: None. :rtype: None """ self._residual_fn = residual_fn self._jacobian_fn = jacobian_fn
[docs] def get_residual_fn(self) -> SymbolicVector: """ Return the cached reduced residual evaluator. :return: Reduced residual evaluator. :rtype: SymbolicVector """ return self._residual_fn
[docs] def get_jacobian_fn(self) -> SymbolicJacobian: """ Return the cached reduced Jacobian evaluator. :return: Reduced Jacobian evaluator. :rtype: SymbolicJacobian """ return self._jacobian_fn
[docs] class ReducedInitializationSystemCache: """ In-process cache for reduced EMT initialization systems. """ __slots__ = ["_cache"] def __init__(self) -> None: """ Build the reduced-system cache. :return: None. :rtype: None """ self._cache: Dict[str, ReducedInitializationSystemCacheEntry] = dict()
[docs] def get_entry(self, cache_key: str) -> ReducedInitializationSystemCacheEntry | None: """ Return one cached reduced-system entry. :param cache_key: Deterministic cache key. :type cache_key: str :return: Cached entry or ``None``. :rtype: ReducedInitializationSystemCacheEntry | None """ return self._cache.get(cache_key, None)
[docs] def set_entry(self, cache_key: str, entry: ReducedInitializationSystemCacheEntry) -> None: """ Store one cached reduced-system entry. :param cache_key: Deterministic cache key. :type cache_key: str :param entry: Cache entry. :type entry: ReducedInitializationSystemCacheEntry :return: None. :rtype: None """ self._cache[cache_key] = entry
REDUCED_INITIALIZATION_SYSTEM_CACHE = ReducedInitializationSystemCache()
[docs] class InitializationVectorCacheEntry: """ In-process cache entry for one initialization-stage symbolic vector. """ __slots__ = ["_vector"] def __init__(self, vector: SymbolicVector) -> None: """ Build one initialization-vector cache entry. :param vector: Cached symbolic vector evaluator. :type vector: SymbolicVector :return: None. :rtype: None """ self._vector = vector
[docs] def get_vector(self) -> SymbolicVector: """ Return the cached symbolic vector evaluator. :return: Cached symbolic vector evaluator. :rtype: SymbolicVector """ return self._vector
[docs] class InitializationVectorCache: """ In-process cache for symbolic vectors used during EMT initialization. """ __slots__ = ["_cache"] def __init__(self) -> None: """ Build the initialization-vector cache. :return: None. :rtype: None """ self._cache: Dict[str, InitializationVectorCacheEntry] = dict()
[docs] def get_entry(self, cache_key: str) -> InitializationVectorCacheEntry | None: """ Return one cached initialization-vector entry. :param cache_key: Deterministic cache key. :type cache_key: str :return: Cached entry or ``None``. :rtype: InitializationVectorCacheEntry | None """ return self._cache.get(cache_key, None)
[docs] def set_entry(self, cache_key: str, entry: InitializationVectorCacheEntry) -> None: """ Store one cached initialization-vector entry. :param cache_key: Deterministic cache key. :type cache_key: str :param entry: Cache entry. :type entry: InitializationVectorCacheEntry :return: None. :rtype: None """ self._cache[cache_key] = entry
INITIALIZATION_VECTOR_CACHE = InitializationVectorCache()
[docs] class InitializationStateRhsVector: """ Lightweight state-RHS evaluator used by ``dx0`` completion. """ __slots__ = ["_func", "_data"] def __init__(self, source_code: str, use_jit: bool, output_size: int) -> None: """ Build the state-RHS evaluator from source code. :param source_code: Generated Python source code. :type source_code: str :param use_jit: Whether the evaluator should be JIT-compiled. :type use_jit: bool :param output_size: Output vector length. :type output_size: int :return: None. :rtype: None """ namespace: Dict[str, object] = dict(math=__import__("math"), np=np) exec(source_code, namespace) if use_jit: self._func = nb.njit( nb.void(nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:]), fastmath=True, cache=False, )(namespace["func"]) else: self._func = namespace["func"] self._data = np.zeros(output_size, dtype=np.float64) def __call__(self, values: np.ndarray, diff_values: np.ndarray, event_params: np.ndarray, params: np.ndarray) -> np.ndarray: """ Evaluate the state-RHS vector. :param values: State/algebraic vector. :type values: np.ndarray :param diff_values: Differential vector. :type diff_values: np.ndarray :param event_params: Runtime parameter vector. :type event_params: np.ndarray :param params: Constant parameter vector. :type params: np.ndarray :return: Evaluated state-RHS vector. :rtype: np.ndarray """ self._func(values, diff_values, event_params, params, self._data) return self._data
def _get_state_rhs_persistent_cache_directory() -> Path: """ Return the persistent cache directory used by ``dx0`` state-RHS evaluators. :return: Persistent cache directory. :rtype: Path """ cache_directory: Path = _get_reduced_initialization_persistent_cache_directory() / "state_rhs_vectors" cache_directory.mkdir(parents=True, exist_ok=True) return cache_directory def _build_state_rhs_source( missing_state_eqs: List[Expr], compiler_names_dict: Dict[int, str], alias_names_dict: Dict[int, str], vars_name: str, diff_name: str, event_params_name: str, params_name: str, ) -> str: """ Build the Python source code of the ``dx0`` state-RHS evaluator. :param missing_state_eqs: State equations whose differential values are missing. :type missing_state_eqs: List[Expr] :param compiler_names_dict: Compiler variable-name map. :type compiler_names_dict: Dict[int, str] :param alias_names_dict: Alias variable-name map. :type alias_names_dict: Dict[int, str] :param vars_name: Full-variable argument name. :type vars_name: str :param diff_name: Differential-vector argument name. :type diff_name: str :param event_params_name: Runtime-parameter argument name. :type event_params_name: str :param params_name: Constant-parameter argument name. :type params_name: str :return: Generated Python source code. :rtype: str """ used_vars_count: Dict[int, int] = dict() final_names_dict: Dict[int, str] = compiler_names_dict.copy() lines: List[str] = list() for eq in missing_state_eqs: for var in get_expression_vars(eq): if var.uid in used_vars_count: used_vars_count[var.uid] += 1 else: used_vars_count[var.uid] = 1 lines.append(f"def func({vars_name}, {diff_name}, {event_params_name}, {params_name}, out):") for uid, count in used_vars_count.items(): if count > 1: final_names_dict[uid] = alias_names_dict[uid] lines.append(f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}") else: pass for eq_index, equation in enumerate(missing_state_eqs): lines.append(f" out[{eq_index}] = {expression2numba(equation.simplify(), final_names_dict)}") return "\n".join(lines) + "\n" def _load_or_build_state_rhs_vector( problem: EmtProblemTemplate, missing_state_eqs: List[Expr], cache_key: str, report: EmtInitializationReport, ) -> InitializationStateRhsVector | SymbolicVector: """ Load or build the ``dx0`` state-RHS evaluator. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :param missing_state_eqs: State equations whose differential values are missing. :type missing_state_eqs: List[Expr] :param cache_key: Deterministic cache key. :type cache_key: str :param report: Initialization report updated in place. :type report: EmtInitializationReport :return: State-RHS evaluator. :rtype: InitializationStateRhsVector | SymbolicVector """ cache_entry: InitializationVectorCacheEntry | None = INITIALIZATION_VECTOR_CACHE.get_entry(cache_key) cache_path: Path = _get_state_rhs_persistent_cache_directory() / f"{cache_key}.pkl" if cache_entry is None: pass else: report.state_rhs_vector_build_s = 0.0 report.state_rhs_vector_cache_hit = True return cache_entry.get_vector() phase_t0: float = time.perf_counter() if cache_path.exists(): with open(cache_path, "rb") as cache_file: payload: Dict[str, object] = dict(pickle.load(cache_file)) state_rhs_fn = InitializationStateRhsVector( source_code=str(payload["source_code"]), use_jit=bool(payload["use_jit"]), output_size=int(payload["output_size"]), ) INITIALIZATION_VECTOR_CACHE.set_entry(cache_key, InitializationVectorCacheEntry(state_rhs_fn)) report.state_rhs_vector_build_s = float(time.perf_counter() - phase_t0) report.state_rhs_vector_cache_hit = True return state_rhs_fn else: pass if len(missing_state_eqs) <= 16: use_jit_for_state_rhs: bool = False else: use_jit_for_state_rhs = True source_code: str = _build_state_rhs_source( missing_state_eqs, problem.get_compiler_names_dict(), problem.get_alias_names_dict(), problem.VARS_NAME, problem.DIFF_NAME, problem.VARIABLE_PARAMS_NAME, problem.CONSTANT_PARAMS_NAME, ) state_rhs_fn = InitializationStateRhsVector( source_code=source_code, use_jit=use_jit_for_state_rhs, output_size=len(missing_state_eqs), ) INITIALIZATION_VECTOR_CACHE.set_entry(cache_key, InitializationVectorCacheEntry(state_rhs_fn)) with open(cache_path, "wb") as cache_file: pickle.dump( dict( source_code=source_code, use_jit=use_jit_for_state_rhs, output_size=len(missing_state_eqs), ), cache_file, protocol=pickle.HIGHEST_PROTOCOL, ) report.state_rhs_vector_build_s = float(time.perf_counter() - phase_t0) report.state_rhs_vector_cache_hit = False return state_rhs_fn def _get_reduced_initialization_persistent_cache_directory() -> Path: """ Return the persistent cache directory used by reduced EMT initialization. :return: Persistent cache directory. :rtype: Path """ override_path: str = os.environ.get("VERAGRID_EMT_INIT_CACHE_DIR", "") if len(override_path) > 0: cache_directory: Path = Path(override_path) else: cache_directory = Path.home() / ".VeraGrid" / "cache" / "emt_reduced_initialization" cache_directory.mkdir(parents=True, exist_ok=True) return cache_directory def _serialize_initialization_report(report: EmtInitializationReport) -> Dict[str, object]: """ Serialize an EMT initialization report into a cache-safe dictionary. :param report: Initialization report. :type report: EmtInitializationReport :return: Serialized report. :rtype: Dict[str, object] """ return dict( status=report.status.name, method_requested=report.method_requested.name, method_used=report.method_used.name, initial_residual_inf=float(report.initial_residual_inf), final_residual_inf=float(report.final_residual_inf), newton_iterations=int(report.newton_iterations), pseudo_transient_steps=int(report.pseudo_transient_steps), unknown_var_count=int(report.unknown_var_count), unresolved_state_count=int(report.unresolved_state_count), automatic_dx0_count=int(report.automatic_dx0_count), reduced_system_build_s=float(report.reduced_system_build_s), runtime_param_build_s=float(report.runtime_param_build_s), constant_param_build_s=float(report.constant_param_build_s), initial_residual_eval_s=float(report.initial_residual_eval_s), newton_elapsed_s=float(report.newton_elapsed_s), pseudo_transient_elapsed_s=float(report.pseudo_transient_elapsed_s), dx0_completion_s=float(report.dx0_completion_s), x0_rebuild_s=float(report.x0_rebuild_s), dx0_seed_build_s=float(report.dx0_seed_build_s), state_rhs_vector_build_s=float(report.state_rhs_vector_build_s), state_rhs_vector_cache_hit=bool(report.state_rhs_vector_cache_hit), solution_apply_s=float(report.solution_apply_s), missing_dx_problem_collect_s=float(report.missing_dx_problem_collect_s), state_rhs_eval_s=float(report.state_rhs_eval_s), missing_dx_scatter_s=float(report.missing_dx_scatter_s), persistent_cache_hit=bool(report.persistent_cache_hit), persistent_cache_load_s=float(report.persistent_cache_load_s), persistent_cache_store_s=float(report.persistent_cache_store_s), elapsed_s=float(report.elapsed_s), message=str(report.message), ) def _serialize_problem_guess_map(problem: EmtProblemTemplate, guess_map: Dict[int, float], diff_guess: bool) -> Dict[str, float]: """ Serialize an initialization guess map using stable variable names. :param problem: EMT problem instance. :type problem: EmtProblemTemplate :param guess_map: UID-keyed initialization guess map. :type guess_map: Dict[int, float] :param diff_guess: Whether the map contains differential-variable guesses. :type diff_guess: bool :return: Name-keyed initialization guess map. :rtype: Dict[str, float] """ uid_to_name: Dict[int, str] = dict() serialized: Dict[str, float] = dict() alias_name_map: Dict[int, str] = problem.get_alias_names_dict() if diff_guess: for diff_var in problem.get_diff_vars(): uid_to_name[diff_var.uid] = alias_name_map[diff_var.uid] else: for var in problem.get_state_vars(): uid_to_name[var.uid] = alias_name_map[var.uid] for var in problem.get_algebraic_vars(): uid_to_name[var.uid] = alias_name_map[var.uid] for uid, value in guess_map.items(): if uid in uid_to_name: serialized[uid_to_name[uid]] = float(value) else: pass return serialized def _restore_problem_guess_map(problem: Any, name_map: Dict[str, float], diff_guess: bool) -> Dict[int, float]: """ Restore an initialization guess map from stable variable names. :param problem: EMT problem instance. :type problem: EmtProblemTemplate :param name_map: Name-keyed initialization guess map. :type name_map: Dict[str, float] :param diff_guess: Whether the map contains differential-variable guesses. :type diff_guess: bool :return: UID-keyed initialization guess map. :rtype: Dict[int, float] """ name_to_uid: Dict[str, int] = dict() restored: Dict[int, float] = dict() alias_name_map: Dict[int, str] = problem.get_alias_names_dict() if diff_guess: for diff_var in problem.get_diff_vars(): name_to_uid[alias_name_map[diff_var.uid]] = diff_var.uid else: for var in problem.state_vars(): name_to_uid[alias_name_map[var.uid]] = var.uid for var in problem.algebraic_vars(): name_to_uid[alias_name_map[var.uid]] = var.uid for name, value in name_map.items(): if name in name_to_uid: restored[name_to_uid[name]] = float(value) else: pass return restored def _restore_initialization_report_from_payload( report: EmtInitializationReport, payload: Dict[str, object], ) -> None: """ Restore an EMT initialization report from a serialized payload. :param report: Initialization report updated in place. :type report: EmtInitializationReport :param payload: Serialized report payload. :type payload: Dict[str, object] :return: None. :rtype: None """ report.status = InitializationStatus[str(payload["status"])] report.method_requested = EmtInitializationMethod[str(payload["method_requested"])] report.method_used = EmtInitializationMethod[str(payload["method_used"])] report.initial_residual_inf = float(payload["initial_residual_inf"]) report.final_residual_inf = float(payload["final_residual_inf"]) report.newton_iterations = int(payload["newton_iterations"]) report.pseudo_transient_steps = int(payload["pseudo_transient_steps"]) report.unknown_var_count = int(payload["unknown_var_count"]) report.unresolved_state_count = int(payload["unresolved_state_count"]) report.automatic_dx0_count = int(payload["automatic_dx0_count"]) report.reduced_system_build_s = float(payload["reduced_system_build_s"]) report.runtime_param_build_s = float(payload["runtime_param_build_s"]) report.constant_param_build_s = float(payload["constant_param_build_s"]) report.initial_residual_eval_s = float(payload["initial_residual_eval_s"]) report.newton_elapsed_s = float(payload["newton_elapsed_s"]) report.pseudo_transient_elapsed_s = float(payload["pseudo_transient_elapsed_s"]) report.dx0_completion_s = float(payload["dx0_completion_s"]) report.x0_rebuild_s = float(payload.get("x0_rebuild_s", 0.0)) report.dx0_seed_build_s = float(payload.get("dx0_seed_build_s", 0.0)) report.state_rhs_vector_build_s = float(payload.get("state_rhs_vector_build_s", 0.0)) report.state_rhs_vector_cache_hit = bool(payload.get("state_rhs_vector_cache_hit", False)) report.solution_apply_s = float(payload.get("solution_apply_s", 0.0)) report.missing_dx_problem_collect_s = float(payload.get("missing_dx_problem_collect_s", 0.0)) report.state_rhs_eval_s = float(payload.get("state_rhs_eval_s", 0.0)) report.missing_dx_scatter_s = float(payload.get("missing_dx_scatter_s", 0.0)) report.persistent_cache_hit = bool(payload.get("persistent_cache_hit", False)) report.persistent_cache_load_s = float(payload.get("persistent_cache_load_s", 0.0)) report.persistent_cache_store_s = float(payload.get("persistent_cache_store_s", 0.0)) report.elapsed_s = float(payload["elapsed_s"]) report.message = str(payload["message"])
[docs] def build_persistent_initialization_cache_key( problem: Any, unknown_vars: List[Var], residual_eqs: List[Expr], state_unknown_mask: np.ndarray, options: RmsOptions, ) -> str: """ Return the deterministic cache key of one persistent reduced initialization result. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :param unknown_vars: Unknown variables in the reduced system. :type unknown_vars: List[Var] :param residual_eqs: Residual equations in the reduced system. :type residual_eqs: List[Expr] :param state_unknown_mask: Reduced state mask. :type state_unknown_mask: np.ndarray :param options: EMT initialization options. :type options: EmtOptions :return: Deterministic cache key. :rtype: str """ reduced_key: str = build_reduced_initialization_cache_key(problem, unknown_vars, residual_eqs, state_unknown_mask) payload: str = "|".join([ "persistent-reduced-emt-init-v4", reduced_key, options.initialization_method.name, f"{float(options.init_newton_tol):.12g}", str(int(options.init_newton_max_iter)), str(int(options.init_dense_threshold)), str(bool(options.init_newton_backtracking)), str(bool(options.init_allow_state_equilibrium)), f"{float(options.init_ptc_dtau0):.12g}", f"{float(options.init_ptc_dtau_min):.12g}", f"{float(options.init_ptc_dtau_max):.12g}", str(int(options.init_ptc_max_iter)), ]) return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _persistent_initialization_params_match( payload: Dict[str, object], runtime_params: np.ndarray, constant_params: np.ndarray, ) -> bool: """ Return whether one persistent initialization payload matches the current parameter snapshots. :param payload: Cached payload. :type payload: Dict[str, object] :param runtime_params: Runtime parameter vector. :type runtime_params: np.ndarray :param constant_params: Constant parameter vector. :type constant_params: np.ndarray :return: ``True`` when the parameter snapshots match. :rtype: bool """ runtime_snapshot_any: object = payload.get("runtime_params_snapshot", list()) constant_snapshot_any: object = payload.get("constant_params_snapshot", list()) if isinstance(runtime_snapshot_any, list): runtime_snapshot = np.asarray(runtime_snapshot_any, dtype=np.float64) else: return False if isinstance(constant_snapshot_any, list): constant_snapshot = np.asarray(constant_snapshot_any, dtype=np.float64) else: return False if runtime_snapshot.shape != runtime_params.shape: return False else: pass if constant_snapshot.shape != constant_params.shape: return False else: pass if runtime_snapshot.size > 0: if float(np.max(np.abs(runtime_snapshot - runtime_params))) > 1.0e-12: return False else: pass else: pass if constant_snapshot.size > 0: if float(np.max(np.abs(constant_snapshot - constant_params))) > 1.0e-12: return False else: pass else: pass return True
[docs] def load_persistent_initialization_solution(cache_key: str) -> Dict[str, object] | None: """ Load one persistent reduced initialization result. :param cache_key: Deterministic cache key. :type cache_key: str :return: Cached payload or ``None``. :rtype: Dict[str, object] | None """ cache_path: Path = _get_reduced_initialization_persistent_cache_directory() / f"{cache_key}.pkl" if cache_path.exists(): with open(cache_path, "rb") as cache_file: return dict(pickle.load(cache_file)) else: return None
def _store_persistent_initialization_solution(cache_key: str, payload: Dict[str, object]) -> None: """ Store one persistent reduced initialization result. :param cache_key: Deterministic cache key. :type cache_key: str :param payload: Cache payload. :type payload: Dict[str, object] :return: None. :rtype: None """ cache_path: Path = _get_reduced_initialization_persistent_cache_directory() / f"{cache_key}.pkl" with open(cache_path, "wb") as cache_file: pickle.dump(payload, cache_file, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def build_reduced_initialization_cache_key( problem: Any, unknown_vars: List[Var], residual_eqs: List[Expr], state_unknown_mask: np.ndarray, ) -> str: """ Return a deterministic cache key for one reduced Rms initialization system. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :param unknown_vars: Unknown variables solved by the reduced system. :type unknown_vars: List[Var] :param residual_eqs: Residual equations enforcing consistency. :type residual_eqs: List[Expr] :param state_unknown_mask: State-mask vector. :type state_unknown_mask: np.ndarray :return: Deterministic cache key. :rtype: str """ unknown_var_tokens: List[str] = list() residual_tokens: List[str] = list() mask_tokens: List[str] = list() index: int = 0 while index < len(unknown_vars): unknown_var_tokens.append(unknown_vars[index].name) index += 1 index = 0 while index < len(residual_eqs): residual_tokens.append(expression2numba(residual_eqs[index].simplify(), problem.get_compiler_names_dict())) index += 1 index = 0 while index < state_unknown_mask.shape[0]: mask_tokens.append(str(float(state_unknown_mask[index]))) index += 1 payload: str = "|".join([ "reduced-emt-init", problem.VARS_NAME, problem.DIFF_NAME, problem.VARIABLE_PARAMS_NAME, problem.CONSTANT_PARAMS_NAME, str(problem.get_states_number()), str(problem.get_algebraic_var_number()), str(len(unknown_vars)), str(len(residual_eqs)), "::".join(unknown_var_tokens), "::".join(mask_tokens), "::".join(residual_tokens), ]) return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _build_state_rhs_cache_key(problem: EmtProblemTemplate, state_eqs: List[Expr]) -> str: """ Return the cache key for the symbolic vector used to compute missing ``dx0``. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :param state_eqs: State equations. :type state_eqs: List[Expr] :return: Deterministic cache key. :rtype: str """ expr_tokens: List[str] = list() index: int = 0 while index < len(state_eqs): expr_tokens.append(expression2numba(state_eqs[index].simplify(), problem.get_compiler_names_dict())) index += 1 payload: str = "|".join([ "state-rhs-vector", problem.VARS_NAME, problem.DIFF_NAME, problem.VARIABLE_PARAMS_NAME, problem.CONSTANT_PARAMS_NAME, str(problem.get_states_number()), "::".join(expr_tokens), ]) return hashlib.sha256(payload.encode("utf-8")).hexdigest() def _collect_missing_dx_problem(problem: EmtProblemTemplate) -> Tuple[List[Var], List[Expr]]: """ Return the differential variables that still need ``dx0`` and their state equations. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :return: Pair ``(missing_diff_vars, missing_state_eqs)``. :rtype: Tuple[List[Var], List[Expr]] """ diff_vars: List[Var] = problem.get_diff_vars() state_eqs: List[Expr] = problem.get_state_eqs() missing_diff_vars: List[Var] = list() missing_state_eqs: List[Expr] = list() idx: int = 0 while idx < len(diff_vars): diff_var: Var = diff_vars[idx] if diff_var.uid in problem.diff_init_guess: diff_var = diff_var else: missing_diff_vars.append(diff_var) missing_state_eqs.append(state_eqs[idx]) idx += 1 return missing_diff_vars, missing_state_eqs
[docs] def build_uid_bindings( eq: Union[Expr, Const], event_params_array: np.ndarray, x: np.ndarray, params_array: np.ndarray, dx: np.ndarray, uid2idx_event_params: Dict[int, int], uid2idx_vars: Dict[int, int], uid2idx_params: Dict[int, int], uid2idx_diff: Dict[int, int] ) -> Dict[int, float]: """ Builds a mapping of Unique Identifiers (UIDs) to their current numeric values. :param eq: The symbolic expression to analyze. :param event_params_array: Array containing resolved event parameters. :param x: Array containing state and algebraic variables. :param params_array: Array containing constant model parameters. :param dx: Array containing derivative values. :param uid2idx_event_params: Mapping of UID to index for event parameters. :param uid2idx_vars: Mapping of UID to index for variables. :param uid2idx_params: Mapping of UID to index for constants. :param uid2idx_diff: Mapping of UID to index for derivatives. :return: A dictionary mapping UIDs to their float values. """ uid_bindings: Dict[int, float] = dict() vars_list: List[Var] = find_vars_order(eq) for vr in vars_list: resolved_value: Optional[float] = None event_idx: Optional[int] = uid2idx_event_params.get(vr.uid, None) state_idx: Optional[int] = uid2idx_vars.get(vr.uid, None) param_idx: Optional[int] = uid2idx_params.get(vr.uid, None) diff_idx: Optional[int] = uid2idx_diff.get(vr.uid, None) if event_idx is None: if state_idx is None: if param_idx is None: if diff_idx is None: resolved_value = None else: resolved_value = float(dx[diff_idx]) else: resolved_value = float(params_array[param_idx]) else: resolved_value = float(x[state_idx]) else: resolved_value = float(event_params_array[event_idx]) if resolved_value is None: uid_bindings = uid_bindings else: uid_bindings[vr.uid] = resolved_value return uid_bindings
[docs] def event_param_is_resolved(v: Var, mdl: Block) -> bool: """ Checks if an event parameter has been assigned a concrete value. :param v: The variable representing the event parameter. :param mdl: The symbolic model block. :return: True if resolved, False otherwise. """ if v in mdl.event_dict: eqv: Union[Expr, Const] = mdl.event_dict[v] if isinstance(eqv, Const): if eqv.value is None: return False else: return True else: return True else: return True
[docs] def can_compute_init( var: Var, eq: Union[Expr, Const], mdl: Block, init_guess: Dict[int, float], diff_init_guess: Dict[int, float], uid2idx_diff: Dict[int, int], uid2idx_vars: Dict[int, int] ) -> bool: """ Determines if an initialization equation is computable based on current knowns. :param var: Target variable. :param eq: The expression to evaluate. :param mdl: The model block. :param init_guess: Current variable guesses. :param diff_init_guess: Current derivative guesses. :param uid2idx_diff: UID map for derivatives. :param uid2idx_vars: UID map for variables. :return: Boolean indicating if all dependencies are met. """ vars_needed: List[Var] = get_expression_vars(eq) for v_dep in vars_needed: if v_dep.uid == var.uid: dependency_ready: bool = True elif v_dep in mdl.event_dict: dependency_ready = event_param_is_resolved(v_dep, mdl) elif v_dep.uid in uid2idx_diff: dependency_ready = v_dep.uid in diff_init_guess elif v_dep.uid in uid2idx_vars: dependency_ready = v_dep.uid in init_guess else: dependency_ready = True if dependency_ready: dependency_ready = dependency_ready else: return False return True
[docs] def init_explicit_emt( mdl: Block, sys_vars: Dict[int, Var], sys_diff_vars: Dict[int, Var], variable_parameters: List[Var], event_parameters_eqs: List[Union[Expr, Const]], constant_parameters: List[Var], init_guess: Dict[int, float], diff_init_guess: Dict[int, float], uid2idx_vars: Dict[int, int], uid2idx_diff: Dict[int, int], uid2idx_params: Dict[int, int], uid2idx_event_params: Dict[int, int], compiler_names_dict: Dict[int, str], alias_names_dict: Dict[int, str], VARIABLE_PARAMS_NAME: str, VARS_NAME: str, DIFF_NAME: str, CONSTANT_PARAMS_NAME: str, verbose: bool = False) -> Optional[Tuple[Dict[int, float], Dict[int, float]]]: """ Initializes the EMT model by solving initialization and differential equations. This function performs a topological-like resolution of model variables. It uses fixed-point iteration for self-referencing equations and handles dependencies between algebraic and differential states. :param mdl: Symbolic model block. :param sys_vars: Registry of system variables. :param sys_diff_vars: Registry of system differential variables. :param variable_parameters: List of parameters that change on events. :param event_parameters_eqs: Initial equations for event parameters. :param constant_parameters: List of constant parameters. :param init_guess: Initial guesses for variables. :param diff_init_guess: Initial guesses for derivatives. :param uid2idx_vars: Index map for variables. :param uid2idx_diff: Index map for derivatives. :param uid2idx_params: Index map for parameters. :param uid2idx_event_params: Index map for event parameters. :param compiler_names_dict: Map for variable naming in compilation. :param alias_names_dict: Map for variable aliases. :param VARIABLE_PARAMS_NAME: Name for variable parameter array. :param VARS_NAME: Name for state variable array. :param DIFF_NAME: Name for derivative array. :param CONSTANT_PARAMS_NAME: Name for constant parameter array. :param verbose: Whether to print explicit initialization diagnostics. :return: Updated initial guesses and modified model block. """ # Initialize memory buffers for numerical evaluation # EMT solvers require flat arrays for efficient computation x: np.ndarray = np.ones(len(sys_vars)) dx: np.ndarray = np.zeros(len(sys_diff_vars)) # Hydrate buffers with existing user-provided guesses for uid, val in init_guess.items(): if verbose: print(f"Var {sys_vars[uid].name} with uid= {uid} initialized with val: {val} ") x[uid2idx_vars[uid]] = val for uid, val in diff_init_guess.items(): if verbose: print(f"Diff var {sys_diff_vars[uid].name} with uid= {uid} initialized with val: {val} ") dx[uid2idx_diff[uid]] = val # Setup constant parameter array params_array: np.ndarray = np.zeros(len(constant_parameters)) for param, const in mdl.parameters.items(): params_array[uid2idx_params[param.uid]] = const.value # Primary pass: resolve basic event parameters that are constant or immediate event_params_array: np.ndarray = np.ones(len(variable_parameters)) for event_param in mdl.event_dict.keys(): eq: Union[Expr, Const] = mdl.event_dict[event_param] # Determine if the expression is concrete enough to evaluate is_concrete_const: bool = isinstance(eq, Const) and eq.value is not None is_expression: bool = not isinstance(eq, Const) if is_concrete_const or is_expression: vars_list: List[Var] = find_vars_order(eq) # Bind current values for calculation uid_bindings: Dict[int, float] = dict() for var in vars_list: uid_bindings[var.uid] = float(event_params_array[uid2idx_event_params[var.uid]]) result: float = eq.eval_uid(uid_bindings) event_params_array[uid2idx_event_params[event_param.uid]] = result if verbose: print(f"event_param:{event_param.name} initialized from events_dict with: {result}") else: event_param = event_param # Track pending equations that require dependency resolution init_pending: Dict[int, Tuple[Var, Union[Expr, Const]]] = dict() for v, e in mdl.init_eqs.items(): init_pending[v.uid] = (v, e) diff_pending: Dict[int, Tuple[Var, Union[Expr, Const]]] = dict() for v, e in mdl.diff_init_eqs.items(): diff_pending[v.uid] = (v, e) # Iterative solver loop while len(init_pending) > 0 or len(diff_pending) > 0: progress_made: bool = False # Solve for Algebraic/State variables for uid in list(init_pending.keys()): var, eq = init_pending[uid] if can_compute_init(var, eq, mdl, init_guess, diff_init_guess, uid2idx_diff, uid2idx_vars): # Logic for variables that are registered as event parameters if var in mdl.event_dict: if isinstance(eq, Const) and eq.value is not None: event_params_array[uid2idx_event_params[var.uid]] = float(eq.value) else: bindings: Dict[int, float] = build_uid_bindings( eq, event_params_array, x, params_array, dx, uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff ) res: float = eq.eval_uid(bindings) mdl.event_dict[var] = Const(res) event_params_array[uid2idx_event_params[var.uid]] = res if verbose: print(f"event_param:{var.name} initialized from init_eqs with: {res}") else: pass param_idx: int = variable_parameters.index(var) event_parameters_eqs[param_idx].value = float(res) event_params_array[param_idx] = float(res) # Logic for standard algebraic variables else: vars_in_eqs: List[Var] = get_expression_vars(eq) if var not in vars_in_eqs: # Direct assignment bindings = build_uid_bindings( eq, event_params_array, x, params_array, dx, uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff ) res = eq.eval_uid(bindings) init_guess[var.uid] = res x[uid2idx_vars[var.uid]] = res else: # Implicit resolution using a damped fixed-point iteration eq_fn: SymbolicVector = SymbolicVector( [eq], compiler_names_dict, alias_names_dict, VARS_NAME, DIFF_NAME, VARIABLE_PARAMS_NAME, CONSTANT_PARAMS_NAME ) # Fixed-point parameters tol: float = 1e-8 error: float = 1.0 alpha: float = 1.3 curr_x: float = 1.2 x[uid2idx_vars[var.uid]] = curr_x while error > tol: try: # Evaluate expression with current state eval_res: float = float(eq_fn(x, dx, event_params_array, params_array)[0]) except Exception: # Fallback/Reset on numerical instability x[uid2idx_vars[var.uid]] = -0.1 eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0]) # Update with damping (Successive Over-Relaxation style) x[uid2idx_vars[var.uid]] = (1 - alpha) * curr_x + alpha * eval_res error = float(abs(eval_res - curr_x)) curr_x = float(x[uid2idx_vars[var.uid]]) init_guess[var.uid] = curr_x del init_pending[uid] progress_made = True else: uid = uid # Solve for Differential variables for uid in list(diff_pending.keys()): d_var, eq = diff_pending[uid] # Use separate check for diff equations to ensure variables are ready if can_compute_init(d_var, eq, mdl, init_guess, diff_init_guess, uid2idx_diff, uid2idx_vars): vars_in_eqs = get_expression_vars(eq) if d_var not in vars_in_eqs: bindings = build_uid_bindings( eq, event_params_array, x, params_array, dx, uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff ) res = eq.eval_uid(bindings) diff_init_guess[d_var.uid] = res dx[uid2idx_diff[d_var.uid]] = res else: # Fixed-point iteration for derivatives eq_fn = SymbolicVector( [eq], compiler_names_dict, alias_names_dict, VARS_NAME, DIFF_NAME, VARIABLE_PARAMS_NAME, CONSTANT_PARAMS_NAME ) tol = 1e-8 error = 1.0 alpha = 1.3 curr_dx: float = 1.2 dx[uid2idx_diff[d_var.uid]] = curr_dx while error > tol: try: eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0]) except Exception: dx[uid2idx_diff[d_var.uid]] = -0.1 eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0]) dx[uid2idx_diff[d_var.uid]] = (1 - alpha) * curr_dx + alpha * eval_res error = float(abs(eval_res - curr_dx)) curr_dx = float(dx[uid2idx_diff[d_var.uid]]) diff_init_guess[d_var.uid] = curr_dx del diff_pending[uid] progress_made = True else: uid = uid # If no variables were resolved in this pass, the system is unsolvable if not progress_made: # We return None to signify failure rather than raising, per directive return None else: progress_made = progress_made return init_guess, diff_init_guess
[docs] def init_pseudo_transient(problem: EmtProblemTemplate, options: EmtOptions) -> EmtInitializationReport: """ Run the EMT-native initialization workflow forcing the pseudo-transient path. :param problem: EMT problem being initialized. :param options: EMT simulation options. :return: Initialization report. """ options.initialization_method = EmtInitializationMethod.PseudoTransient return run_rms_native_initialization(problem, options)
[docs] def run_emt_explicit_initialization(problem: EmtProblemTemplate, verbose: bool = False) -> None: """ Run the explicit EMT initialization stage on a full problem object. :param problem: EMT problem being initialized. :param verbose: Print explicit initialization details. :return: None """ sys_vars: Dict[int, Var] = {v.uid: v for v in (problem.get_state_vars() + problem.get_algebraic_vars())} sys_diff_vars: Dict[int, Var] = {dv.uid: dv for dv in problem.get_diff_vars()} init_explicit_emt( mdl=problem.sys_block, sys_vars=sys_vars, sys_diff_vars=sys_diff_vars, variable_parameters=problem.get_variable_parameters(), event_parameters_eqs=problem.get_event_parameter_equations(), constant_parameters=problem.get_constant_parameters(), init_guess=problem.init_guess, diff_init_guess=problem.diff_init_guess, uid2idx_vars=problem.uid2idx_vars, uid2idx_diff=problem.uid2idx_diff, uid2idx_params=problem.uid2idx_params, uid2idx_event_params=problem.uid2idx_event_params, compiler_names_dict=problem.get_compiler_names_dict(), alias_names_dict=problem.get_alias_names_dict(), VARIABLE_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME, VARS_NAME=problem.VARS_NAME, DIFF_NAME=problem.DIFF_NAME, CONSTANT_PARAMS_NAME=problem.CONSTANT_PARAMS_NAME, verbose=verbose, ) problem.rebuild_runtime_param_vectors()
[docs] class ReducedInitializationSystem: """ Reduced RMS initialization system built on top of the unresolved variables only. """ __slots__ = [ "unknowns", "unknown_vars", "unknown_events", "residual_eqs", "state_unknown_mask", "residual_fn", "jacobian_fn", ] def __init__( self, problem: Any, unknowns: List[Var], unknown_vars: List[Var], unknown_events: List[Var], residual_eqs: List[Expr], state_unknown_mask: np.ndarray, ) -> None: """ Compile the reduced residual and Jacobian used by the consistent initializer. :param problem: Rms problem being initialized. :param unknown_vars: Unknown variables solved by the reduced system. :param residual_eqs: Residual equations enforcing consistency. :param state_unknown_mask: Mask marking which reduced unknowns correspond to states. :return: None """ self.unknowns = unknowns self.unknown_vars = unknown_vars self.unknown_events = unknown_events self.residual_eqs = residual_eqs self.state_unknown_mask = state_unknown_mask self.residual_fn = SymbolicVector( eqs=residual_eqs, compiler_names_dict=problem.get_compiler_names_dict(), alias_names_dict=problem.get_alias_names_dict(), VARS_NAME=problem.VARS_NAME, DIFF_NAME=problem.DIFF_NAME, EVENT_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME, PARAMS_NAME=problem.CONSTANT_PARAMS_NAME, ) self.jacobian_fn = SymbolicJacobian( eqs=residual_eqs, variables=unknowns, compiler_names_dict=problem.get_compiler_names_dict(), alias_names_dict=problem.get_alias_names_dict(), VARS_NAME=problem.VARS_NAME, DIFF_NAME=problem.DIFF_NAME, EVENT_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME, PARAMS_NAME=problem.CONSTANT_PARAMS_NAME, )
[docs] def build_constant_param_array(problem: Any) -> np.ndarray: """ Build the dense constant-parameter vector used by the initialization solvers. :param problem: EMT problem being initialized. :return: Constant-parameter array. """ const_values: List[Const] = problem.get_parameters_values() return np.asarray([float(item.value) for item in const_values], dtype=np.float64)
[docs] def build_runtime_param_array(problem: Any) -> np.ndarray: """ Build the runtime-parameter snapshot at the initialization time. :param problem: EMT problem being initialized. :return: Runtime-parameter array at time zero. """ runtime_params: np.ndarray = problem.event_params_values.copy() runtime_params = problem.def_event_params_fn(runtime_params, 0.0) return runtime_params
[docs] def build_state_equation_lookup(problem: Any) -> Dict[int, Expr]: """ Build a UID lookup for state equations. :param problem: EMT problem being initialized. :return: Mapping from state variable UID to its differential right-hand side. """ lookup: Dict[int, Expr] = dict() state_vars: List[Var] = problem.state_vars state_eqs: List[Expr] = problem.state_eqs idx: int = 0 while idx < len(state_vars): lookup[state_vars[idx].uid] = state_eqs[idx] idx += 1 return lookup
[docs] def extract_unknown_vector(problem: Any, unknown_vars: List[Var], x: np.ndarray) -> np.ndarray: """ Extract the reduced unknown vector from the full EMT variable vector. :param problem: Rms problem being initialized. :param unknown_vars: Unknown variables solved by the reduced system. :param x: Full EMT variable vector. :return: Reduced unknown vector. """ n_unknown: int = len(unknown_vars) out: np.ndarray = np.zeros(n_unknown, dtype=np.float64) idx: int = 0 while idx < n_unknown: var: Var = unknown_vars[idx] out[idx] = float(x[problem.uid2idx_vars[var.uid]]) idx += 1 return out
[docs] def extract_unknown_events_vector(problem: Any, unknown_events: List[Var], event_x: np.ndarray) -> np.ndarray: """ Extract the reduced unknown vector from the full EMT variable vector. :param problem: Rms problem being initialized. :param unknown_vars: Unknown variables solved by the reduced system. :param x: Full EMT variable vector. :return: Reduced unknown vector. """ n_unknown: int = len(unknown_events) out: np.ndarray = np.zeros(n_unknown, dtype=np.float64) idx: int = 0 while idx < n_unknown: var: Var = unknown_events[idx] out[idx] = float(event_x[problem.uid2idx_event_params[var.uid]]) idx += 1 return out
[docs] def scatter_unknown_vector(problem: Any, unknown_vars: List[Var], x: np.ndarray, reduced_x: np.ndarray) -> None: """ Scatter the reduced unknown vector back into the full Rms variable vector. :param problem: EMT problem being initialized. :param unknown_vars: Unknown variables solved by the reduced system. :param x: Full EMT variable vector updated in place. :param reduced_x: Reduced unknown vector. :return: None """ idx: int = 0 while idx < len(unknown_vars): var: Var = unknown_vars[idx] x[problem.uid2idx_vars[var.uid]] = float(reduced_x[idx]) idx += 1
[docs] def scatter_unknown_event_vector(problem: Any, unknown_events: List[Var], event_x: np.ndarray, reduced_event_x: np.ndarray) -> None: """ Scatter the reduced unknown vector back into the full Rms variable vector. :param problem: EMT problem being initialized. :param unknown_vars: Unknown variables solved by the reduced system. :param x: Full EMT variable vector updated in place. :param reduced_x: Reduced unknown vector. :return: None """ idx: int = 0 while idx < len(unknown_events): var: Var = unknown_events[idx] event_x[problem.uid2idx_event_params[var.uid]] = float(reduced_event_x[idx]) idx += 1
[docs] def build_reduced_initialization_system( problem: Any, allow_state_equilibrium: bool, ) -> ReducedInitializationSystem | None: """ Build the reduced initialization system for unresolved EMT variables. The reduced system always enforces all algebraic equations. In addition, any state variable still missing after the explicit stage receives one extra consistency equation, either from `init_eqs` or from a steady-state residual. :param problem: EMT problem being initialized. :param allow_state_equilibrium: Whether unresolved states may use f(x,z,p)=0. :return: Reduced initialization system or None if nothing remains to solve. """ reduced_problem = collect_reduced_initialization_problem(problem, allow_state_equilibrium) if reduced_problem is None: return None else: unknowns, unknown_vars, unknown_events, residual_eqs, mask_array = reduced_problem return ReducedInitializationSystem( problem=problem, unknowns = unknowns, unknown_vars=unknown_vars, unknown_events=unknown_events, residual_eqs=residual_eqs, state_unknown_mask=mask_array, )
[docs] def collect_reduced_initialization_problem( problem: Any, allow_state_equilibrium: bool, ) -> Tuple[List[Var], List[Var], List[Var], List[Expr], np.ndarray] | None: """ Collect the reduced initialization unknowns and residual equations without compiling them. :param problem: EMT problem being initialized. :type problem: EmtProblemTemplate :param allow_state_equilibrium: Whether unresolved states may use ``f(x,z,p)=0``. :type allow_state_equilibrium: bool :return: Tuple ``(unknown_vars, residual_eqs, state_unknown_mask)`` or ``None``. :rtype: Tuple[List[Var], List[Expr], np.ndarray] | None """ unknowns: List[Var] = list() unknown_vars: List[Var] = list() unknown_events: List[Var] = list() residual_eqs: List[Expr] = list() state_unknown_flags: List[float] = list() state_eq_lookup: Dict[int, Expr] = build_state_equation_lookup(problem) init_eq_lookup: Dict[Var, Expr] = problem.sys_block.init_eqs algebraic_vars: List[Var] = problem.algebraic_vars algebraic_eqs: List[Expr] = problem.algebraic_eqs variable_parameters: List[Var] = problem.variable_parameters event_params_eqs: List[Expr] = problem.event_parameters_eqs alg_idx: int = 0 evt_params_idx: int = 0 # Todo: Improve logic for state_var in problem.state_vars: if state_var.uid not in problem.init_guess: unknown_vars.append(state_var) state_unknown_flags.append(1.0) if state_var in init_eq_lookup: residual_eqs.append(state_var - init_eq_lookup[state_var]) elif allow_state_equilibrium and state_var.uid in state_eq_lookup: residual_eqs.append(state_eq_lookup[state_var.uid]) else: residual_eqs.append(state_var) while alg_idx < len(algebraic_vars): algebraic_var: Var = algebraic_vars[alg_idx] if algebraic_var.uid not in problem.init_guess: unknown_vars.append(algebraic_var) unknowns.append(algebraic_var) residual_eqs.append(algebraic_eqs[alg_idx]) state_unknown_flags.append(0.0) alg_idx += 1 while evt_params_idx < len(variable_parameters): evnt_param: Var = variable_parameters[evt_params_idx] if evnt_param.uid not in problem.event_params_init_dict: unknown_events.append(evnt_param) unknowns.append(evnt_param) residual_eqs.append(event_params_eqs[evt_params_idx]) state_unknown_flags.append(0.0) evt_params_idx += 1 if len(unknown_vars) == 0: return None else: return unknowns, unknown_vars, unknown_events, residual_eqs, np.asarray(state_unknown_flags, dtype=np.float64)
[docs] def evaluate_initialization_residual( reduced_system: ReducedInitializationSystem, problem: Any, x: np.ndarray, dx: np.ndarray, runtime_params: np.ndarray, constant_params: np.ndarray, ) -> np.ndarray: # Todo: this function can be eliminated and calculated directly """ Evaluate the reduced initialization residual. :param reduced_system: Reduced initialization system. :param problem: EMT problem being initialized. :param x: Full EMT variable vector. :param dx: Full EMT differential vector. :param runtime_params: Runtime-parameter vector. :param constant_params: Constant-parameter vector. :return: Reduced residual vector. """ _unused = problem return reduced_system.residual_fn(x, dx, runtime_params, constant_params).copy()
[docs] def evaluate_initialization_jacobian( reduced_system: ReducedInitializationSystem, problem: Any, x: np.ndarray, dx: np.ndarray, runtime_params: np.ndarray, constant_params: np.ndarray, ) -> sp.csc_matrix: # Todo: this function can be eliminated and calculated directly """ Evaluate the reduced initialization Jacobian. :param reduced_system: Reduced initialization system. :param problem: EMT problem being initialized. :param x: Full EMT variable vector. :param dx: Full EMT differential vector. :param runtime_params: Runtime-parameter vector. :param constant_params: Constant-parameter vector. :return: Reduced sparse Jacobian. """ _unused = problem return reduced_system.jacobian_fn(x, dx, runtime_params, constant_params, 1.0).tocsc(copy=True)
def _max_abs_value(values: np.ndarray) -> float: """ Return the infinity norm of one dense vector. :param values: Dense vector. :return: Maximum absolute value. """ if values.size == 0: return 0.0 else: return float(np.max(np.abs(values)))
[docs] def solve_reduced_linear_system( matrix: sp.csc_matrix, rhs: np.ndarray, dense_threshold: int, ) -> np.ndarray: """ Solve one reduced linear system using a dense or sparse backend depending on size. :param matrix: Reduced Jacobian-like matrix. :param rhs: Right-hand-side vector. :param dense_threshold: Dense/sparse switching threshold. :return: Linear-system solution vector. """ n_unknown: int = int(matrix.shape[0]) if n_unknown <= dense_threshold: dense_matrix: np.ndarray = matrix.toarray() try: return np.linalg.solve(dense_matrix, rhs) except np.linalg.LinAlgError: solution, _, _, _ = np.linalg.lstsq(dense_matrix, rhs, rcond=None) return solution else: try: with warnings.catch_warnings(): warnings.simplefilter("error") return spla.spsolve(matrix, rhs) except Exception: lsqr_solution = spla.lsqr(matrix, rhs) return np.asarray(lsqr_solution[0], dtype=np.float64)
[docs] def run_reduced_newton_initialization( reduced_system: ReducedInitializationSystem, problem: Any, event_x: np.ndarray, x: np.ndarray, dx: np.ndarray, constant_params: np.ndarray, ) -> bool: """ Solve the reduced initialization system with damped sparse Newton iterations. :param reduced_system: Reduced initialization system. :param problem: EMT problem being initialized. :param x: Full EMT variable vector updated in place. :param event_x: Full Rms variable vector updated in place. :param dx: Full EMT differential vector. :param runtime_params: Runtime-parameter vector. :param constant_params: Constant-parameter vector. :param options: EMT simulation options. :param report: Initialization report updated in place. :return: True if Newton converged. """ reduced_x: np.ndarray = extract_unknown_vector(problem, reduced_system.unknown_vars, x) reduced_event_x: np.ndarray = extract_unknown_events_vector(problem, reduced_system.unknown_events, event_x) max_iter: int = 1000 tol: float = 1e-8 iter_idx: int = 0 while iter_idx < max_iter: scatter_unknown_vector(problem, reduced_system.unknown_vars, x, reduced_x) scatter_unknown_event_vector(problem, reduced_system.unknown_events, x, reduced_event_x) residual: np.ndarray = evaluate_initialization_residual(reduced_system, problem, x, dx, event_x, constant_params) res_norm: float = _max_abs_value(residual) if res_norm <= tol: return True jacobian: sp.csc_matrix = evaluate_initialization_jacobian(reduced_system, problem, x, dx, event_x, constant_params) delta: np.ndarray = solve_reduced_linear_system(jacobian, -residual, 48) if not np.all(np.isfinite(delta)): return False # split delta array into vars and events alpha: float = 1.0 accepted: bool = False while alpha >= 1.0e-4: if len(reduced_x) != 0: trial_reduced_x: np.ndarray = reduced_x + alpha * delta[:len(reduced_x)] else: trial_reduced_x = reduced_x if len(reduced_event_x) != 0: trial_reduced_event_x: np.ndarray = reduced_event_x + alpha * delta[len(reduced_x):] else: trial_reduced_event_x = reduced_event_x scatter_unknown_vector(problem, reduced_system.unknown_vars, x, trial_reduced_x) scatter_unknown_event_vector(problem, reduced_system.unknown_events, event_x, trial_reduced_event_x) trial_residual: np.ndarray = evaluate_initialization_residual(reduced_system, problem, x, dx, event_x, constant_params) trial_norm: float = _max_abs_value(trial_residual) init_newton_backtracking = True if (not init_newton_backtracking) or (trial_norm < res_norm): reduced_x = trial_reduced_x reduced_event_x = trial_reduced_event_x accepted = True break else: alpha *= 0.5 if accepted: reduced_x = reduced_x reduced_event_x = reduced_event_x iter_idx += 1 else: return False return False
[docs] def run_reduced_pseudo_transient_initialization( reduced_system: ReducedInitializationSystem, problem: EmtProblemTemplate, x: np.ndarray, event_x: np.ndarray, dx: np.ndarray, constant_params: np.ndarray, ) -> bool: """ Solve the reduced initialization system with a pseudo-transient fallback. :param reduced_system: Reduced initialization system. :param problem: EMT problem being initialized. :param x: Full EMT variable vector updated in place. :param dx: Full EMT differential vector. :param event_x: Runtime-parameter vector. :param constant_params: Constant-parameter vector. :return: True if pseudo-transient converged. """ reduced_x: np.ndarray = extract_unknown_vector(problem, reduced_system.unknown_vars, x) reduced_event_x: np.ndarray = extract_unknown_events_vector(problem, reduced_system.unknown_events, event_x) dtau: float = 1.0 dtau_min: float = 1e-6 dtau_max: float = 1e1 max_iter: int = 60 tol: float = 1e-8 identity_mask: np.ndarray = np.ones(len(reduced_system.unknown_vars), dtype=np.float64) mass_matrix: sp.csc_matrix = sp.diags(identity_mask / max(dtau, dtau_min), format="csc") iter_idx: int = 0 while iter_idx < max_iter: scatter_unknown_vector(problem, reduced_system.unknown_vars, x, reduced_x) scatter_unknown_event_vector(problem, reduced_system.unknown_events, x, reduced_event_x) residual: np.ndarray = evaluate_initialization_residual(reduced_system, problem, x, dx, event_x, constant_params) res_norm: float = _max_abs_value(residual) if res_norm <= tol: return True jacobian: sp.csc_matrix = evaluate_initialization_jacobian(reduced_system, problem, x, dx, event_x, constant_params) mass_matrix = sp.diags(identity_mask / max(dtau, dtau_min), format="csc") linear_matrix: sp.csc_matrix = (jacobian + mass_matrix).tocsc() delta: np.ndarray = solve_reduced_linear_system(linear_matrix, -residual, 48) if np.all(np.isfinite(delta)): trial_reduced_x: np.ndarray = reduced_x + delta[:len(reduced_x)] trial_reduced_event_x: np.ndarray = reduced_event_x + delta[len(reduced_x):] scatter_unknown_vector(problem, reduced_system.unknown_vars, x, trial_reduced_x) scatter_unknown_event_vector(problem, reduced_system.unknown_events, event_x, trial_reduced_event_x) trial_residual: np.ndarray = evaluate_initialization_residual(reduced_system, problem, x, dx, event_x, constant_params) trial_norm: float = _max_abs_value(trial_residual) if trial_norm < res_norm: reduced_x = trial_reduced_x reduced_event_x = trial_reduced_event_x dtau = min(dtau_max, dtau * 2.0) else: dtau = max(dtau_min, dtau * 0.5) else: delta = solve_reduced_linear_system(linear_matrix, -residual, 48) if np.all(np.isfinite(delta)): reduced_x = reduced_x + delta[len(reduced_x):] reduced_event_x = reduced_event_x + delta[:len(reduced_x)] dtau = min(dtau_max, dtau * 2.0) else: dtau = max(dtau_min, dtau * 0.5) iter_idx += 1 return False
[docs] def apply_solution_to_problem(problem: Any, x: np.ndarray, event_x: np.ndarray) -> None: """ Persist the reduced-solve state vector back into the initialization guess map. :param problem: EMT problem being initialized. :param x: Full EMT variable vector. :return: None """ all_vars: List[Var] = list(problem.state_vars) + list(problem.algebraic_vars) event_params = problem.variable_parameters for var in all_vars: problem.init_guess[var.uid] = float(x[problem.uid2idx_vars[var.uid]]) for var in event_params: problem.event_params_init_dict[var.uid] = float(event_x[problem.uid2idx_event_params[var.uid]]) problem.event_parameters_eqs0[problem.uid2idx_event_params[var.uid]] = float(event_x[problem.uid2idx_event_params[var.uid]])
# def compute_missing_dx0(problem: Any, # x_full: np.ndarray | None = None, # dx_full: np.ndarray | None = None, # runtime_params: np.ndarray | None = None, # constant_params: np.ndarray | None = None) -> None: # """ # Compute missing differential initial values from the state equations. # # :param problem: EMT problem being initialized. # :param report: Initialization report updated in place. # :return: None # """ # state_eqs: List[Expr] = problem.state_eqs() # if len(state_eqs) == 0: # return # else: # state_eqs = state_eqs # # phase_t0: float = time.perf_counter() # missing_diff_vars: List[Var] # missing_state_eqs: List[Expr] # missing_diff_vars, missing_state_eqs = _collect_missing_dx_problem(problem) # # # if len(missing_diff_vars) == 0: # return # else: # pass # # phase_t0 = time.perf_counter() # if x_full is None: # x0: np.ndarray = problem.get_x0() # else: # x0 = x_full # # phase_t0 = time.perf_counter() # if dx_full is None: # dx0_seed: np.ndarray = problem.get_dx0() # else: # dx0_seed = dx_full # # # if runtime_params is None or len(runtime_params) != problem.get_variable_parameter_number(): # runtime_params = build_runtime_param_array(problem) # else: # runtime_params = runtime_params # # if constant_params is None or len(constant_params) != len(problem.get_parameters_values()): # constant_params = build_constant_param_array(problem) # else: # constant_params = constant_params # cache_key: str = _build_state_rhs_cache_key(problem, missing_state_eqs) # state_rhs_fn = _load_or_build_state_rhs_vector(problem, missing_state_eqs, cache_key, report) # # phase_t0 = time.perf_counter() # computed_dx: np.ndarray = state_rhs_fn(x0, dx0_seed, runtime_params, constant_params).copy() # # # idx: int = 0 # phase_t0 = time.perf_counter() # while idx < len(missing_diff_vars): # diff_var: Var = missing_diff_vars[idx] # problem.diff_init_guess[diff_var.uid] = float(computed_dx[idx]) # # idx += 1 # # def _build_initialization_system_if_needed( problem: EmtProblemTemplate, options: EmtOptions, report: EmtInitializationReport, ) -> ReducedInitializationSystem | None: """ Build the reduced initialization system only when unresolved work remains. :param problem: EMT problem being initialized. :param options: EMT simulation options. :param report: Initialization report updated in place. :return: Reduced initialization system or None. """ reduced_system: ReducedInitializationSystem | None = build_reduced_initialization_system( problem=problem, allow_state_equilibrium=bool(options.init_allow_state_equilibrium), ) if reduced_system is None: report.unknown_var_count = 0 report.unresolved_state_count = 0 else: report.unknown_var_count = len(reduced_system.unknown_vars) report.unresolved_state_count = int(np.count_nonzero(reduced_system.state_unknown_mask)) return reduced_system
[docs] def run_rms_native_initialization(problem: Any, options: RmsOptions): """ Run the EMT-native initialization stages after PF projection and explicit init. The workflow keeps the current PF/explicit seeds intact, then solves only the unresolved subset with a reduced consistent Newton method. When Newton fails, the workflow may fall back to a pseudo-transient continuation. Missing differential initial values are computed automatically from the state equations. :param problem: EMT problem being initialized. :param options: EMT simulation options. :return: Initialization report. """ x: np.ndarray = problem.get_x0() event_x: np.ndarray = problem.get_eventparams0() x_seed: np.ndarray = x.copy() dx: np.ndarray = problem.get_dx0() runtime_params: np.ndarray = np.zeros(0, dtype=np.float64) reduced_system: ReducedInitializationSystem | None runtime_params: np.ndarray constant_params: np.ndarray reduced_problem_payload: Tuple[List[Var], List[Var], List[Var], List[Expr], np.ndarray] | None newton_init = True reduced_problem_payload = collect_reduced_initialization_problem( problem=problem, # Todo: maby allow_state_equilibrium = False doesn't work allow_state_equilibrium= False, ) if reduced_problem_payload is not None: if len(reduced_problem_payload[0]) != 0: constant_params = build_constant_param_array(problem) reduced_system = ReducedInitializationSystem( problem=problem, unknowns=reduced_problem_payload[0], unknown_vars=reduced_problem_payload[1], unknown_events=reduced_problem_payload[2], residual_eqs=reduced_problem_payload[3], state_unknown_mask=reduced_problem_payload[4] ) if reduced_system is not None: if newton_init: converged = run_reduced_newton_initialization( reduced_system=reduced_system, problem=problem, x=x, event_x=event_x, dx=dx, constant_params=constant_params, ) else: converged = run_reduced_pseudo_transient_initialization( reduced_system=reduced_system, problem=problem, x=x, event_x=event_x, dx=dx, constant_params=constant_params, ) if converged: apply_solution_to_problem(problem, x, event_x)