# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Callable, Dict, List, Tuple, Union, Optional
import numpy as np
import numba as nb
import math
import time
import hashlib
import os
import pickle
from pathlib import Path
import warnings
import scipy.sparse as sp
import scipy.sparse.linalg as spla
from collections import defaultdict, deque
from VeraGridEngine.enumerations import EmtInitializationMethod, EmtInitializationStatus
from VeraGridEngine.Simulations.EMT.emt_options import EmtOptions
from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import EmtProblemTemplate
from VeraGridEngine.Simulations.Rms.numerical.pseudo_transient import PseudoTransient
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate
from VeraGridEngine.Utils.Symbolic.compiled_functions import SymbolicJacobian, SymbolicVector
from VeraGridEngine.Utils.Symbolic.jit_compiler import RMSCompiler
from VeraGridEngine.Utils.Symbolic.symbolic import expression2numba, get_expression_vars, heaviside_num
from VeraGridEngine.Utils.Symbolic.block import Block
from VeraGridEngine.Utils.Symbolic.symbolic import Var, Const, Expr, find_vars_order
InitializationStatus = EmtInitializationStatus
[docs]
class EmtInitializationReport:
"""
Stores the outcome of the EMT-native initialization workflow.
"""
__slots__ = [
"status",
"method_requested",
"method_used",
"initial_residual_inf",
"final_residual_inf",
"newton_iterations",
"pseudo_transient_steps",
"unknown_var_count",
"unresolved_state_count",
"automatic_dx0_count",
"reduced_system_build_s",
"runtime_param_build_s",
"constant_param_build_s",
"initial_residual_eval_s",
"newton_elapsed_s",
"pseudo_transient_elapsed_s",
"dx0_completion_s",
"x0_rebuild_s",
"dx0_seed_build_s",
"state_rhs_vector_build_s",
"state_rhs_vector_cache_hit",
"solution_apply_s",
"missing_dx_problem_collect_s",
"state_rhs_eval_s",
"missing_dx_scatter_s",
"persistent_cache_hit",
"persistent_cache_load_s",
"persistent_cache_store_s",
"elapsed_s",
"message",
]
def __init__(self, method_requested: EmtInitializationMethod) -> None:
"""
Build an empty EMT initialization report.
:param method_requested: Requested initialization workflow.
:return: None
"""
self.status: InitializationStatus = InitializationStatus.PENDING
self.method_requested: EmtInitializationMethod = method_requested
self.method_used: EmtInitializationMethod = EmtInitializationMethod.Explicit
self.initial_residual_inf: float = 0.0
self.final_residual_inf: float = 0.0
self.newton_iterations: int = 0
self.pseudo_transient_steps: int = 0
self.unknown_var_count: int = 0
self.unresolved_state_count: int = 0
self.automatic_dx0_count: int = 0
self.reduced_system_build_s: float = 0.0
self.runtime_param_build_s: float = 0.0
self.constant_param_build_s: float = 0.0
self.initial_residual_eval_s: float = 0.0
self.newton_elapsed_s: float = 0.0
self.pseudo_transient_elapsed_s: float = 0.0
self.dx0_completion_s: float = 0.0
self.x0_rebuild_s: float = 0.0
self.dx0_seed_build_s: float = 0.0
self.state_rhs_vector_build_s: float = 0.0
self.state_rhs_vector_cache_hit: bool = False
self.solution_apply_s: float = 0.0
self.missing_dx_problem_collect_s: float = 0.0
self.state_rhs_eval_s: float = 0.0
self.missing_dx_scatter_s: float = 0.0
self.persistent_cache_hit: bool = False
self.persistent_cache_load_s: float = 0.0
self.persistent_cache_store_s: float = 0.0
self.elapsed_s: float = 0.0
self.message: str = ""
[docs]
class ReducedInitializationSystemCacheEntry:
"""
In-process cache entry for reduced EMT initialization systems.
"""
__slots__ = ["_residual_fn", "_jacobian_fn"]
def __init__(self, residual_fn: SymbolicVector, jacobian_fn: SymbolicJacobian) -> None:
"""
Build one reduced-system cache entry.
:param residual_fn: Reduced residual evaluator.
:type residual_fn: SymbolicVector
:param jacobian_fn: Reduced Jacobian evaluator.
:type jacobian_fn: SymbolicJacobian
:return: None.
:rtype: None
"""
self._residual_fn = residual_fn
self._jacobian_fn = jacobian_fn
[docs]
def get_residual_fn(self) -> SymbolicVector:
"""
Return the cached reduced residual evaluator.
:return: Reduced residual evaluator.
:rtype: SymbolicVector
"""
return self._residual_fn
[docs]
def get_jacobian_fn(self) -> SymbolicJacobian:
"""
Return the cached reduced Jacobian evaluator.
:return: Reduced Jacobian evaluator.
:rtype: SymbolicJacobian
"""
return self._jacobian_fn
[docs]
class ReducedInitializationSystemCache:
"""
In-process cache for reduced EMT initialization systems.
"""
__slots__ = ["_cache"]
def __init__(self) -> None:
"""
Build the reduced-system cache.
:return: None.
:rtype: None
"""
self._cache: Dict[str, ReducedInitializationSystemCacheEntry] = dict()
[docs]
def get_entry(self, cache_key: str) -> ReducedInitializationSystemCacheEntry | None:
"""
Return one cached reduced-system entry.
:param cache_key: Deterministic cache key.
:type cache_key: str
:return: Cached entry or ``None``.
:rtype: ReducedInitializationSystemCacheEntry | None
"""
return self._cache.get(cache_key, None)
[docs]
def set_entry(self, cache_key: str, entry: ReducedInitializationSystemCacheEntry) -> None:
"""
Store one cached reduced-system entry.
:param cache_key: Deterministic cache key.
:type cache_key: str
:param entry: Cache entry.
:type entry: ReducedInitializationSystemCacheEntry
:return: None.
:rtype: None
"""
self._cache[cache_key] = entry
REDUCED_INITIALIZATION_SYSTEM_CACHE = ReducedInitializationSystemCache()
[docs]
class InitializationVectorCacheEntry:
"""
In-process cache entry for one initialization-stage symbolic vector.
"""
__slots__ = ["_vector"]
def __init__(self, vector: SymbolicVector) -> None:
"""
Build one initialization-vector cache entry.
:param vector: Cached symbolic vector evaluator.
:type vector: SymbolicVector
:return: None.
:rtype: None
"""
self._vector = vector
[docs]
def get_vector(self) -> SymbolicVector:
"""
Return the cached symbolic vector evaluator.
:return: Cached symbolic vector evaluator.
:rtype: SymbolicVector
"""
return self._vector
[docs]
class InitializationVectorCache:
"""
In-process cache for symbolic vectors used during EMT initialization.
"""
__slots__ = ["_cache"]
def __init__(self) -> None:
"""
Build the initialization-vector cache.
:return: None.
:rtype: None
"""
self._cache: Dict[str, InitializationVectorCacheEntry] = dict()
[docs]
def get_entry(self, cache_key: str) -> InitializationVectorCacheEntry | None:
"""
Return one cached initialization-vector entry.
:param cache_key: Deterministic cache key.
:type cache_key: str
:return: Cached entry or ``None``.
:rtype: InitializationVectorCacheEntry | None
"""
return self._cache.get(cache_key, None)
[docs]
def set_entry(self, cache_key: str, entry: InitializationVectorCacheEntry) -> None:
"""
Store one cached initialization-vector entry.
:param cache_key: Deterministic cache key.
:type cache_key: str
:param entry: Cache entry.
:type entry: InitializationVectorCacheEntry
:return: None.
:rtype: None
"""
self._cache[cache_key] = entry
INITIALIZATION_VECTOR_CACHE = InitializationVectorCache()
[docs]
class InitializationStateRhsVector:
"""
Lightweight state-RHS evaluator used by ``dx0`` completion.
"""
__slots__ = ["_func", "_data"]
def __init__(self, source_code: str, use_jit: bool, output_size: int) -> None:
"""
Build the state-RHS evaluator from source code.
:param source_code: Generated Python source code.
:type source_code: str
:param use_jit: Whether the evaluator should be JIT-compiled.
:type use_jit: bool
:param output_size: Output vector length.
:type output_size: int
:return: None.
:rtype: None
"""
namespace: Dict[str, object] = dict(math=__import__("math"), np=np, _heaviside=heaviside_num)
exec(source_code, namespace)
if use_jit:
self._func = nb.njit(
nb.void(nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:], nb.float64[:]),
fastmath=True,
cache=False,
)(namespace["func"])
else:
self._func = namespace["func"]
self._data = np.zeros(output_size, dtype=np.float64)
def __call__(self, values: np.ndarray, diff_values: np.ndarray, event_params: np.ndarray, params: np.ndarray) -> np.ndarray:
"""
Evaluate the state-RHS vector.
:param values: State/algebraic vector.
:type values: np.ndarray
:param diff_values: Differential vector.
:type diff_values: np.ndarray
:param event_params: Runtime parameter vector.
:type event_params: np.ndarray
:param params: Constant parameter vector.
:type params: np.ndarray
:return: Evaluated state-RHS vector.
:rtype: np.ndarray
"""
self._func(values, diff_values, event_params, params, self._data)
return self._data
def _get_state_rhs_persistent_cache_directory() -> Path:
"""
Return the persistent cache directory used by ``dx0`` state-RHS evaluators.
:return: Persistent cache directory.
:rtype: Path
"""
cache_directory: Path = _get_reduced_initialization_persistent_cache_directory() / "state_rhs_vectors"
cache_directory.mkdir(parents=True, exist_ok=True)
return cache_directory
def _build_state_rhs_source(
missing_state_eqs: List[Expr],
compiler_names_dict: Dict[int, str],
alias_names_dict: Dict[int, str],
vars_name: str,
diff_name: str,
event_params_name: str,
params_name: str,
) -> str:
"""
Build the Python source code of the ``dx0`` state-RHS evaluator.
:param missing_state_eqs: State equations whose differential values are missing.
:type missing_state_eqs: List[Expr]
:param compiler_names_dict: Compiler variable-name map.
:type compiler_names_dict: Dict[int, str]
:param alias_names_dict: Alias variable-name map.
:type alias_names_dict: Dict[int, str]
:param vars_name: Full-variable argument name.
:type vars_name: str
:param diff_name: Differential-vector argument name.
:type diff_name: str
:param event_params_name: Runtime-parameter argument name.
:type event_params_name: str
:param params_name: Constant-parameter argument name.
:type params_name: str
:return: Generated Python source code.
:rtype: str
"""
used_vars_count: Dict[int, int] = dict()
final_names_dict: Dict[int, str] = compiler_names_dict.copy()
lines: List[str] = list()
for eq in missing_state_eqs:
for var in get_expression_vars(eq):
if var.uid in used_vars_count:
used_vars_count[var.uid] += 1
else:
used_vars_count[var.uid] = 1
lines.append(f"def func({vars_name}, {diff_name}, {event_params_name}, {params_name}, out):")
for uid, count in used_vars_count.items():
if count > 1:
final_names_dict[uid] = alias_names_dict[uid]
lines.append(f" {alias_names_dict[uid]} = {compiler_names_dict[uid]}")
else:
pass
for eq_index, equation in enumerate(missing_state_eqs):
lines.append(f" out[{eq_index}] = {expression2numba(equation.simplify(), final_names_dict)}")
return "\n".join(lines) + "\n"
def _load_or_build_state_rhs_vector(
problem: EmtProblemTemplate,
missing_state_eqs: List[Expr],
cache_key: str,
report: EmtInitializationReport,
) -> InitializationStateRhsVector | SymbolicVector:
"""
Load or build the ``dx0`` state-RHS evaluator.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param missing_state_eqs: State equations whose differential values are missing.
:type missing_state_eqs: List[Expr]
:param cache_key: Deterministic cache key.
:type cache_key: str
:param report: Initialization report updated in place.
:type report: EmtInitializationReport
:return: State-RHS evaluator.
:rtype: InitializationStateRhsVector | SymbolicVector
"""
cache_entry: InitializationVectorCacheEntry | None = INITIALIZATION_VECTOR_CACHE.get_entry(cache_key)
cache_path: Path = _get_state_rhs_persistent_cache_directory() / f"{cache_key}.pkl"
if cache_entry is None:
pass
else:
report.state_rhs_vector_build_s = 0.0
report.state_rhs_vector_cache_hit = True
return cache_entry.get_vector()
phase_t0: float = time.perf_counter()
if cache_path.exists():
with open(cache_path, "rb") as cache_file:
payload: Dict[str, object] = dict(pickle.load(cache_file))
state_rhs_fn = InitializationStateRhsVector(
source_code=str(payload["source_code"]),
use_jit=bool(payload["use_jit"]),
output_size=int(payload["output_size"]),
)
INITIALIZATION_VECTOR_CACHE.set_entry(cache_key, InitializationVectorCacheEntry(state_rhs_fn))
report.state_rhs_vector_build_s = float(time.perf_counter() - phase_t0)
report.state_rhs_vector_cache_hit = True
return state_rhs_fn
else:
pass
if len(missing_state_eqs) <= 16:
use_jit_for_state_rhs: bool = False
else:
use_jit_for_state_rhs = True
source_code: str = _build_state_rhs_source(
missing_state_eqs,
problem.get_compiler_names_dict(),
problem.get_alias_names_dict(),
problem.VARS_NAME,
problem.DIFF_NAME,
problem.VARIABLE_PARAMS_NAME,
problem.CONSTANT_PARAMS_NAME,
)
state_rhs_fn = InitializationStateRhsVector(
source_code=source_code,
use_jit=use_jit_for_state_rhs,
output_size=len(missing_state_eqs),
)
INITIALIZATION_VECTOR_CACHE.set_entry(cache_key, InitializationVectorCacheEntry(state_rhs_fn))
with open(cache_path, "wb") as cache_file:
pickle.dump(
dict(
source_code=source_code,
use_jit=use_jit_for_state_rhs,
output_size=len(missing_state_eqs),
),
cache_file,
protocol=pickle.HIGHEST_PROTOCOL,
)
report.state_rhs_vector_build_s = float(time.perf_counter() - phase_t0)
report.state_rhs_vector_cache_hit = False
return state_rhs_fn
def _get_reduced_initialization_persistent_cache_directory() -> Path:
"""
Return the persistent cache directory used by reduced EMT initialization.
:return: Persistent cache directory.
:rtype: Path
"""
override_path: str = os.environ.get("VERAGRID_EMT_INIT_CACHE_DIR", "")
if len(override_path) > 0:
cache_directory: Path = Path(override_path)
else:
cache_directory = Path.home() / ".VeraGrid" / "cache" / "emt_reduced_initialization"
cache_directory.mkdir(parents=True, exist_ok=True)
return cache_directory
def _serialize_initialization_report(report: EmtInitializationReport) -> Dict[str, object]:
"""
Serialize an EMT initialization report into a cache-safe dictionary.
:param report: Initialization report.
:type report: EmtInitializationReport
:return: Serialized report.
:rtype: Dict[str, object]
"""
return dict(
status=report.status.name,
method_requested=report.method_requested.name,
method_used=report.method_used.name,
initial_residual_inf=float(report.initial_residual_inf),
final_residual_inf=float(report.final_residual_inf),
newton_iterations=int(report.newton_iterations),
pseudo_transient_steps=int(report.pseudo_transient_steps),
unknown_var_count=int(report.unknown_var_count),
unresolved_state_count=int(report.unresolved_state_count),
automatic_dx0_count=int(report.automatic_dx0_count),
reduced_system_build_s=float(report.reduced_system_build_s),
runtime_param_build_s=float(report.runtime_param_build_s),
constant_param_build_s=float(report.constant_param_build_s),
initial_residual_eval_s=float(report.initial_residual_eval_s),
newton_elapsed_s=float(report.newton_elapsed_s),
pseudo_transient_elapsed_s=float(report.pseudo_transient_elapsed_s),
dx0_completion_s=float(report.dx0_completion_s),
x0_rebuild_s=float(report.x0_rebuild_s),
dx0_seed_build_s=float(report.dx0_seed_build_s),
state_rhs_vector_build_s=float(report.state_rhs_vector_build_s),
state_rhs_vector_cache_hit=bool(report.state_rhs_vector_cache_hit),
solution_apply_s=float(report.solution_apply_s),
missing_dx_problem_collect_s=float(report.missing_dx_problem_collect_s),
state_rhs_eval_s=float(report.state_rhs_eval_s),
missing_dx_scatter_s=float(report.missing_dx_scatter_s),
persistent_cache_hit=bool(report.persistent_cache_hit),
persistent_cache_load_s=float(report.persistent_cache_load_s),
persistent_cache_store_s=float(report.persistent_cache_store_s),
elapsed_s=float(report.elapsed_s),
message=str(report.message),
)
def _serialize_problem_guess_map(problem: EmtProblemTemplate, guess_map: Dict[int, float], diff_guess: bool) -> Dict[str, float]:
"""
Serialize an initialization guess map using stable variable names.
:param problem: EMT problem instance.
:type problem: EmtProblemTemplate
:param guess_map: UID-keyed initialization guess map.
:type guess_map: Dict[int, float]
:param diff_guess: Whether the map contains differential-variable guesses.
:type diff_guess: bool
:return: Name-keyed initialization guess map.
:rtype: Dict[str, float]
"""
uid_to_name: Dict[int, str] = dict()
serialized: Dict[str, float] = dict()
alias_name_map: Dict[int, str] = problem.get_alias_names_dict()
if diff_guess:
for diff_var in problem.get_diff_vars():
uid_to_name[diff_var.uid] = alias_name_map[diff_var.uid]
else:
for var in problem.get_state_vars():
uid_to_name[var.uid] = alias_name_map[var.uid]
for var in problem.get_algebraic_vars():
uid_to_name[var.uid] = alias_name_map[var.uid]
for uid, value in guess_map.items():
if uid in uid_to_name:
serialized[uid_to_name[uid]] = float(value)
else:
pass
return serialized
def _restore_problem_guess_map(problem: EmtProblemTemplate, name_map: Dict[str, float], diff_guess: bool) -> Dict[int, float]:
"""
Restore an initialization guess map from stable variable names.
:param problem: EMT problem instance.
:type problem: EmtProblemTemplate
:param name_map: Name-keyed initialization guess map.
:type name_map: Dict[str, float]
:param diff_guess: Whether the map contains differential-variable guesses.
:type diff_guess: bool
:return: UID-keyed initialization guess map.
:rtype: Dict[int, float]
"""
name_to_uid: Dict[str, int] = dict()
restored: Dict[int, float] = dict()
alias_name_map: Dict[int, str] = problem.get_alias_names_dict()
if diff_guess:
for diff_var in problem.get_diff_vars():
name_to_uid[alias_name_map[diff_var.uid]] = diff_var.uid
else:
for var in problem.get_state_vars():
name_to_uid[alias_name_map[var.uid]] = var.uid
for var in problem.get_algebraic_vars():
name_to_uid[alias_name_map[var.uid]] = var.uid
for name, value in name_map.items():
if name in name_to_uid:
restored[name_to_uid[name]] = float(value)
else:
pass
return restored
def _restore_initialization_report_from_payload(
report: EmtInitializationReport,
payload: Dict[str, object],
) -> None:
"""
Restore an EMT initialization report from a serialized payload.
:param report: Initialization report updated in place.
:type report: EmtInitializationReport
:param payload: Serialized report payload.
:type payload: Dict[str, object]
:return: None.
:rtype: None
"""
report.status = InitializationStatus[str(payload["status"])]
report.method_requested = EmtInitializationMethod[str(payload["method_requested"])]
report.method_used = EmtInitializationMethod[str(payload["method_used"])]
report.initial_residual_inf = float(payload["initial_residual_inf"])
report.final_residual_inf = float(payload["final_residual_inf"])
report.newton_iterations = int(payload["newton_iterations"])
report.pseudo_transient_steps = int(payload["pseudo_transient_steps"])
report.unknown_var_count = int(payload["unknown_var_count"])
report.unresolved_state_count = int(payload["unresolved_state_count"])
report.automatic_dx0_count = int(payload["automatic_dx0_count"])
report.reduced_system_build_s = float(payload["reduced_system_build_s"])
report.runtime_param_build_s = float(payload["runtime_param_build_s"])
report.constant_param_build_s = float(payload["constant_param_build_s"])
report.initial_residual_eval_s = float(payload["initial_residual_eval_s"])
report.newton_elapsed_s = float(payload["newton_elapsed_s"])
report.pseudo_transient_elapsed_s = float(payload["pseudo_transient_elapsed_s"])
report.dx0_completion_s = float(payload["dx0_completion_s"])
report.x0_rebuild_s = float(payload.get("x0_rebuild_s", 0.0))
report.dx0_seed_build_s = float(payload.get("dx0_seed_build_s", 0.0))
report.state_rhs_vector_build_s = float(payload.get("state_rhs_vector_build_s", 0.0))
report.state_rhs_vector_cache_hit = bool(payload.get("state_rhs_vector_cache_hit", False))
report.solution_apply_s = float(payload.get("solution_apply_s", 0.0))
report.missing_dx_problem_collect_s = float(payload.get("missing_dx_problem_collect_s", 0.0))
report.state_rhs_eval_s = float(payload.get("state_rhs_eval_s", 0.0))
report.missing_dx_scatter_s = float(payload.get("missing_dx_scatter_s", 0.0))
report.persistent_cache_hit = bool(payload.get("persistent_cache_hit", False))
report.persistent_cache_load_s = float(payload.get("persistent_cache_load_s", 0.0))
report.persistent_cache_store_s = float(payload.get("persistent_cache_store_s", 0.0))
report.elapsed_s = float(payload["elapsed_s"])
report.message = str(payload["message"])
def _build_persistent_initialization_cache_key(
problem: EmtProblemTemplate,
unknown_vars: List[Var],
residual_eqs: List[Expr],
state_unknown_mask: np.ndarray,
options: EmtOptions,
) -> str:
"""
Return the deterministic cache key of one persistent reduced initialization result.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param unknown_vars: Unknown variables in the reduced system.
:type unknown_vars: List[Var]
:param residual_eqs: Residual equations in the reduced system.
:type residual_eqs: List[Expr]
:param state_unknown_mask: Reduced state mask.
:type state_unknown_mask: np.ndarray
:param options: EMT initialization options.
:type options: EmtOptions
:return: Deterministic cache key.
:rtype: str
"""
reduced_key: str = _build_reduced_initialization_cache_key(problem, unknown_vars, residual_eqs, state_unknown_mask)
payload: str = "|".join([
"persistent-reduced-emt-init-v5",
reduced_key,
options.initialization_method.name,
f"{float(options.init_newton_tol):.12g}",
str(int(options.init_newton_max_iter)),
str(int(options.init_dense_threshold)),
str(bool(options.init_newton_backtracking)),
str(bool(options.init_allow_state_equilibrium)),
f"{float(options.init_ptc_dtau0):.12g}",
f"{float(options.init_ptc_dtau_min):.12g}",
f"{float(options.init_ptc_dtau_max):.12g}",
str(int(options.init_ptc_max_iter)),
])
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _persistent_initialization_params_match(
payload: Dict[str, object],
runtime_params: np.ndarray,
constant_params: np.ndarray,
) -> bool:
"""
Return whether one persistent initialization payload matches the current parameter snapshots.
:param payload: Cached payload.
:type payload: Dict[str, object]
:param runtime_params: Runtime parameter vector.
:type runtime_params: np.ndarray
:param constant_params: Constant parameter vector.
:type constant_params: np.ndarray
:return: ``True`` when the parameter snapshots match.
:rtype: bool
"""
runtime_snapshot_any: object = payload.get("runtime_params_snapshot", list())
constant_snapshot_any: object = payload.get("constant_params_snapshot", list())
if isinstance(runtime_snapshot_any, list):
runtime_snapshot = np.asarray(runtime_snapshot_any, dtype=np.float64)
else:
return False
if isinstance(constant_snapshot_any, list):
constant_snapshot = np.asarray(constant_snapshot_any, dtype=np.float64)
else:
return False
if runtime_snapshot.shape != runtime_params.shape:
return False
else:
pass
if constant_snapshot.shape != constant_params.shape:
return False
else:
pass
if runtime_snapshot.size > 0:
if float(np.max(np.abs(runtime_snapshot - runtime_params))) > 1.0e-12:
return False
else:
pass
else:
pass
if constant_snapshot.size > 0:
if float(np.max(np.abs(constant_snapshot - constant_params))) > 1.0e-12:
return False
else:
pass
else:
pass
return True
def _load_persistent_initialization_solution(cache_key: str) -> Dict[str, object] | None:
"""
Load one persistent reduced initialization result.
:param cache_key: Deterministic cache key.
:type cache_key: str
:return: Cached payload or ``None``.
:rtype: Dict[str, object] | None
"""
cache_path: Path = _get_reduced_initialization_persistent_cache_directory() / f"{cache_key}.pkl"
if cache_path.exists():
with open(cache_path, "rb") as cache_file:
return dict(pickle.load(cache_file))
else:
return None
def _store_persistent_initialization_solution(cache_key: str, payload: Dict[str, object]) -> None:
"""
Store one persistent reduced initialization result.
:param cache_key: Deterministic cache key.
:type cache_key: str
:param payload: Cache payload.
:type payload: Dict[str, object]
:return: None.
:rtype: None
"""
cache_path: Path = _get_reduced_initialization_persistent_cache_directory() / f"{cache_key}.pkl"
with open(cache_path, "wb") as cache_file:
pickle.dump(payload, cache_file, protocol=pickle.HIGHEST_PROTOCOL)
def _build_reduced_initialization_cache_key(
problem: EmtProblemTemplate,
unknown_vars: List[Var],
residual_eqs: List[Expr],
state_unknown_mask: np.ndarray,
) -> str:
"""
Return a deterministic cache key for one reduced EMT initialization system.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param unknown_vars: Unknown variables solved by the reduced system.
:type unknown_vars: List[Var]
:param residual_eqs: Residual equations enforcing consistency.
:type residual_eqs: List[Expr]
:param state_unknown_mask: State-mask vector.
:type state_unknown_mask: np.ndarray
:return: Deterministic cache key.
:rtype: str
"""
unknown_var_tokens: List[str] = list()
residual_tokens: List[str] = list()
mask_tokens: List[str] = list()
index: int = 0
while index < len(unknown_vars):
unknown_var_tokens.append(unknown_vars[index].name)
index += 1
index = 0
while index < len(residual_eqs):
residual_tokens.append(expression2numba(residual_eqs[index].simplify(), problem.get_compiler_names_dict()))
index += 1
index = 0
while index < state_unknown_mask.shape[0]:
mask_tokens.append(str(float(state_unknown_mask[index])))
index += 1
payload: str = "|".join([
"reduced-emt-init",
problem.VARS_NAME,
problem.DIFF_NAME,
problem.VARIABLE_PARAMS_NAME,
problem.CONSTANT_PARAMS_NAME,
str(problem.get_states_number()),
str(problem.get_algebraic_var_number()),
str(len(unknown_vars)),
str(len(residual_eqs)),
"::".join(unknown_var_tokens),
"::".join(mask_tokens),
"::".join(residual_tokens),
])
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _build_state_rhs_cache_key(problem: EmtProblemTemplate, state_eqs: List[Expr]) -> str:
"""
Return the cache key for the symbolic vector used to compute missing ``dx0``.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param state_eqs: State equations.
:type state_eqs: List[Expr]
:return: Deterministic cache key.
:rtype: str
"""
expr_tokens: List[str] = list()
index: int = 0
while index < len(state_eqs):
expr_tokens.append(expression2numba(state_eqs[index].simplify(), problem.get_compiler_names_dict()))
index += 1
payload: str = "|".join([
"state-rhs-vector",
problem.VARS_NAME,
problem.DIFF_NAME,
problem.VARIABLE_PARAMS_NAME,
problem.CONSTANT_PARAMS_NAME,
str(problem.get_states_number()),
"::".join(expr_tokens),
])
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _collect_missing_dx_problem(problem: EmtProblemTemplate,
include_existing: bool = False) -> Tuple[List[Var], List[Expr]]:
"""
Return the differential variables that still need ``dx0`` and their state equations.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param include_existing: When ``True``, collect all differential variables.
:return: Pair ``(missing_diff_vars, missing_state_eqs)``.
:rtype: Tuple[List[Var], List[Expr]]
"""
diff_vars: List[Var] = problem.get_diff_vars()
state_eqs: List[Expr] = problem.get_state_eqs()
missing_diff_vars: List[Var] = list()
missing_state_eqs: List[Expr] = list()
idx: int = 0
while idx < len(diff_vars):
diff_var: Var = diff_vars[idx]
if include_existing:
missing_diff_vars.append(diff_var)
missing_state_eqs.append(state_eqs[idx])
else:
if diff_var.uid in problem.diff_init_guess:
diff_var = diff_var
else:
missing_diff_vars.append(diff_var)
missing_state_eqs.append(state_eqs[idx])
idx += 1
return missing_diff_vars, missing_state_eqs
[docs]
def build_uid_bindings(
eq: Union[Expr, Const],
event_params_array: np.ndarray,
x: np.ndarray,
params_array: np.ndarray,
dx: np.ndarray,
uid2idx_event_params: Dict[int, int],
uid2idx_vars: Dict[int, int],
uid2idx_params: Dict[int, int],
uid2idx_diff: Dict[int, int]
) -> Dict[int, float]:
"""
Builds a mapping of Unique Identifiers (UIDs) to their current numeric values.
:param eq: The symbolic expression to analyze.
:param event_params_array: Array containing resolved event parameters.
:param x: Array containing state and algebraic variables.
:param params_array: Array containing constant model parameters.
:param dx: Array containing derivative values.
:param uid2idx_event_params: Mapping of UID to index for event parameters.
:param uid2idx_vars: Mapping of UID to index for variables.
:param uid2idx_params: Mapping of UID to index for constants.
:param uid2idx_diff: Mapping of UID to index for derivatives.
:return: A dictionary mapping UIDs to their float values.
"""
uid_bindings: Dict[int, float] = dict()
vars_list: List[Var] = find_vars_order(eq)
for vr in vars_list:
resolved_value: Optional[float] = None
event_idx: Optional[int] = uid2idx_event_params.get(vr.uid, None)
state_idx: Optional[int] = uid2idx_vars.get(vr.uid, None)
param_idx: Optional[int] = uid2idx_params.get(vr.uid, None)
diff_idx: Optional[int] = uid2idx_diff.get(vr.uid, None)
if event_idx is None:
if state_idx is None:
if param_idx is None:
if diff_idx is None:
resolved_value = None
else:
resolved_value = float(dx[diff_idx])
else:
resolved_value = float(params_array[param_idx])
else:
resolved_value = float(x[state_idx])
else:
resolved_value = float(event_params_array[event_idx])
if resolved_value is None:
uid_bindings = uid_bindings
else:
uid_bindings[vr.uid] = resolved_value
return uid_bindings
[docs]
def event_param_is_resolved(v: Var, mdl: Block) -> bool:
"""
Checks if an event parameter has been assigned a concrete value.
:param v: The variable representing the event parameter.
:param mdl: The symbolic model block.
:return: True if resolved, False otherwise.
"""
if v in mdl.event_dict:
eqv: Union[Expr, Const] = mdl.event_dict[v]
if isinstance(eqv, Const):
if eqv.value is None:
return False
else:
return True
else:
return True
else:
return True
[docs]
def can_compute_init(
var: Var,
eq: Union[Expr, Const],
mdl: Block,
init_guess: Dict[int, float],
diff_init_guess: Dict[int, float],
uid2idx_diff: Dict[int, int],
uid2idx_vars: Dict[int, int]
) -> bool:
"""
Determines if an initialization equation is computable based on current knowns.
:param var: Target variable.
:param eq: The expression to evaluate.
:param mdl: The model block.
:param init_guess: Current variable guesses.
:param diff_init_guess: Current derivative guesses.
:param uid2idx_diff: UID map for derivatives.
:param uid2idx_vars: UID map for variables.
:return: Boolean indicating if all dependencies are met.
"""
vars_needed: List[Var] = get_expression_vars(eq)
for v_dep in vars_needed:
if v_dep.uid == var.uid:
dependency_ready: bool = True
elif v_dep in mdl.event_dict:
dependency_ready = event_param_is_resolved(v_dep, mdl)
elif v_dep.uid in uid2idx_diff:
dependency_ready = v_dep.uid in diff_init_guess
elif v_dep.uid in uid2idx_vars:
dependency_ready = v_dep.uid in init_guess
else:
dependency_ready = True
if dependency_ready:
dependency_ready = dependency_ready
else:
return False
return True
[docs]
def init_explicit_emt(
mdl: Block,
sys_vars: Dict[int, Var],
sys_diff_vars: Dict[int, Var],
variable_parameters: List[Var],
event_parameters_eqs: List[Union[Expr, Const]],
constant_parameters: List[Var],
constant_parameter_values: List[Const],
init_guess: Dict[int, float],
diff_init_guess: Dict[int, float],
uid2idx_vars: Dict[int, int],
uid2idx_diff: Dict[int, int],
uid2idx_params: Dict[int, int],
uid2idx_event_params: Dict[int, int],
compiler_names_dict: Dict[int, str],
alias_names_dict: Dict[int, str],
VARIABLE_PARAMS_NAME: str,
VARS_NAME: str,
DIFF_NAME: str,
CONSTANT_PARAMS_NAME: str,
verbose: bool = False) -> Optional[Tuple[Dict[int, float], Dict[int, float]]]:
"""
Initializes the EMT model by solving initialization and differential equations.
This function performs a topological-like resolution of model variables.
It uses fixed-point iteration for self-referencing equations and handles
dependencies between algebraic and differential states.
:param mdl: Symbolic model block.
:param sys_vars: Registry of system variables.
:param sys_diff_vars: Registry of system differential variables.
:param variable_parameters: List of parameters that change on events.
:param event_parameters_eqs: Initial equations for event parameters.
:param constant_parameters: List of constant parameters.
:param constant_parameter_values: Values associated with ``constant_parameters``.
:param init_guess: Initial guesses for variables.
:param diff_init_guess: Initial guesses for derivatives.
:param uid2idx_vars: Index map for variables.
:param uid2idx_diff: Index map for derivatives.
:param uid2idx_params: Index map for parameters.
:param uid2idx_event_params: Index map for event parameters.
:param compiler_names_dict: Map for variable naming in compilation.
:param alias_names_dict: Map for variable aliases.
:param VARIABLE_PARAMS_NAME: Name for variable parameter array.
:param VARS_NAME: Name for state variable array.
:param DIFF_NAME: Name for derivative array.
:param CONSTANT_PARAMS_NAME: Name for constant parameter array.
:param verbose: Whether to print explicit initialization diagnostics.
:return: Updated initial guesses and modified model block.
"""
# Initialize memory buffers for numerical evaluation
# EMT solvers require flat arrays for efficient computation
x: np.ndarray = np.ones(len(sys_vars))
dx: np.ndarray = np.zeros(len(sys_diff_vars))
# Hydrate buffers with existing user-provided guesses
for uid, val in init_guess.items():
if verbose:
print(f"Var {sys_vars[uid].name} with uid= {uid} initialized with val: {val} ")
x[uid2idx_vars[uid]] = val
for uid, val in diff_init_guess.items():
if verbose:
print(f"Diff var {sys_diff_vars[uid].name} with uid= {uid} initialized with val: {val} ")
dx[uid2idx_diff[uid]] = val
# Static EMT constants now live in the problem-owned mapping. Build the
# dense array from that canonical source instead of the block-local cache.
_ = mdl
_ = constant_parameters
params_array: np.ndarray = np.asarray(
[float(const.value) for const in constant_parameter_values],
dtype=np.float64,
)
# Primary pass: resolve basic event parameters that are constant or immediate
event_params_array: np.ndarray = np.ones(len(variable_parameters))
for event_param in mdl.event_dict.keys():
eq: Union[Expr, Const] = mdl.event_dict[event_param]
# Determine if the expression is concrete enough to evaluate
is_concrete_const: bool = isinstance(eq, Const) and eq.value is not None
is_expression: bool = not isinstance(eq, Const)
if is_concrete_const or is_expression:
vars_list: List[Var] = find_vars_order(eq)
# Bind current values for calculation
uid_bindings: Dict[int, float] = dict()
for var in vars_list:
uid_bindings[var.uid] = float(event_params_array[uid2idx_event_params[var.uid]])
result: float = eq.eval_uid(uid_bindings)
event_params_array[uid2idx_event_params[event_param.uid]] = result
if verbose:
print(f"event_param:{event_param.name} initialized from events_dict with: {result}")
else:
event_param = event_param
# Track pending equations that require dependency resolution
init_pending: Dict[int, Tuple[Var, Union[Expr, Const]]] = dict()
for v, e in mdl.init_eqs.items():
init_pending[v.uid] = (v, e)
diff_pending: Dict[int, Tuple[Var, Union[Expr, Const]]] = dict()
for v, e in mdl.diff_init_eqs.items():
diff_pending[v.uid] = (v, e)
# Iterative solver loop
while len(init_pending) > 0 or len(diff_pending) > 0:
progress_made: bool = False
# Solve for Algebraic/State variables
for uid in list(init_pending.keys()):
var, eq = init_pending[uid]
if can_compute_init(var, eq, mdl, init_guess, diff_init_guess, uid2idx_diff, uid2idx_vars):
# Logic for variables that are registered as event parameters
if var in mdl.event_dict:
if isinstance(eq, Const) and eq.value is not None:
event_params_array[uid2idx_event_params[var.uid]] = float(eq.value)
else:
bindings: Dict[int, float] = build_uid_bindings(
eq, event_params_array, x, params_array, dx,
uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff
)
res: float = eq.eval_uid(bindings)
mdl.event_dict[var] = Const(res)
event_params_array[uid2idx_event_params[var.uid]] = res
if verbose:
print(f"event_param:{var.name} initialized from init_eqs with: {res}")
else:
pass
param_idx: int = variable_parameters.index(var)
event_parameters_eqs[param_idx].value = float(res)
event_params_array[param_idx] = float(res)
# Logic for standard algebraic variables
else:
vars_in_eqs: List[Var] = get_expression_vars(eq)
if var not in vars_in_eqs:
# Direct assignment
bindings = build_uid_bindings(
eq, event_params_array, x, params_array, dx,
uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff
)
res = eq.eval_uid(bindings)
init_guess[var.uid] = res
x[uid2idx_vars[var.uid]] = res
else:
# Implicit resolution using a damped fixed-point iteration
eq_fn: SymbolicVector = SymbolicVector(
[eq], compiler_names_dict, alias_names_dict,
VARS_NAME, DIFF_NAME, VARIABLE_PARAMS_NAME, CONSTANT_PARAMS_NAME
)
# Fixed-point parameters
tol: float = 1e-8
error: float = 1.0
alpha: float = 1.3
curr_x: float = 1.2
x[uid2idx_vars[var.uid]] = curr_x
while error > tol:
try:
# Evaluate expression with current state
eval_res: float = float(eq_fn(x, dx, event_params_array, params_array)[0])
except Exception:
# Fallback/Reset on numerical instability
x[uid2idx_vars[var.uid]] = -0.1
eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0])
# Update with damping (Successive Over-Relaxation style)
x[uid2idx_vars[var.uid]] = (1 - alpha) * curr_x + alpha * eval_res
error = float(abs(eval_res - curr_x))
curr_x = float(x[uid2idx_vars[var.uid]])
init_guess[var.uid] = curr_x
del init_pending[uid]
progress_made = True
else:
uid = uid
# Solve for Differential variables
for uid in list(diff_pending.keys()):
d_var, eq = diff_pending[uid]
# Use separate check for diff equations to ensure variables are ready
if can_compute_init(d_var, eq, mdl, init_guess, diff_init_guess, uid2idx_diff, uid2idx_vars):
vars_in_eqs = get_expression_vars(eq)
if d_var not in vars_in_eqs:
bindings = build_uid_bindings(
eq, event_params_array, x, params_array, dx,
uid2idx_event_params, uid2idx_vars, uid2idx_params, uid2idx_diff
)
res = eq.eval_uid(bindings)
diff_init_guess[d_var.uid] = res
dx[uid2idx_diff[d_var.uid]] = res
else:
# Fixed-point iteration for derivatives
eq_fn = SymbolicVector(
[eq], compiler_names_dict, alias_names_dict,
VARS_NAME, DIFF_NAME, VARIABLE_PARAMS_NAME, CONSTANT_PARAMS_NAME
)
tol = 1e-8
error = 1.0
alpha = 1.3
curr_dx: float = 1.2
dx[uid2idx_diff[d_var.uid]] = curr_dx
while error > tol:
try:
eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0])
except Exception:
dx[uid2idx_diff[d_var.uid]] = -0.1
eval_res = float(eq_fn(x, dx, event_params_array, params_array)[0])
dx[uid2idx_diff[d_var.uid]] = (1 - alpha) * curr_dx + alpha * eval_res
error = float(abs(eval_res - curr_dx))
curr_dx = float(dx[uid2idx_diff[d_var.uid]])
diff_init_guess[d_var.uid] = curr_dx
del diff_pending[uid]
progress_made = True
else:
uid = uid
# If no variables were resolved in this pass, the system is unsolvable
if not progress_made:
# We return None to signify failure rather than raising, per directive
return None
else:
progress_made = progress_made
return init_guess, diff_init_guess
[docs]
def init_pseudo_transient(problem: EmtProblemTemplate, options: EmtOptions) -> EmtInitializationReport:
"""
Run the EMT-native initialization workflow forcing the pure pseudo-transient path.
This helper does not execute explicit initialization. It starts from the
current problem seed, which in the main EMT build workflow corresponds to
the power-flow projected variables.
:param problem: EMT problem being initialized.
:param options: EMT simulation options.
:return: Initialization report.
"""
options.initialization_method = EmtInitializationMethod.PseudoTransient
return run_emt_native_initialization(problem, options)
[docs]
def run_emt_explicit_initialization(problem: EmtProblemTemplate, verbose: bool = False) -> None:
"""
Run the explicit EMT initialization stage on a full problem object.
:param problem: EMT problem being initialized.
:param verbose: Print explicit initialization details.
:return: None
"""
sys_vars: Dict[int, Var] = {v.uid: v for v in (problem.get_state_vars() + problem.get_algebraic_vars())}
sys_diff_vars: Dict[int, Var] = {dv.uid: dv for dv in problem.get_diff_vars()}
init_explicit_emt(
mdl=problem.sys_block,
sys_vars=sys_vars,
sys_diff_vars=sys_diff_vars,
variable_parameters=problem.get_variable_parameters(),
event_parameters_eqs=problem.get_event_parameter_equations(),
constant_parameters=problem.get_constant_parameters(),
constant_parameter_values=problem.get_parameters_values(),
init_guess=problem.init_guess,
diff_init_guess=problem.diff_init_guess,
uid2idx_vars=problem.uid2idx_vars,
uid2idx_diff=problem.uid2idx_diff,
uid2idx_params=problem.uid2idx_params,
uid2idx_event_params=problem.uid2idx_event_params,
compiler_names_dict=problem.get_compiler_names_dict(),
alias_names_dict=problem.get_alias_names_dict(),
VARIABLE_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME,
VARS_NAME=problem.VARS_NAME,
DIFF_NAME=problem.DIFF_NAME,
CONSTANT_PARAMS_NAME=problem.CONSTANT_PARAMS_NAME,
verbose=verbose,
)
problem.rebuild_runtime_param_vectors()
[docs]
class ReducedInitializationSystem:
"""
Reduced EMT initialization system built on top of the unresolved variables only.
"""
__slots__ = [
"unknown_vars",
"residual_eqs",
"state_unknown_mask",
"residual_fn",
"jacobian_fn",
]
def __init__(
self,
problem: EmtProblemTemplate,
unknown_vars: List[Var],
residual_eqs: List[Expr],
state_unknown_mask: np.ndarray,
) -> None:
"""
Compile the reduced residual and Jacobian used by the consistent initializer.
:param problem: EMT problem being initialized.
:param unknown_vars: Unknown variables solved by the reduced system.
:param residual_eqs: Residual equations enforcing consistency.
:param state_unknown_mask: Mask marking which reduced unknowns correspond to states.
:return: None
"""
self.unknown_vars = unknown_vars
self.residual_eqs = residual_eqs
self.state_unknown_mask = state_unknown_mask
cache_key: str = _build_reduced_initialization_cache_key(
problem=problem,
unknown_vars=unknown_vars,
residual_eqs=residual_eqs,
state_unknown_mask=state_unknown_mask,
)
cache_entry: ReducedInitializationSystemCacheEntry | None = REDUCED_INITIALIZATION_SYSTEM_CACHE.get_entry(cache_key)
if cache_entry is None:
self.residual_fn = SymbolicVector(
eqs=residual_eqs,
compiler_names_dict=problem.get_compiler_names_dict(),
alias_names_dict=problem.get_alias_names_dict(),
VARS_NAME=problem.VARS_NAME,
DIFF_NAME=problem.DIFF_NAME,
EVENT_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME,
PARAMS_NAME=problem.CONSTANT_PARAMS_NAME,
)
self.jacobian_fn = SymbolicJacobian(
eqs=residual_eqs,
variables=unknown_vars,
compiler_names_dict=problem.get_compiler_names_dict(),
alias_names_dict=problem.get_alias_names_dict(),
VARS_NAME=problem.VARS_NAME,
DIFF_NAME=problem.DIFF_NAME,
EVENT_PARAMS_NAME=problem.VARIABLE_PARAMS_NAME,
PARAMS_NAME=problem.CONSTANT_PARAMS_NAME,
)
REDUCED_INITIALIZATION_SYSTEM_CACHE.set_entry(
cache_key,
ReducedInitializationSystemCacheEntry(self.residual_fn, self.jacobian_fn),
)
else:
self.residual_fn = cache_entry.get_residual_fn()
self.jacobian_fn = cache_entry.get_jacobian_fn()
def _build_constant_param_array(problem: EmtProblemTemplate) -> np.ndarray:
"""
Build the dense constant-parameter vector used by the initialization solvers.
:param problem: EMT problem being initialized.
:return: Constant-parameter array.
"""
const_values: List[Const] = problem.get_parameters_values()
return np.asarray([float(item.value) for item in const_values], dtype=np.float64)
def _build_runtime_param_array(problem: EmtProblemTemplate) -> np.ndarray:
"""
Build the runtime-parameter snapshot at the initialization time.
:param problem: EMT problem being initialized.
:return: Runtime-parameter array at time zero.
"""
runtime_params: np.ndarray = problem.event_params_values.copy()
runtime_params = problem.def_event_params_fn(runtime_params, 0.0)
return runtime_params
def _build_state_equation_lookup(problem: EmtProblemTemplate) -> Dict[int, Expr]:
"""
Build a UID lookup for state equations.
:param problem: EMT problem being initialized.
:return: Mapping from state variable UID to its differential right-hand side.
"""
lookup: Dict[int, Expr] = dict()
state_vars: List[Var] = problem.get_state_vars()
state_eqs: List[Expr] = problem.get_state_eqs()
idx: int = 0
while idx < len(state_vars):
lookup[state_vars[idx].uid] = state_eqs[idx]
idx += 1
return lookup
def _extract_unknown_vector(problem: EmtProblemTemplate, unknown_vars: List[Var], x: np.ndarray) -> np.ndarray:
"""
Extract the reduced unknown vector from the full EMT variable vector.
:param problem: EMT problem being initialized.
:param unknown_vars: Unknown variables solved by the reduced system.
:param x: Full EMT variable vector.
:return: Reduced unknown vector.
"""
n_unknown: int = len(unknown_vars)
out: np.ndarray = np.zeros(n_unknown, dtype=np.float64)
idx: int = 0
while idx < n_unknown:
var: Var = unknown_vars[idx]
out[idx] = float(x[problem.uid2idx_vars[var.uid]])
idx += 1
return out
def _scatter_unknown_vector(problem: EmtProblemTemplate, unknown_vars: List[Var], x: np.ndarray, reduced_x: np.ndarray) -> None:
"""
Scatter the reduced unknown vector back into the full EMT variable vector.
:param problem: EMT problem being initialized.
:param unknown_vars: Unknown variables solved by the reduced system.
:param x: Full EMT variable vector updated in place.
:param reduced_x: Reduced unknown vector.
:return: None
"""
idx: int = 0
while idx < len(unknown_vars):
var: Var = unknown_vars[idx]
x[problem.uid2idx_vars[var.uid]] = float(reduced_x[idx])
idx += 1
def _build_reduced_initialization_system(
problem: EmtProblemTemplate,
allow_state_equilibrium: bool,
) -> ReducedInitializationSystem | None:
"""
Build the reduced initialization system for unresolved EMT variables.
The reduced system always enforces all algebraic equations. In addition, any
state variable still missing after the explicit stage receives one extra
consistency equation, either from `init_eqs` or from a steady-state residual.
:param problem: EMT problem being initialized.
:param allow_state_equilibrium: Whether unresolved states may use f(x,z,p)=0.
:return: Reduced initialization system or None if nothing remains to solve.
"""
reduced_problem = _collect_reduced_initialization_problem(problem, allow_state_equilibrium)
if reduced_problem is None:
return None
else:
unknown_vars, residual_eqs, mask_array = reduced_problem
return ReducedInitializationSystem(
problem=problem,
unknown_vars=unknown_vars,
residual_eqs=residual_eqs,
state_unknown_mask=mask_array,
)
[docs]
class EmtPseudoTransientProblemAdapter(RmsProblemTemplate):
"""
Adapt one EMT reduced initialization system to the RMS pseudo-transient solver API.
The RMS pseudo-transient implementation expects a problem object exposing the
RMS numerical interface. This adapter provides that interface on top of the
EMT reduced initialization residual and Jacobian so the EMT initializer can
reuse the existing pseudo-transient algorithm without changing the solver.
"""
__slots__ = [
"_reduced_system",
"_emt_problem",
"_runtime_params",
"_constant_params",
"_x_full",
"_dx_full",
"_reduced_uid2idx_vars",
"uid2idx_vars",
]
def __init__(
self,
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x_full: np.ndarray,
dx_full: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
) -> None:
"""
Build one RMS-compatible pseudo-transient view over an EMT reduced system.
:param reduced_system: EMT reduced initialization system.
:type reduced_system: ReducedInitializationSystem
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param x_full: Full EMT variable vector used as the backing storage.
:type x_full: np.ndarray
:param dx_full: Full EMT differential vector used during residual evaluation.
:type dx_full: np.ndarray
:param runtime_params: Runtime-parameter snapshot for initialization.
:type runtime_params: np.ndarray
:param constant_params: Constant-parameter snapshot for initialization.
:type constant_params: np.ndarray
:return: None.
:rtype: None
"""
super().__init__(progress_signal=None, progress_text=None)
self._reduced_system: ReducedInitializationSystem = reduced_system
self._emt_problem: EmtProblemTemplate = problem
self._runtime_params: np.ndarray = runtime_params
self._constant_params: np.ndarray = constant_params
self._x_full: np.ndarray = x_full
self._dx_full: np.ndarray = dx_full
self._reduced_uid2idx_vars: Dict[int, int] = dict()
self.uid2idx_vars: Dict[int, int] = dict()
reduced_index: int = 0
for reduced_var in self._reduced_system.unknown_vars:
self._reduced_uid2idx_vars[reduced_var.uid] = reduced_index
self.uid2idx_vars[reduced_var.uid] = reduced_index
reduced_index += 1
[docs]
def get_x0(self) -> np.ndarray:
"""
Return the reduced unknown vector used to seed pseudo-transient iterations.
:return: Reduced initialization vector.
:rtype: np.ndarray
"""
reduced_x0: np.ndarray = _extract_unknown_vector(self._emt_problem, self._reduced_system.unknown_vars, self._x_full)
return reduced_x0
[docs]
def get_all_vars_number(self) -> int:
"""
Return the number of reduced initialization unknowns.
:return: Number of reduced unknowns.
:rtype: int
"""
return len(self._reduced_system.unknown_vars)
[docs]
def get_diff_var_number(self) -> int:
"""
Return the number of differential variables used by the reduced solver.
The RMS pseudo-transient solver asks for a differential-vector size even
when the EMT reduced initialization stage does not solve an explicit EMT
differential system in local coordinates. Returning the full EMT `dx`
length preserves compatibility with the residual evaluation helpers.
:return: Differential-vector size expected by the EMT residual evaluation.
:rtype: int
"""
return int(self._dx_full.size)
[docs]
def get_algebraic_var_number(self) -> int:
"""
Return the number of reduced algebraic-like unknowns.
The reduced initialization system mixes algebraic variables with any
unresolved states. The RMS solver only needs a partition size, so the
complementary block to the unresolved-state count is returned here.
:return: Number of reduced unknowns treated as non-state variables.
:rtype: int
"""
total_unknowns: int = len(self._reduced_system.unknown_vars)
n_states: int = self.get_states_number()
return total_unknowns - n_states
[docs]
def get_states_number(self) -> int:
"""
Return the number of reduced unknowns that correspond to unresolved states.
:return: Count of reduced state unknowns.
:rtype: int
"""
n_states: int = int(np.count_nonzero(self._reduced_system.state_unknown_mask))
return n_states
[docs]
def get_algebraic_vars(self) -> List[Var]:
"""
Return the reduced unknowns treated as algebraic by the adapter partition.
:return: Reduced non-state unknown list.
:rtype: List[Var]
"""
algebraic_vars: List[Var] = list()
idx: int = 0
n_unknowns: int = len(self._reduced_system.unknown_vars)
while idx < n_unknowns:
if float(self._reduced_system.state_unknown_mask[idx]) > 0.5:
pass
else:
algebraic_vars.append(self._reduced_system.unknown_vars[idx])
idx += 1
return algebraic_vars
[docs]
def algebraic_vars(self) -> List[Var]:
"""
Return the reduced non-state unknowns.
:return: Reduced non-state unknown list.
:rtype: List[Var]
"""
return self.get_algebraic_vars()
[docs]
def state_vars(self) -> List[Var]:
"""
Return the reduced unknowns treated as states by the adapter partition.
:return: Reduced state unknown list.
:rtype: List[Var]
"""
state_vars_list: List[Var] = list()
idx: int = 0
n_unknowns: int = len(self._reduced_system.unknown_vars)
while idx < n_unknowns:
if float(self._reduced_system.state_unknown_mask[idx]) > 0.5:
state_vars_list.append(self._reduced_system.unknown_vars[idx])
else:
pass
idx += 1
return state_vars_list
[docs]
def update_variable_params(self, t: float, x_snapshot: np.ndarray | None = None) -> None:
"""
Keep the EMT runtime-parameter snapshot aligned with the RMS solver API.
The EMT reduced initializer freezes the runtime parameters before entering
the pseudo-transient solve. The RMS solver still invokes this hook at the
beginning of the loop, so the adapter accepts the call and intentionally
preserves the already prepared parameter arrays.
:param t: Pseudo-time value requested by the solver.
:type t: float
:param x_snapshot: Optional reduced state snapshot.
:type x_snapshot: np.ndarray | None
:return: None.
:rtype: None
"""
if x_snapshot is None:
t = float(t)
else:
t = float(t)
x_snapshot = np.array(x_snapshot, dtype=np.float64, copy=False)
[docs]
def get_dx(self, x: np.ndarray, xn: np.ndarray, dx: np.ndarray, h: float) -> np.ndarray:
"""
Return the EMT differential vector used by the reduced residual helpers.
The RMS pseudo-transient code requests a derivative estimate for its DAE
residual construction. The EMT reduced initialization residual already
carries all required differential information through the full EMT `dx`
buffer, so the adapter returns a copy of that frozen vector.
:param x: Current reduced iterate.
:type x: np.ndarray
:param xn: Previous reduced iterate.
:type xn: np.ndarray
:param dx: Previous differential estimate.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: Differential vector used by EMT residual evaluation.
:rtype: np.ndarray
"""
x = np.array(x, dtype=np.float64, copy=False)
xn = np.array(xn, dtype=np.float64, copy=False)
dx = np.array(dx, dtype=np.float64, copy=False)
h = float(h)
return self._dx_full.copy()
[docs]
def rhs_state(self, x: np.ndarray, dx: np.ndarray) -> np.ndarray:
"""
Return the reduced residual entries associated with unresolved states.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:return: Reduced residual state block.
:rtype: np.ndarray
"""
full_residual: np.ndarray = self._evaluate_full_reduced_residual(x, dx)
n_states: int = self.get_states_number()
if n_states > 0:
return full_residual[:n_states]
else:
return np.zeros(0, dtype=np.float64)
[docs]
def rhs_algebraic(self, values: np.ndarray, diff_values: np.ndarray) -> np.ndarray:
"""
Return the reduced residual entries associated with non-state unknowns.
:param values: Current reduced iterate.
:type values: np.ndarray
:param diff_values: Differential vector requested by the solver.
:type diff_values: np.ndarray
:return: Reduced residual algebraic block.
:rtype: np.ndarray
"""
full_residual: np.ndarray = self._evaluate_full_reduced_residual(values, diff_values)
n_states: int = self.get_states_number()
return full_residual[n_states:]
[docs]
def get_j11(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
"""
Return the reduced Jacobian state-state block.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: State-state Jacobian block.
:rtype: sp.csc_matrix
"""
jacobian: sp.csc_matrix = self._compute_numerical_jacobian(x, dx, h)
n_states: int = self.get_states_number()
if n_states > 0:
return jacobian[:n_states, :n_states].tocsc()
else:
return sp.csc_matrix((0, 0), dtype=np.float64)
[docs]
def get_j12(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
"""
Return the reduced Jacobian state-algebraic block.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: State-algebraic Jacobian block.
:rtype: sp.csc_matrix
"""
jacobian: sp.csc_matrix = self._compute_numerical_jacobian(x, dx, h)
n_states: int = self.get_states_number()
return jacobian[:n_states, n_states:].tocsc()
[docs]
def get_j21(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
"""
Return the reduced Jacobian algebraic-state block.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: Algebraic-state Jacobian block.
:rtype: sp.csc_matrix
"""
jacobian: sp.csc_matrix = self._compute_numerical_jacobian(x, dx, h)
n_states: int = self.get_states_number()
return jacobian[n_states:, :n_states].tocsc()
[docs]
def get_j22(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
"""
Return the reduced Jacobian algebraic-algebraic block.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: Algebraic-algebraic Jacobian block.
:rtype: sp.csc_matrix
"""
jacobian: sp.csc_matrix = self._compute_numerical_jacobian(x, dx, h)
n_states: int = self.get_states_number()
return jacobian[n_states:, n_states:].tocsc()
def _compute_numerical_jacobian(self, x: np.ndarray, dx: np.ndarray, h: float) -> sp.csc_matrix:
"""
Return the EMT reduced Jacobian in the matrix form expected by RMS.
The EMT reduced system already exposes a compiled Jacobian evaluator over
the reduced unknown ordering. The adapter scatters the reduced iterate to
the backing EMT vectors and forwards the call directly.
:param x: Current reduced iterate.
:type x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:param h: Pseudo-time step size.
:type h: float
:return: Reduced Jacobian matrix.
:rtype: sp.csc_matrix
"""
h = float(h)
full_dx: np.ndarray = self._select_full_dx(dx)
_scatter_unknown_vector(self._emt_problem, self._reduced_system.unknown_vars, self._x_full, x)
jacobian: sp.csc_matrix = _evaluate_initialization_jacobian(
reduced_system=self._reduced_system,
problem=self._emt_problem,
x=self._x_full,
dx=full_dx,
runtime_params=self._runtime_params,
constant_params=self._constant_params,
)
return jacobian.tocsc(copy=True)
def _evaluate_full_reduced_residual(self, reduced_x: np.ndarray, dx: np.ndarray) -> np.ndarray:
"""
Evaluate the full EMT reduced residual at one reduced iterate.
:param reduced_x: Current reduced iterate.
:type reduced_x: np.ndarray
:param dx: Differential vector requested by the solver.
:type dx: np.ndarray
:return: Reduced residual vector.
:rtype: np.ndarray
"""
full_dx: np.ndarray = self._select_full_dx(dx)
_scatter_unknown_vector(self._emt_problem, self._reduced_system.unknown_vars, self._x_full, reduced_x)
residual: np.ndarray = _evaluate_initialization_residual(
reduced_system=self._reduced_system,
problem=self._emt_problem,
x=self._x_full,
dx=full_dx,
runtime_params=self._runtime_params,
constant_params=self._constant_params,
)
return residual.copy()
def _select_full_dx(self, dx: np.ndarray) -> np.ndarray:
"""
Resolve the full EMT differential vector used for residual evaluation.
:param dx: Differential vector received from the RMS solver.
:type dx: np.ndarray
:return: Full EMT differential vector.
:rtype: np.ndarray
"""
if dx.size == self._dx_full.size:
return np.array(dx, dtype=np.float64, copy=True)
else:
return self._dx_full.copy()
def _collect_reduced_initialization_problem(
problem: EmtProblemTemplate,
allow_state_equilibrium: bool,
) -> Tuple[List[Var], List[Expr], np.ndarray] | None:
"""
Collect the reduced initialization unknowns and residual equations without compiling them.
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param allow_state_equilibrium: Whether unresolved states may use ``f(x,z,p)=0``.
:type allow_state_equilibrium: bool
:return: Tuple ``(unknown_vars, residual_eqs, state_unknown_mask)`` or ``None``.
:rtype: Tuple[List[Var], List[Expr], np.ndarray] | None
"""
state_unknown_vars: List[Var] = list()
state_residual_eqs: List[Expr] = list()
algebraic_unknown_vars: List[Var] = list()
algebraic_residual_eqs: List[Expr] = list()
state_eq_lookup: Dict[int, Expr] = _build_state_equation_lookup(problem)
init_eq_lookup: Dict[Var, Expr] = problem.sys_block.init_eqs
algebraic_vars: List[Var] = problem.get_algebraic_vars()
algebraic_eqs: List[Expr] = problem.get_algebraic_eqs()
alg_idx: int = 0
while alg_idx < len(algebraic_vars):
algebraic_var: Var = algebraic_vars[alg_idx]
algebraic_unknown_vars.append(algebraic_var)
algebraic_residual_eqs.append(algebraic_eqs[alg_idx])
alg_idx += 1
for state_var in problem.get_state_vars():
if state_var.uid in problem.init_guess:
state_var = state_var
else:
state_unknown_vars.append(state_var)
if state_var in init_eq_lookup:
state_residual_eqs.append(state_var - init_eq_lookup[state_var])
elif allow_state_equilibrium and state_var.uid in state_eq_lookup:
state_residual_eqs.append(state_eq_lookup[state_var.uid])
else:
state_residual_eqs.append(state_var)
unknown_vars: List[Var] = state_unknown_vars + algebraic_unknown_vars
residual_eqs: List[Expr] = state_residual_eqs + algebraic_residual_eqs
state_unknown_flags: List[float] = [1.0] * len(state_unknown_vars) + [0.0] * len(algebraic_unknown_vars)
if len(unknown_vars) == 0:
return None
else:
return unknown_vars, residual_eqs, np.asarray(state_unknown_flags, dtype=np.float64)
def _evaluate_initialization_residual(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
) -> np.ndarray:
"""
Evaluate the reduced initialization residual.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:return: Reduced residual vector.
"""
_unused = problem
return reduced_system.residual_fn(x, dx, runtime_params, constant_params).copy()
def _evaluate_initialization_jacobian(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
) -> sp.csc_matrix:
"""
Evaluate the reduced initialization Jacobian.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:return: Reduced sparse Jacobian.
"""
_unused = problem
return reduced_system.jacobian_fn(x, dx, runtime_params, constant_params, 1.0).tocsc(copy=True)
def _evaluate_reduced_pseudo_transient_residual(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
x_prev_reduced: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
dtau: float,
) -> np.ndarray:
"""
Evaluate the reduced EMT pseudo-transient residual.
State rows use a backward-Euler pseudo-time term while algebraic rows keep the
original reduced steady-state residual.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector at the current reduced iterate.
:param x_prev_reduced: Previous reduced iterate used as the pseudo-time reference.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:param dtau: Pseudo-time step.
:return: Reduced pseudo-transient residual vector.
"""
# Start from the steady-state reduced residual because the EMT initialization
# problem is defined in those equilibrium equations already.
residual: np.ndarray = _evaluate_initialization_residual(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
)
# Recover the current reduced iterate so the state rows can add the
# backward-Euler pseudo-time difference against the previous iterate.
reduced_x: np.ndarray = _extract_unknown_vector(problem, reduced_system.unknown_vars, x)
state_mask: np.ndarray = np.asarray(reduced_system.state_unknown_mask, dtype=np.float64)
dtau_eff: float = float(max(dtau, 1.0e-30))
residual = residual.copy()
# Only unresolved state rows receive the pseudo-time term. Algebraic rows stay
# unchanged so they continue to enforce the original consistency equations.
residual += state_mask * ((reduced_x - x_prev_reduced) / dtau_eff)
return residual
def _evaluate_reduced_pseudo_transient_jacobian(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
dtau: float,
) -> sp.csc_matrix:
"""
Evaluate the Jacobian of the reduced EMT pseudo-transient residual.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector at the current reduced iterate.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:param dtau: Pseudo-time step.
:return: Reduced pseudo-transient Jacobian matrix in CSC format.
"""
# Reuse the steady-state Jacobian because it already differentiates the reduced
# EMT equilibrium equations with respect to the reduced unknown ordering.
jacobian: sp.csc_matrix = _evaluate_initialization_jacobian(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
)
state_mask: np.ndarray = np.asarray(reduced_system.state_unknown_mask, dtype=np.float64)
dtau_eff: float = float(max(dtau, 1.0e-30))
# The backward-Euler pseudo-time term contributes only on state rows, so the
# Jacobian adds a diagonal 1/dtau term only for reduced state unknowns.
return (jacobian + sp.diags(state_mask / dtau_eff, format="csc")).tocsc(copy=True)
def _max_abs_value(values: np.ndarray) -> float:
"""
Return the infinity norm of one dense vector.
:param values: Dense vector.
:return: Maximum absolute value.
"""
if values.size == 0:
return 0.0
else:
return float(np.max(np.abs(values)))
def _solve_reduced_linear_system(
matrix: sp.csc_matrix,
rhs: np.ndarray,
dense_threshold: int,
) -> np.ndarray:
"""
Solve one reduced linear system using a dense or sparse backend depending on size.
:param matrix: Reduced Jacobian-like matrix.
:param rhs: Right-hand-side vector.
:param dense_threshold: Dense/sparse switching threshold.
:return: Linear-system solution vector.
"""
n_unknown: int = int(matrix.shape[0])
if n_unknown <= dense_threshold:
dense_matrix: np.ndarray = matrix.toarray()
try:
return np.linalg.solve(dense_matrix, rhs)
except np.linalg.LinAlgError:
solution, _, _, _ = np.linalg.lstsq(dense_matrix, rhs, rcond=None)
return solution
else:
try:
with warnings.catch_warnings():
warnings.simplefilter("error")
return spla.spsolve(matrix, rhs)
except Exception:
lsqr_solution = spla.lsqr(matrix, rhs)
return np.asarray(lsqr_solution[0], dtype=np.float64)
def _run_reduced_newton_initialization(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
options: EmtOptions,
report: EmtInitializationReport,
) -> bool:
"""
Solve the reduced initialization system with damped sparse Newton iterations.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector updated in place.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:param options: EMT simulation options.
:param report: Initialization report updated in place.
:return: True if Newton converged.
"""
reduced_x: np.ndarray = _extract_unknown_vector(problem, reduced_system.unknown_vars, x)
max_iter: int = int(options.init_newton_max_iter)
tol: float = float(options.init_newton_tol)
iter_idx: int = 0
while iter_idx < max_iter:
_scatter_unknown_vector(problem, reduced_system.unknown_vars, x, reduced_x)
residual: np.ndarray = _evaluate_initialization_residual(reduced_system, problem, x, dx, runtime_params, constant_params)
res_norm: float = _max_abs_value(residual)
# report
if iter_idx == 0:
report.initial_residual_inf = res_norm
else:
report.initial_residual_inf = report.initial_residual_inf
report.final_residual_inf = res_norm
if res_norm <= tol:
report.newton_iterations = iter_idx
return True
else:
report.final_residual_inf = report.final_residual_inf
jacobian: sp.csc_matrix = _evaluate_initialization_jacobian(reduced_system, problem, x, dx, runtime_params, constant_params)
delta: np.ndarray = _solve_reduced_linear_system(jacobian, -residual, int(options.init_dense_threshold))
if np.all(np.isfinite(delta)):
delta = delta
else:
return False
alpha: float = 1.0
accepted: bool = False
while alpha >= 1.0e-4:
trial_reduced_x: np.ndarray = reduced_x + alpha * delta
_scatter_unknown_vector(problem, reduced_system.unknown_vars, x, trial_reduced_x)
trial_residual: np.ndarray = _evaluate_initialization_residual(reduced_system, problem, x, dx, runtime_params, constant_params)
trial_norm: float = _max_abs_value(trial_residual)
if (not options.init_newton_backtracking) or (trial_norm < res_norm):
reduced_x = trial_reduced_x
accepted = True
break
else:
alpha *= 0.5
if accepted:
reduced_x = reduced_x
else:
return False
iter_idx += 1
report.newton_iterations = max_iter
return False
def _run_reduced_pseudo_transient_initialization(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
options: EmtOptions,
report: EmtInitializationReport,
) -> bool:
"""
Solve the reduced initialization system with pseudo-transient iterations.
:param reduced_system: Reduced initialization system.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector updated in place.
:param dx: Full EMT differential vector.
:param runtime_params: Runtime-parameter vector.
:param constant_params: Constant-parameter vector.
:param options: EMT simulation options.
:param report: Initialization report updated in place.
:return: True if pseudo-transient converged.
"""
# The reduced vector stores only unresolved EMT unknowns. The pseudo-transient
# loop advances those unknowns while the full EMT storage remains authoritative.
reduced_x: np.ndarray = _extract_unknown_vector(problem, reduced_system.unknown_vars, x)
dtau: float = float(options.init_ptc_dtau0)
dtau_min: float = float(options.init_ptc_dtau_min)
dtau_max: float = float(options.init_ptc_dtau_max)
max_iter: int = int(options.init_ptc_max_iter)
tol: float = float(options.init_newton_tol)
# The previous reduced iterate supplies the pseudo-time reference for state rows.
x_prev_reduced: np.ndarray = reduced_x.copy()
iter_idx: int = 0
while iter_idx < max_iter:
# Scatter first so all residual and Jacobian evaluations see one consistent
# full EMT state assembled from the current reduced iterate.
_scatter_unknown_vector(problem, reduced_system.unknown_vars, x, reduced_x)
residual: np.ndarray = _evaluate_reduced_pseudo_transient_residual(
reduced_system=reduced_system,
problem=problem,
x=x,
x_prev_reduced=x_prev_reduced,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
dtau=dtau,
)
res_norm: float = _max_abs_value(residual)
report.final_residual_inf = res_norm
if res_norm <= tol:
report.pseudo_transient_steps = iter_idx
return True
else:
report.final_residual_inf = report.final_residual_inf
# Build the Jacobian of the pseudo-transient residual, not just the original
# steady-state residual, so the Newton-like correction matches the algorithm.
linear_matrix: sp.csc_matrix = _evaluate_reduced_pseudo_transient_jacobian(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
dtau=max(dtau, dtau_min),
)
delta: np.ndarray = _solve_reduced_linear_system(linear_matrix, -residual, int(options.init_dense_threshold))
if np.all(np.isfinite(delta)):
# Form one trial iterate and accept it only when the pseudo-transient
# residual decreases. This keeps the continuation monotone enough for
# the small initialization problems solved here.
trial_reduced_x: np.ndarray = reduced_x + delta
_scatter_unknown_vector(problem, reduced_system.unknown_vars, x, trial_reduced_x)
trial_residual: np.ndarray = _evaluate_reduced_pseudo_transient_residual(
reduced_system=reduced_system,
problem=problem,
x=x,
x_prev_reduced=x_prev_reduced,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
dtau=max(dtau, dtau_min),
)
trial_norm: float = _max_abs_value(trial_residual)
if trial_norm < res_norm:
# Once a step is accepted, the old iterate becomes the new pseudo-time
# reference and dtau is expanded to accelerate convergence.
x_prev_reduced = reduced_x.copy()
reduced_x = trial_reduced_x
dtau = min(dtau_max, dtau * 2.0)
else:
# Rejected steps shrink dtau so the next pseudo-time correction is
# more damped and therefore more likely to be accepted.
dtau = max(dtau_min, dtau * 0.5)
else:
# Non-finite linear corrections indicate an unusable local model, so the
# next attempt must reduce dtau instead of updating the iterate.
dtau = max(dtau_min, dtau * 0.5)
iter_idx += 1
report.pseudo_transient_steps = max_iter
return False
def _run_pseudo_transient_initialization(
reduced_system: ReducedInitializationSystem,
problem: EmtProblemTemplate,
x: np.ndarray,
dx: np.ndarray,
runtime_params: np.ndarray,
constant_params: np.ndarray,
options: EmtOptions,
report: EmtInitializationReport,
) -> bool:
"""
Solve the EMT reduced initialization system using the RMS pseudo-transient solver.
This function is intentionally shaped like
``_run_reduced_pseudo_transient_initialization()`` so the EMT initialization
workflow can switch between both pseudo-transient implementations by only
changing the callee in the native initialization dispatcher.
The adapter created here maps the EMT reduced residual and Jacobian to the
problem interface expected by the RMS ``PseudoTransient`` solver. The solver
operates on the reduced EMT unknown vector, and the accepted solution is then
scattered back into the full EMT state vector before returning.
:param reduced_system: Reduced initialization system.
:type reduced_system: ReducedInitializationSystem
:param problem: EMT problem being initialized.
:type problem: EmtProblemTemplate
:param x: Full EMT variable vector updated in place.
:type x: np.ndarray
:param dx: Full EMT differential vector.
:type dx: np.ndarray
:param runtime_params: Runtime-parameter vector.
:type runtime_params: np.ndarray
:param constant_params: Constant-parameter vector.
:type constant_params: np.ndarray
:param options: EMT simulation options.
:type options: EmtOptions
:param report: Initialization report updated in place.
:type report: EmtInitializationReport
:return: ``True`` when the RMS pseudo-transient solver reaches the EMT tolerance.
:rtype: bool
"""
adapter_problem: EmtPseudoTransientProblemAdapter = EmtPseudoTransientProblemAdapter(
reduced_system=reduced_system,
problem=problem,
x_full=x,
dx_full=dx,
runtime_params=runtime_params,
constant_params=constant_params,
)
# The RMS pseudo-transient solver works on its own reduced local vector. The
# EMT x-vector therefore remains the authoritative storage and is updated only
# once a candidate solution has been produced.
reduced_x0: np.ndarray = adapter_problem.get_x0()
solver: PseudoTransient = PseudoTransient(
problem=adapter_problem,
h=1.0,
dtau0=float(options.init_ptc_dtau0),
dtau_max=float(options.init_ptc_dtau_max),
dtau_min=float(options.init_ptc_dtau_min),
tol=float(options.init_newton_tol),
max_iter=int(options.init_ptc_max_iter),
verbose=bool(options.verbose > 0),
reference_error_tol=-1.0,
fixed_var_uids=list(),
)
reduced_solution: np.ndarray
reduced_solution: np.ndarray
init_guess_map: Dict[Var, float]
reduced_solution, init_guess_map = solver.simulate(plot=False, x0=reduced_x0)
if len(init_guess_map) > 0:
init_guess_map = init_guess_map
else:
init_guess_map = init_guess_map
# The RMS solver returns the reduced unknown vector. That vector must be
# scattered back to the full EMT variable storage before the residual is
# evaluated under EMT convergence rules.
_scatter_unknown_vector(problem, reduced_system.unknown_vars, x, reduced_solution)
final_residual: np.ndarray = _evaluate_initialization_residual(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
)
final_residual_inf: float = _max_abs_value(final_residual)
report.final_residual_inf = final_residual_inf
if np.isfinite(final_residual_inf):
report.pseudo_transient_steps = int(options.init_ptc_max_iter)
else:
report.pseudo_transient_steps = int(options.init_ptc_max_iter)
if final_residual_inf <= float(options.init_newton_tol):
return True
else:
return False
def _apply_solution_to_problem(problem: EmtProblemTemplate, x: np.ndarray) -> None:
"""
Persist the reduced-solve state vector back into the initialization guess map.
:param problem: EMT problem being initialized.
:param x: Full EMT variable vector.
:return: None
"""
all_vars: List[Var] = list(problem.get_state_vars()) + list(problem.get_algebraic_vars())
for var in all_vars:
problem.init_guess[var.uid] = float(x[problem.uid2idx_vars[var.uid]])
def _compute_missing_dx0(problem: EmtProblemTemplate,
report: EmtInitializationReport,
x_full: np.ndarray | None = None,
dx_full: np.ndarray | None = None,
runtime_params: np.ndarray | None = None,
constant_params: np.ndarray | None = None,
include_existing: bool = False) -> None:
"""
Compute missing differential initial values from the state equations.
:param problem: EMT problem being initialized.
:param report: Initialization report updated in place.
:return: None
"""
state_eqs: List[Expr] = problem.get_state_eqs()
if len(state_eqs) == 0:
return
else:
state_eqs = state_eqs
phase_t0: float = time.perf_counter()
missing_diff_vars: List[Var]
missing_state_eqs: List[Expr]
missing_diff_vars, missing_state_eqs = _collect_missing_dx_problem(problem, include_existing=include_existing)
report.missing_dx_problem_collect_s = float(time.perf_counter() - phase_t0)
if len(missing_diff_vars) == 0:
return
else:
pass
phase_t0 = time.perf_counter()
if x_full is None:
x0: np.ndarray = problem.get_x0()
else:
x0 = x_full
report.x0_rebuild_s = float(time.perf_counter() - phase_t0)
phase_t0 = time.perf_counter()
if dx_full is None:
dx0_seed: np.ndarray = problem.get_dx0()
else:
dx0_seed = dx_full
report.dx0_seed_build_s = float(time.perf_counter() - phase_t0)
if runtime_params is None or len(runtime_params) != problem.get_variable_parameter_number():
runtime_params = _build_runtime_param_array(problem)
else:
runtime_params = runtime_params
if constant_params is None or len(constant_params) != len(problem.get_parameters_values()):
constant_params = _build_constant_param_array(problem)
else:
constant_params = constant_params
cache_key: str = _build_state_rhs_cache_key(problem, missing_state_eqs)
state_rhs_fn = _load_or_build_state_rhs_vector(problem, missing_state_eqs, cache_key, report)
phase_t0 = time.perf_counter()
computed_dx: np.ndarray = state_rhs_fn(x0, dx0_seed, runtime_params, constant_params).copy()
report.state_rhs_eval_s = float(time.perf_counter() - phase_t0)
idx: int = 0
phase_t0 = time.perf_counter()
while idx < len(missing_diff_vars):
diff_var: Var = missing_diff_vars[idx]
problem.diff_init_guess[diff_var.uid] = float(computed_dx[idx])
report.automatic_dx0_count += 1
idx += 1
report.missing_dx_scatter_s = float(time.perf_counter() - phase_t0)
def _build_initialization_system_if_needed(
problem: EmtProblemTemplate,
options: EmtOptions,
report: EmtInitializationReport,
) -> ReducedInitializationSystem | None:
"""
Build the reduced initialization system only when unresolved work remains.
:param problem: EMT problem being initialized.
:param options: EMT simulation options.
:param report: Initialization report updated in place.
:return: Reduced initialization system or None.
"""
reduced_system: ReducedInitializationSystem | None = _build_reduced_initialization_system(
problem=problem,
allow_state_equilibrium=bool(options.init_allow_state_equilibrium),
)
if reduced_system is None:
report.unknown_var_count = 0
report.unresolved_state_count = 0
else:
report.unknown_var_count = len(reduced_system.unknown_vars)
report.unresolved_state_count = int(np.count_nonzero(reduced_system.state_unknown_mask))
return reduced_system
[docs]
def run_emt_native_initialization(problem: EmtProblemTemplate, options: EmtOptions) -> EmtInitializationReport:
"""
Run the EMT-native initialization stages after power-flow variable projection.
The workflow keeps the current seed intact, then solves only the unresolved
subset with the native initialization algorithm requested by
``options.initialization_method``. In the main EMT build workflow, the seed
comes either from power-flow projection alone for pure ``PseudoTransient`` or
from power-flow projection plus explicit initialization for the other modes.
Missing differential initial values are computed automatically from the state
equations.
:param problem: EMT problem being initialized.
:param options: EMT simulation options.
:return: Initialization report.
"""
report = EmtInitializationReport(method_requested=options.initialization_method)
start_time: float = time.perf_counter()
x: np.ndarray = problem.get_x0()
x_seed: np.ndarray = x.copy()
dx: np.ndarray = problem.get_dx0()
method_requested: EmtInitializationMethod = options.initialization_method
runtime_params: np.ndarray = np.zeros(0, dtype=np.float64)
constant_params: np.ndarray = np.zeros(0, dtype=np.float64)
reduced_cache_key: str | None = None
if method_requested == EmtInitializationMethod.Explicit:
report.status = InitializationStatus.RESOLVED
report.method_used = EmtInitializationMethod.Explicit
else:
reduced_system: ReducedInitializationSystem | None
runtime_params: np.ndarray
constant_params: np.ndarray
reduced_problem_payload: Tuple[List[Var], List[Expr], np.ndarray] | None
try:
phase_t0: float = time.perf_counter()
reduced_problem_payload = _collect_reduced_initialization_problem(
problem=problem,
allow_state_equilibrium=bool(options.init_allow_state_equilibrium),
)
if reduced_problem_payload is None:
reduced_system = None
report.unknown_var_count = 0
report.unresolved_state_count = 0
else:
report.unknown_var_count = len(reduced_problem_payload[0])
report.unresolved_state_count = int(np.count_nonzero(reduced_problem_payload[2]))
reduced_system = None
report.reduced_system_build_s = float(time.perf_counter() - phase_t0)
phase_t0 = time.perf_counter()
runtime_params = _build_runtime_param_array(problem)
report.runtime_param_build_s = float(time.perf_counter() - phase_t0)
phase_t0 = time.perf_counter()
constant_params = _build_constant_param_array(problem)
report.constant_param_build_s = float(time.perf_counter() - phase_t0)
if reduced_problem_payload is None:
reduced_cache_key = None
else:
reduced_cache_key = _build_persistent_initialization_cache_key(
problem=problem,
unknown_vars=reduced_problem_payload[0],
residual_eqs=reduced_problem_payload[1],
state_unknown_mask=reduced_problem_payload[2],
options=options,
)
phase_t0 = time.perf_counter()
cached_payload: Dict[str, object] | None = _load_persistent_initialization_solution(reduced_cache_key)
report.persistent_cache_load_s = float(time.perf_counter() - phase_t0)
if cached_payload is None:
reduced_system = ReducedInitializationSystem(
problem=problem,
unknown_vars=reduced_problem_payload[0],
residual_eqs=reduced_problem_payload[1],
state_unknown_mask=reduced_problem_payload[2],
)
report.reduced_system_build_s += 0.0
else:
if "init_guess_by_name" in cached_payload and "diff_init_guess_by_name" in cached_payload and _persistent_initialization_params_match(cached_payload, runtime_params, constant_params):
problem.init_guess = _restore_problem_guess_map(problem, dict(cached_payload["init_guess_by_name"]), False)
problem.diff_init_guess = dict()
_compute_missing_dx0(
problem,
report,
runtime_params=runtime_params,
constant_params=constant_params,
include_existing=True,
)
_restore_initialization_report_from_payload(report, dict(cached_payload["report"]))
report.persistent_cache_hit = True
report.persistent_cache_load_s = float(report.persistent_cache_load_s)
report.elapsed_s = float(time.perf_counter() - start_time)
return report
else:
reduced_system = ReducedInitializationSystem(
problem=problem,
unknown_vars=reduced_problem_payload[0],
residual_eqs=reduced_problem_payload[1],
state_unknown_mask=reduced_problem_payload[2],
)
except (FloatingPointError, ZeroDivisionError, ValueError, RuntimeError):
reduced_system = None
runtime_params = np.zeros(0, dtype=np.float64)
constant_params = np.zeros(0, dtype=np.float64)
reduced_cache_key = None
report.status = InitializationStatus.FAILED
report.method_used = EmtInitializationMethod.Explicit
report.message = "Reduced initialization system construction failed; keeping explicit initialization."
if reduced_system is None:
if report.status == InitializationStatus.FAILED:
x[:] = x_seed
else:
report.status = InitializationStatus.RESOLVED
report.method_used = EmtInitializationMethod.Explicit
else:
try:
phase_t0 = time.perf_counter()
residual0: np.ndarray = _evaluate_initialization_residual(reduced_system, problem, x, dx, runtime_params, constant_params)
report.initial_residual_eval_s = float(time.perf_counter() - phase_t0)
report.initial_residual_inf = _max_abs_value(residual0)
except (FloatingPointError, ZeroDivisionError, ValueError, RuntimeError):
x[:] = x_seed
report.status = InitializationStatus.FAILED
report.method_used = EmtInitializationMethod.Explicit
report.message = "Reduced initialization residual evaluation failed; keeping explicit initialization."
reduced_system = None
if reduced_system is None:
report.final_residual_inf = report.initial_residual_inf
else:
use_newton: bool = method_requested in {EmtInitializationMethod.Auto, EmtInitializationMethod.ConsistentNewton}
use_ptc: bool = method_requested in {EmtInitializationMethod.Auto, EmtInitializationMethod.PseudoTransient}
converged: bool = False
if use_newton:
phase_t0 = time.perf_counter()
converged = _run_reduced_newton_initialization(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
options=options,
report=report,
)
report.newton_elapsed_s += float(time.perf_counter() - phase_t0)
if converged:
report.method_used = EmtInitializationMethod.ConsistentNewton
else:
converged = converged
else:
converged = False
if converged:
report.status = InitializationStatus.RESOLVED
elif use_ptc:
phase_t0 = time.perf_counter()
converged = _run_reduced_pseudo_transient_initialization(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
options=options,
report=report,
)
# converged = _run_pseudo_transient_initialization(
# reduced_system=reduced_system,
# problem=problem,
# x=x,
# dx=dx,
# runtime_params=runtime_params,
# constant_params=constant_params,
# options=options,
# report=report,
# )
report.pseudo_transient_elapsed_s += float(time.perf_counter() - phase_t0)
if converged:
converged = converged
else:
# Pure PseudoTransient mode must remain pseudo-transient only.
# The broader initialization policy requested by the user is
# interpreted here, not in the reduced solver itself. When the
# method is explicitly PseudoTransient, failing the PTC loop must
# keep the explicit seed instead of retrying with Newton. Auto
# and ConsistentNewton continue to own the Newton-capable paths.
if method_requested == EmtInitializationMethod.PseudoTransient:
converged = False
else:
phase_t0 = time.perf_counter()
converged = _run_reduced_newton_initialization(
reduced_system=reduced_system,
problem=problem,
x=x,
dx=dx,
runtime_params=runtime_params,
constant_params=constant_params,
options=options,
report=report,
)
report.newton_elapsed_s += float(time.perf_counter() - phase_t0)
if converged:
report.status = InitializationStatus.RESOLVED
if method_requested == EmtInitializationMethod.PseudoTransient:
report.method_used = EmtInitializationMethod.PseudoTransient
elif method_requested == EmtInitializationMethod.Auto:
report.method_used = EmtInitializationMethod.PseudoTransient
else:
report.method_used = EmtInitializationMethod.ConsistentNewton
else:
x[:] = x_seed
report.status = InitializationStatus.FAILED
if method_requested == EmtInitializationMethod.PseudoTransient:
report.method_used = EmtInitializationMethod.PseudoTransient
report.message = "Pseudo-transient initialization failed."
else:
report.method_used = EmtInitializationMethod.Explicit
report.message = "Consistent initialization failed after pseudo-transient fallback; keeping explicit initialization."
else:
x[:] = x_seed
report.status = InitializationStatus.FAILED
report.method_used = EmtInitializationMethod.Explicit
report.message = "Consistent initialization failed and pseudo-transient fallback was disabled."
phase_t0 = time.perf_counter()
_apply_solution_to_problem(problem, x)
solution_apply_s: float = float(time.perf_counter() - phase_t0)
report.solution_apply_s = solution_apply_s
phase_t0 = time.perf_counter()
_compute_missing_dx0(
problem,
report,
x_full=x,
dx_full=dx,
runtime_params=runtime_params,
constant_params=constant_params,
include_existing=True,
)
report.dx0_completion_s = float(time.perf_counter() - phase_t0)
report.elapsed_s = float(time.perf_counter() - start_time)
report.dx0_completion_s += solution_apply_s
if report.status == InitializationStatus.RESOLVED and reduced_cache_key is not None:
phase_t0 = time.perf_counter()
_store_persistent_initialization_solution(
reduced_cache_key,
dict(
init_guess_by_name=_serialize_problem_guess_map(problem, dict(problem.init_guess), False),
diff_init_guess_by_name=_serialize_problem_guess_map(problem, dict(problem.diff_init_guess), True),
runtime_params_snapshot=runtime_params.tolist(),
constant_params_snapshot=constant_params.tolist(),
report=_serialize_initialization_report(report),
),
)
report.persistent_cache_store_s = float(time.perf_counter() - phase_t0)
else:
report.persistent_cache_store_s = 0.0
return report