Source code for VeraGridEngine.Simulations.EMT.solvers.structural_compiled_solver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

from enum import Enum
from typing import Any, Callable, Dict, List, Set, Tuple, Optional
import hashlib
import time

import numba as nb
from numba import types
import numpy as np
import numpy.typing as npt
from scipy.linalg import lu_factor, lu_solve
from scipy.sparse import csc_matrix
from scipy.sparse.linalg import SuperLU, splu

from VeraGridEngine.Utils.Symbolic.diagnostic import (
    NewtonDiagnosticsConfig,
    NewtonSolveContext,
    dense_lstsq_fallback,
    maybe_apply_backtracking,
    maybe_check_index1,
    sparse_lsqr_fallback,
    with_newton_diagnostics,
)
from VeraGridEngine.Utils.Symbolic.symbolic import BinOp, Const, Expr, Func, UnOp, Var
from VeraGridEngine.Utils.Symbolic.jit_compiler import EagerEquationCompiler, _compile_to_file
from VeraGridEngine.Utils.NumericalMethods.emt_sparse_superlu_backend import SuperLUSparseBackendProvider
from VeraGridEngine.Utils.NumericalMethods.external_sparse_solver_interface import (
    SparseLinearFactorizationHandle,
    SparseLinearSolverBackend,
    SparseLinearSolverBackendProvider,
)
from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import (
    EmtBoundaryUpdateProtocol,
    EmtProblemTemplate,
    get_solver_forced_event_time,
    resolve_solver_boundary_updater,
)
from VeraGridEngine.enumerations import DynamicIntegrationMethod
from VeraGridEngine.basic_structures import Mat, Vec


Float64Vector = npt.NDArray[np.float64]
Float64Matrix = npt.NDArray[np.float64]
Int32Vector = npt.NDArray[np.int32]
Int32Matrix = npt.NDArray[np.int32]
DenseFactorization = Tuple[Any, Any]
DenseSolveBundle = Tuple[DenseFactorization, np.ndarray]
SparseSolveBundle = Tuple[SuperLU, csc_matrix]


[docs] class StructuralCompiledWarmupPolicy(Enum): """ Warmup policy used by the structural compiled backend. """ Off = "off" Adaptive = "adaptive" Full = "full"
[docs] class CompiledNumbaKernelCache: """ In-process cache of Numba-compiled eager kernels. """ __slots__ = ["_cache"] def __init__(self) -> None: """ Build the compiled-kernel cache. :return: None. :rtype: None """ self._cache: Dict[str, Callable[..., Any]] = dict()
[docs] def get(self, cache_key: str) -> Callable[..., Any] | None: """ Return one cached compiled kernel. :param cache_key: Deterministic cache key. :type cache_key: str :return: Cached compiled kernel or ``None``. :rtype: Callable[..., Any] | None """ return self._cache.get(cache_key, None)
[docs] def set(self, cache_key: str, kernel: Callable[..., Any]) -> None: """ Store one compiled kernel. :param cache_key: Deterministic cache key. :type cache_key: str :param kernel: Compiled kernel. :type kernel: Callable[..., Any] :return: None. :rtype: None """ self._cache[cache_key] = kernel
COMPILED_NUMBA_KERNEL_CACHE = CompiledNumbaKernelCache() def _safe_njit( py_func: Callable[..., Any], signature_tpe: Any, fastmath: bool = True, cache: bool = True, ) -> Callable[..., Any]: """ Compile a Python function with Numba using an eager signature when provided. :param py_func: Python function created from generated symbolic source. :type py_func: Callable[..., Any] :param signature_tpe: Eager Numba signature or ``None``. :type signature_tpe: Any :param fastmath: Whether Numba should enable fastmath rewrites. :type fastmath: bool :param cache: Whether Numba should persist the compiled machine code. :type cache: bool :return: Compiled Numba function. :rtype: Callable[..., Any] """ compiled_func: Callable[..., Any] signature_repr: str cache_payload: str cache_key: str cached_kernel: Callable[..., Any] | None if signature_tpe is None: signature_repr = "none" else: signature_repr = str(signature_tpe) cache_payload = "|".join([ py_func.__module__, py_func.__name__, signature_repr, str(fastmath), str(cache), ]) cache_key = hashlib.sha256(cache_payload.encode("utf-8")).hexdigest() cached_kernel = COMPILED_NUMBA_KERNEL_CACHE.get(cache_key) if cached_kernel is None: pass else: return cached_kernel if signature_tpe is None: compiled_func = nb.njit(fastmath=fastmath, cache=cache)(py_func) else: compiled_func = nb.njit(signature_tpe, fastmath=fastmath, cache=cache)(py_func) COMPILED_NUMBA_KERNEL_CACHE.set(cache_key, compiled_func) return compiled_func def _build_fused_residual_signature() -> Any: """ Return the eager signature of the fused residual dispatcher. :return: Eager Numba signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") float64_matrix_tpe: Any = types.Array(types.float64, 2, "C") return types.void( float64_vector_tpe, float64_vector_tpe, float64_vector_tpe, float64_vector_tpe, types.float64, float64_vector_tpe, float64_vector_tpe, float64_matrix_tpe, ) def _build_master_jacobian_signature() -> Any: """ Return the eager signature of the sparse Jacobian dispatcher. :return: Eager Numba signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") float64_matrix_tpe: Any = types.Array(types.float64, 2, "C") int32_vector_tpe: Any = types.Array(types.int32, 1, "C") return types.void( float64_vector_tpe, float64_matrix_tpe, float64_vector_tpe, float64_vector_tpe, float64_vector_tpe, types.float64, float64_vector_tpe, float64_vector_tpe, int32_vector_tpe, int32_vector_tpe, int32_vector_tpe, int32_vector_tpe, float64_matrix_tpe, ) def _build_scatter_color_signature() -> Any: """ Return the eager signature of the CSC scatter helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") int32_vector_tpe: Any = types.Array(types.int32, 1, "C") return types.void( float64_vector_tpe, float64_vector_tpe, int32_vector_tpe, int32_vector_tpe, int32_vector_tpe, int32_vector_tpe, types.int64, ) def _build_max_abs_signature() -> Any: """ Return the eager signature of the infinity-norm helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") return types.float64(float64_vector_tpe) def _build_copy_negated_signature() -> Any: """ Return the eager signature of the negated-copy helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") return types.void(float64_vector_tpe, float64_vector_tpe) def _build_fill_full_parameter_signature() -> Any: """ Return the eager signature of the full-parameter fill helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") return types.void(float64_vector_tpe, float64_vector_tpe, float64_vector_tpe) def _dense_lu_bundle_solve(bundle: DenseSolveBundle, rhs: Vec) -> Vec: """ Solve one dense LU linear system. :param bundle: Dense LU bundle ``(factorization, matrix)``. :param rhs: Right-hand side vector. :return: Solution vector. """ return lu_solve(bundle[0], rhs) def _dense_lu_bundle_fallback(bundle: DenseSolveBundle, rhs: Vec) -> Vec: """ Solve one dense fallback least-squares system. :param bundle: Dense LU bundle ``(factorization, matrix)``. :param rhs: Right-hand side vector. :return: Fallback solution vector. """ return dense_lstsq_fallback(bundle[1], rhs) def _dense_lu_bundle_matrix(bundle: DenseSolveBundle) -> np.ndarray: """ Return the dense matrix stored in one LU bundle. :param bundle: Dense LU bundle ``(factorization, matrix)``. :return: Dense Jacobian matrix. """ return bundle[1] def _sparse_factor_manager_solve(factor_manager: "StructuralCompiledSparseFactorizationManager", rhs: Vec) -> Vec: """ Solve one sparse linear system through the compiled sparse factor manager. :param factor_manager: Sparse factorization manager. :param rhs: Right-hand side vector. :return: Solution vector. """ return factor_manager.solve(rhs) def _sparse_factor_manager_fallback(factor_manager: "StructuralCompiledSparseFactorizationManager", rhs: Vec) -> Vec: """ Solve one sparse fallback least-squares system. :param factor_manager: Sparse factorization manager. :param rhs: Right-hand side vector. :return: Fallback solution vector. """ return factor_manager.solve_fallback(rhs) def _sparse_factor_manager_matrix(factor_manager: "StructuralCompiledSparseFactorizationManager") -> csc_matrix: """ Return the active sparse Jacobian matrix held by the factor manager. :param factor_manager: Sparse factorization manager. :return: Active sparse Jacobian matrix. """ return factor_manager.get_active_matrix() def _build_permute_csc_signature() -> Any: """ Return the eager signature of the CSC permutation helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") int32_vector_tpe: Any = types.Array(types.int32, 1, "C") return types.void(float64_vector_tpe, int32_vector_tpe, int32_vector_tpe, int32_vector_tpe, float64_vector_tpe) def _build_scatter_permuted_solution_signature() -> Any: """ Return the eager signature of the solution scatter helper. :return: Eager helper signature. :rtype: Any """ float64_vector_tpe: Any = types.Array(types.float64, 1, "C") int32_vector_tpe: Any = types.Array(types.int32, 1, "C") return types.void(float64_vector_tpe, int32_vector_tpe, float64_vector_tpe) def _get_runtime_uid(var_obj: Var) -> int: """ Return the canonical runtime UID of a symbolic variable. :param var_obj: Symbolic variable or differential placeholder. :type var_obj: Var :return: Canonical runtime UID. :rtype: int """ runtime_uid: int if var_obj.base_var is None: runtime_uid = var_obj.uid else: runtime_uid = var_obj.base_var.uid return runtime_uid def _canonicalize_symbolic_node( node: Expr, runtime_uids: Set[int], parameter_uids: Set[int], runtime_slots: Dict[int, str], parameter_slots: Dict[int, str], ordered_runtime_vars: List[Var], visited_runtime_uids: Set[int], ) -> str: """ Convert a symbolic expression into a structural canonical string. :param node: Symbolic node to canonicalize. :type node: Expr :param runtime_uids: UIDs that belong to the state or algebraic runtime vector. :type runtime_uids: Set[int] :param parameter_uids: UIDs that belong to the parameter vector. :type parameter_uids: Set[int] :param runtime_slots: Runtime placeholder mapping. :type runtime_slots: Dict[int, str] :param parameter_slots: Parameter placeholder mapping. :type parameter_slots: Dict[int, str] :param ordered_runtime_vars: Ordered runtime variables found in the expression. :type ordered_runtime_vars: List[Var] :param visited_runtime_nodes: Visited node identifiers for stable collection. :type visited_runtime_nodes: Set[int] :return: Canonical structural representation. :rtype: str """ if isinstance(node, Var): runtime_uid: int = _get_runtime_uid(node) if runtime_uid in runtime_uids: if runtime_uid in runtime_slots: pass else: runtime_slots[runtime_uid] = f"__RVAR_{len(runtime_slots)}__" if runtime_uid in visited_runtime_uids: pass else: visited_runtime_uids.add(runtime_uid) ordered_runtime_vars.append(node) return runtime_slots[runtime_uid] elif runtime_uid in parameter_uids: if runtime_uid in parameter_slots: pass else: parameter_name: str = node.name parameter_slots[runtime_uid] = f"__PARAM_{parameter_name}_{runtime_uid}__" return parameter_slots[runtime_uid] else: raise RuntimeError(f"Unclassified symbolic variable uid={runtime_uid}") elif isinstance(node, Const): return f"{float(node.value):.16g}" elif isinstance(node, BinOp): left_str: str = _canonicalize_symbolic_node( node.left, runtime_uids, parameter_uids, runtime_slots, parameter_slots, ordered_runtime_vars, visited_runtime_uids, ) right_str: str = _canonicalize_symbolic_node( node.right, runtime_uids, parameter_uids, runtime_slots, parameter_slots, ordered_runtime_vars, visited_runtime_uids, ) if node.op in ['+', '*'] and left_str > right_str: left_str, right_str = right_str, left_str return f"({left_str}{node.op}{right_str})" elif isinstance(node, UnOp): operand_str: str = _canonicalize_symbolic_node( node.operand, runtime_uids, parameter_uids, runtime_slots, parameter_slots, ordered_runtime_vars, visited_runtime_uids, ) return f"{node.op}({operand_str})" elif isinstance(node, Func): argument_str: str = _canonicalize_symbolic_node( node.arg, runtime_uids, parameter_uids, runtime_slots, parameter_slots, ordered_runtime_vars, visited_runtime_uids, ) return f"{node.op}({argument_str})" else: return str(node)
[docs] def canonicalize_expression( expr: Expr, runtime_uids: Set[int], parameter_uids: Set[int], ) -> Tuple[str, List[Var]]: """ Return the canonical signature and ordered runtime variables of an expression. :param expr: Symbolic expression to analyze. :type expr: Expr :param runtime_uids: UIDs that belong to the runtime vector. :type runtime_uids: Set[int] :param parameter_uids: UIDs that belong to the parameter vector. :type parameter_uids: Set[int] :return: Canonical structural string and ordered runtime variables. :rtype: Tuple[str, List[Var]] """ runtime_slots: Dict[int, str] = dict() parameter_slots: Dict[int, str] = dict() ordered_runtime_vars: List[Var] = list() visited_runtime_uids: Set[int] = set() signature_str: str = _canonicalize_symbolic_node( expr, runtime_uids, parameter_uids, runtime_slots, parameter_slots, ordered_runtime_vars, visited_runtime_uids, ) return signature_str, ordered_runtime_vars
def _build_full_residual_equations( state_vars: List[Var], state_eqs: List[Expr], algebraic_eqs: List[Expr], ) -> List[Expr]: """ Build the full implicit residual system used by Newton iterations. :param state_vars: Ordered state variables. :type state_vars: List[Var] :param state_eqs: Differential right-hand sides. :type state_eqs: List[Expr] :param algebraic_eqs: Algebraic constraints. :type algebraic_eqs: List[Expr] :return: Full residual system in solver order. :rtype: List[Expr] """ n_state: int = len(state_eqs) n_alg: int = len(algebraic_eqs) full_eqs: List[Expr] = [Const(0.0) for _ in range(n_state + n_alg)] i: int = 0 # The differential block is converted into d_x - f(x, y, p) = 0. while i < n_state: state_var: Var = state_vars[i] differential_term: Var = Var(name=f"d_{state_var.name}", base_var=state_var) full_eqs[i] = differential_term - state_eqs[i] i += 1 # The algebraic block is appended directly after the state residuals. i = 0 while i < n_alg: full_eqs[n_state + i] = algebraic_eqs[i] i += 1 return full_eqs def _build_structural_residual_debug_info(state_vars: List[Var], algebraic_vars: List[Var], state_eqs: List[Expr], algebraic_eqs: List[Expr], uid2idx_vars: Dict[int, int]) -> List[Dict[str, Any]]: """ Build metadata aligned with the structural residual ordering. :param state_vars: Ordered state variables. :type state_vars: List[Var] :param algebraic_vars: Ordered algebraic variables. :type algebraic_vars: List[Var] :param state_eqs: Differential right-hand sides. :type state_eqs: List[Expr] :param algebraic_eqs: Algebraic constraints. :type algebraic_eqs: List[Expr] :param uid2idx_vars: Runtime uid-to-index lookup. :type uid2idx_vars: Dict[int, int] :return: Residual metadata list. :rtype: List[Dict[str, Any]] """ debug_info: List[Dict[str, Any]] = list() state_index: int = 0 algebraic_index: int = 0 while state_index < len(state_eqs): state_var: Var = state_vars[state_index] debug_info.append({ "kind": "STATE", "label": f"d_{state_var.name} - rhs_{state_var.name}", "var_name": state_var.name, "state_idx": uid2idx_vars.get(state_var.uid, None), }) state_index += 1 while algebraic_index < len(algebraic_eqs): if algebraic_index < len(algebraic_vars): algebraic_var: Var = algebraic_vars[algebraic_index] debug_info.append({ "kind": "ALG", "label": f"alg for {algebraic_var.name}", "var_name": algebraic_var.name, "state_idx": uid2idx_vars.get(algebraic_var.uid, None), }) else: debug_info.append({ "kind": "ALG", "label": f"ALG_EQ_{algebraic_index}", "var_name": f"<no algebraic var for eq {algebraic_index}>", "state_idx": None, }) algebraic_index += 1 return debug_info def _format_structural_residual_zero_division_message(failed_index: int | None, debug_info: List[Dict[str, Any]] | None) -> str: """ Format one divide-by-zero diagnostic for the structural residual path. :param failed_index: Residual index when known. :type failed_index: int | None :param debug_info: Residual metadata. :type debug_info: List[Dict[str, Any]] | None :return: Diagnostic message. :rtype: str """ if failed_index is None: return "StructuralCompiled residual evaluation failed with division by zero; failing residual index could not be resolved." else: pass if debug_info is None: return f"StructuralCompiled residual evaluation failed with division by zero at residual index {failed_index}." else: pass if failed_index < 0 or failed_index >= len(debug_info): return f"StructuralCompiled residual evaluation failed with division by zero at residual index {failed_index}." else: pass info: Dict[str, Any] = debug_info[failed_index] kind: Any = info.get("kind", "UNK") label: Any = info.get("label", f"residual_{failed_index}") var_name: Any = info.get("var_name", "<?>") return ( f"StructuralCompiled residual evaluation failed with division by zero at residual index {failed_index} " f"[{kind}] for '{var_name}' ({label})." ) def _locate_failing_residual_with_single_equation_kernels(equations: List[Expr], variables: List[Var], parameters: List[Var], method: DynamicIntegrationMethod, states: Vec, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec) -> int | None: """ Locate the first residual index that fails under one-equation Python kernels. :param equations: Full residual equation list. :type equations: List[Expr] :param variables: Ordered runtime variables. :type variables: List[Var] :param parameters: Ordered parameter list. :type parameters: List[Var] :param method: Implicit integration method. :type method: DynamicIntegrationMethod :param states: Current Newton iterate. :type states: Vec :param params: Full parameter vector. :type params: Vec :param history: Previous state vector. :type history: Vec :param d_history: Previous derivative vector. :type d_history: Vec :param h: Local time step. :type h: float :param history2: Secondary history vector. :type history2: Vec :return: Failing residual index or ``None``. :rtype: int | None """ compiler: EagerEquationCompiler = EagerEquationCompiler(variables=variables, parameters=parameters, method=method) equation_index: int = 0 while equation_index < len(equations): equation_list: List[Expr] = list([equations[equation_index]]) out_vec: Vec = np.zeros(1, dtype=np.float64) py_kernel: Callable[..., Any] signature_tpe: object py_kernel, signature_tpe = compiler.compile( equations=equation_list, func_name=f"structural_single_eq_debug_{equation_index}", use_cse=True, offset=0, inplace=True, ) _unused_signature_tpe: object = signature_tpe try: py_kernel(states, params, history, d_history, h, out_vec, history2) equation_index += 1 except ZeroDivisionError: return equation_index return None def _build_parameter_uid_set( variable_parameters: List[Var], constant_parameters: List[Var], ) -> Set[int]: """ Build the parameter UID set used by structural canonicalization. :param variable_parameters: Runtime parameters. :type variable_parameters: List[Var] :param constant_parameters: Constant parameters. :type constant_parameters: List[Var] :return: UID set of all parameters visible to the solver. :rtype: Set[int] """ parameter_uids: Set[int] = set() parameter: Var for parameter in variable_parameters: parameter_uids.add(parameter.uid) for parameter in constant_parameters: parameter_uids.add(parameter.uid) return parameter_uids def _build_backend_cache_token( method: DynamicIntegrationMethod, n_rows: int, n_cols: int, group_keys: List[str], ) -> str: """ Return a deterministic cache token for backend-generated dispatchers. :param method: Integration method used by the backend. :type method: DynamicIntegrationMethod :param n_rows: Number of residual equations. :type n_rows: int :param n_cols: Number of runtime variables. :type n_cols: int :param group_keys: Structural group signatures. :type group_keys: List[str] :return: Stable cache token. :rtype: str """ payload: str = "|".join([method.name, str(n_rows), str(n_cols), str(len(group_keys))]) return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16] def _greedy_color_columns(col_rows: List[List[int]], n_rows: int) -> Tuple[Int32Vector, int, List[List[int]]]: """ Color the sparse column dependency graph with a greedy heuristic. :param col_rows: Row indices present in each sparse column. :type col_rows: List[List[int]] :param n_rows: Total number of rows in the Jacobian. :type n_rows: int :return: Colors per column, number of colors and explicit color groups. :rtype: Tuple[Int32Vector, int, List[List[int]]] """ n_cols: int = len(col_rows) row_cols: List[List[int]] = list() adjacency: List[Set[int]] = list() color_groups: List[List[int]] = list() degrees: Int32Vector order: Int32Vector colors: Int32Vector used: npt.NDArray[np.bool_] row_index: int col_index: int row_index = 0 while row_index < n_rows: row_cols.append(list()) row_index += 1 col_index = 0 while col_index < n_cols: row_list: List[int] = col_rows[col_index] row_position: int = 0 while row_position < len(row_list): row_cols[row_list[row_position]].append(col_index) row_position += 1 col_index += 1 col_index = 0 while col_index < n_cols: adjacency.append(set()) col_index += 1 row_index = 0 while row_index < n_rows: cols_in_row: List[int] = row_cols[row_index] left_pos: int = 0 while left_pos < len(cols_in_row): right_pos: int = left_pos + 1 while right_pos < len(cols_in_row): left_col: int = cols_in_row[left_pos] right_col: int = cols_in_row[right_pos] if left_col == right_col: pass else: adjacency[left_col].add(right_col) adjacency[right_col].add(left_col) right_pos += 1 left_pos += 1 row_index += 1 degrees = np.zeros(n_cols, dtype=np.int32) col_index = 0 while col_index < n_cols: degrees[col_index] = len(adjacency[col_index]) col_index += 1 order = np.argsort(-degrees).astype(np.int32) colors = -np.ones(n_cols, dtype=np.int32) used = np.zeros(n_cols, dtype=np.bool_) max_color: int = -1 col_index = 0 while col_index < len(order): candidate_col: int = int(order[col_index]) used[:] = False neighbor_col: int for neighbor_col in adjacency[candidate_col]: neighbor_color: int = int(colors[neighbor_col]) if neighbor_color >= 0: used[neighbor_color] = True else: pass color_value: int = 0 while color_value < n_cols and used[color_value]: color_value += 1 colors[candidate_col] = color_value if color_value > max_color: max_color = color_value else: pass col_index += 1 color_index: int = 0 while color_index < max_color + 1: color_groups.append(list()) color_index += 1 col_index = 0 while col_index < n_cols: color_groups[int(colors[col_index])].append(col_index) col_index += 1 return colors, int(max_color + 1), color_groups @nb.njit(_build_scatter_color_signature(), cache=True, fastmath=True) def _scatter_color_jvp_to_csc_data( jvp: Float64Vector, data: Float64Vector, color_ptr: Int32Vector, col_ptr: Int32Vector, row_idx: Int32Vector, data_idx: Int32Vector, color_id: int, ) -> None: """ Scatter a colored JVP vector into CSC numeric storage. :param jvp: JVP vector for the active color. :type jvp: Float64Vector :param data: Target CSC numeric values. :type data: Float64Vector :param color_ptr: Pointer to the column-color groups. :type color_ptr: Int32Vector :param col_ptr: Pointer to each grouped column map. :type col_ptr: Int32Vector :param row_idx: Row indices in CSC order. :type row_idx: Int32Vector :param data_idx: Direct positions inside ``data``. :type data_idx: Int32Vector :param color_id: Active color identifier. :type color_id: int :return: None :rtype: None """ k0: int = int(color_ptr[color_id]) k1: int = int(color_ptr[color_id + 1]) k: int = k0 while k < k1: p0: int = int(col_ptr[k]) p1: int = int(col_ptr[k + 1]) p: int = p0 while p < p1: data[data_idx[p]] = jvp[row_idx[p]] p += 1 k += 1 @nb.njit(_build_max_abs_signature(), cache=True, fastmath=True) def _max_abs_value(values: Float64Vector) -> float: """ Compute the infinity norm without allocating temporary arrays. :param values: Vector to inspect. :type values: Float64Vector :return: Maximum absolute value. :rtype: float """ max_value: float = 0.0 index: int = 0 while index < values.shape[0]: raw_value: float = float(values[index]) abs_value: float if raw_value < 0.0: abs_value = -raw_value else: abs_value = raw_value if abs_value > max_value: max_value = abs_value else: pass index += 1 return max_value @nb.njit(_build_copy_negated_signature(), cache=True, fastmath=True) def _copy_negated_vector(values: Float64Vector, out: Float64Vector) -> None: """ Write the negated vector into a preallocated buffer. :param values: Source vector. :type values: Float64Vector :param out: Destination vector. :type out: Float64Vector :return: None :rtype: None """ index: int = 0 while index < values.shape[0]: out[index] = -values[index] index += 1 @nb.njit(_build_fill_full_parameter_signature(), cache=True, fastmath=True) def _fill_full_parameter_buffer( runtime_params: Float64Vector, static_params: Float64Vector, full_params_out: Float64Vector, ) -> None: """ Write runtime and static parameters into one preallocated full buffer. :param runtime_params: Runtime parameter slice. :type runtime_params: Float64Vector :param static_params: Static parameter slice. :type static_params: Float64Vector :param full_params_out: Destination full parameter vector. :type full_params_out: Float64Vector :return: None :rtype: None """ runtime_count: int = int(runtime_params.shape[0]) static_count: int = int(static_params.shape[0]) index: int = 0 while index < runtime_count: full_params_out[index] = runtime_params[index] index += 1 index = 0 while index < static_count: full_params_out[runtime_count + index] = static_params[index] index += 1 @nb.njit(cache=True, fastmath=True) def _max_abs_diff_vectors(left_values: Float64Vector, right_values: Float64Vector) -> float: """ Return the maximum absolute difference between two vectors. :param left_values: Left vector. :type left_values: Float64Vector :param right_values: Right vector. :type right_values: Float64Vector :return: Maximum absolute difference. :rtype: float """ max_value: float = 0.0 index: int = 0 while index < left_values.shape[0]: diff_value: float = left_values[index] - right_values[index] if diff_value < 0.0: diff_value = -diff_value else: diff_value = diff_value if diff_value > max_value: max_value = diff_value else: max_value = max_value index += 1 return max_value @nb.njit(_build_permute_csc_signature(), cache=True, fastmath=True) def _permute_csc_data_by_columns( source_data: Float64Vector, source_indptr: Int32Vector, column_perm: Int32Vector, target_indptr: Int32Vector, target_data: Float64Vector, ) -> None: """ Reorder CSC numeric values according to a persistent column permutation. :param source_data: Numeric values in the original column order. :type source_data: Float64Vector :param source_indptr: Original CSC column pointer. :type source_indptr: Int32Vector :param column_perm: Persistent destination-to-source column permutation. :type column_perm: Int32Vector :param target_indptr: Target CSC column pointer. :type target_indptr: Int32Vector :param target_data: Destination numeric buffer. :type target_data: Float64Vector :return: None :rtype: None """ dst_col: int = 0 while dst_col < column_perm.shape[0]: src_col: int = int(column_perm[dst_col]) src_ptr_0: int = int(source_indptr[src_col]) src_ptr_1: int = int(source_indptr[src_col + 1]) dst_ptr_0: int = int(target_indptr[dst_col]) local_index: int = 0 while src_ptr_0 + local_index < src_ptr_1: target_data[dst_ptr_0 + local_index] = source_data[src_ptr_0 + local_index] local_index += 1 dst_col += 1 @nb.njit(_build_scatter_permuted_solution_signature(), cache=True, fastmath=True) def _scatter_permuted_solution_to_original_order( permuted_solution: Float64Vector, inverse_column_perm: Int32Vector, out_solution: Float64Vector, ) -> None: """ Scatter a solution computed on a column-permuted system back to original order. :param permuted_solution: Solution in the permuted variable order. :type permuted_solution: Float64Vector :param inverse_column_perm: Mapping from original index to permuted index. :type inverse_column_perm: Int32Vector :param out_solution: Destination vector in original solver order. :type out_solution: Float64Vector :return: None :rtype: None """ var_index: int = 0 while var_index < out_solution.shape[0]: out_solution[var_index] = permuted_solution[int(inverse_column_perm[var_index])] var_index += 1 def _warmup_compiled_numba_helpers() -> None: """ Trigger one eager call to the small Numba helper kernels. :return: None :rtype: None """ sample_vector: Vec = np.zeros(2, dtype=np.float64) sample_out_vector: Vec = np.zeros(2, dtype=np.float64) sample_runtime_params: Vec = np.zeros(1, dtype=np.float64) sample_static_params: Vec = np.zeros(1, dtype=np.float64) sample_full_params: Vec = np.zeros(2, dtype=np.float64) sample_source_data: Vec = np.zeros(2, dtype=np.float64) sample_target_data: Vec = np.zeros(2, dtype=np.float64) sample_indptr: Int32Vector = np.array([0, 1, 2], dtype=np.int32) sample_perm: Int32Vector = np.array([0, 1], dtype=np.int32) sample_inverse_perm: Int32Vector = np.array([0, 1], dtype=np.int32) sample_color_ptr: Int32Vector = np.array([0, 2], dtype=np.int32) sample_col_ptr: Int32Vector = np.array([0, 1, 2], dtype=np.int32) sample_row_idx: Int32Vector = np.array([0, 1], dtype=np.int32) sample_data_idx: Int32Vector = np.array([0, 1], dtype=np.int32) _max_abs_value(sample_vector) _copy_negated_vector(sample_vector, sample_out_vector) _fill_full_parameter_buffer(sample_runtime_params, sample_static_params, sample_full_params) _permute_csc_data_by_columns(sample_source_data, sample_indptr, sample_perm, sample_indptr, sample_target_data) _scatter_permuted_solution_to_original_order(sample_vector, sample_inverse_perm, sample_out_vector) _scatter_color_jvp_to_csc_data( sample_vector, sample_target_data, sample_color_ptr, sample_col_ptr, sample_row_idx, sample_data_idx, 0, ) def _should_run_full_backend_warmup( warmup_policy: StructuralCompiledWarmupPolicy, n_vars: int, estimated_steps: int, use_direct_residual: bool, ) -> bool: """ Decide whether the backend should perform a full residual/Jacobian warmup. :param warmup_policy: Configured warmup policy. :type warmup_policy: StructuralCompiledWarmupPolicy :param n_vars: Number of runtime variables. :type n_vars: int :param estimated_steps: Estimated macro time steps. :type estimated_steps: int :param use_direct_residual: Whether the backend uses the direct residual path. :type use_direct_residual: bool :return: ``True`` when a full warmup should run. :rtype: bool """ if warmup_policy == StructuralCompiledWarmupPolicy.Full: return True elif warmup_policy == StructuralCompiledWarmupPolicy.Off: return False else: pass # Adaptive mode only pays the full warmup when the run is large enough for # the saved runtime to amortize the up-front cost. The direct residual path # especially benefits on long trajectories because it is called every Newton step. if estimated_steps >= 1000: return True elif estimated_steps >= 200 and n_vars >= 48: return True elif use_direct_residual and estimated_steps >= 100: return True else: return False
[docs] class StructuralCompiledBoundaryUpdateWrapper: """ Boundary update interface for the structural compiled solver. The structural compiled solver keeps the linear algebra outside Numba, but it still needs a typed boundary update object to refresh runtime parameters between local substeps. """ __slots__ = []
[docs] def update(self, t: float, x: Vec, params: Vec) -> None: """ Update the parameter vector in place. :param t: Current simulation time. :type t: float :param x: Current Newton iterate. :type x: Vec :param params: Full parameter vector. :type params: Vec :return: None :rtype: None """ _unused: Tuple[float, Vec, Vec] = (t, x, params) return None
[docs] def get_next_forced_event_time(self, t_prev: float, t_target: float) -> float | None: """ Return the next forced event time inside the local substep. :param t_prev: Local substep start. :type t_prev: float :param t_target: Local substep nominal end. :type t_target: float :return: Next forced event time or ``None``. :rtype: float | None """ _unused: Tuple[float, float] = (t_prev, t_target) return None
[docs] class StructuralCompiledPredictor: """ Explicit predictor used to initialize the Newton iteration. """ __slots__ = ["_n_states"] def __init__(self, n_states: int) -> None: """ Create the predictor. :param n_states: Number of differential states. :type n_states: int :return: None :rtype: None """ self._n_states = n_states
[docs] def predict( self, x_iter: Vec, x_prev: Vec, dx_prev: Vec, h: float, pred_method: DynamicIntegrationMethod | None, ) -> Vec: """ Write the predictor guess in place. :param x_iter: Destination vector for the predictor guess. :type x_iter: Vec :param x_prev: Previous variable vector. :type x_prev: Vec :param dx_prev: Previous differential vector. :type dx_prev: Vec :param h: Local time step. :type h: float :param pred_method: Predictor integration method. :type pred_method: DynamicIntegrationMethod | None :return: Updated predictor vector. :rtype: Vec """ # The default predictor is a no-op copy because it is always safe. x_iter[:] = x_prev # Explicit Euler is only applied to the differential subset. if pred_method == DynamicIntegrationMethod.OdeEuler: x_iter[:self._n_states] = x_prev[:self._n_states] + h * dx_prev[:self._n_states] else: pass return x_iter
[docs] class StructuralCompiledResidualTrialEvaluator: """ Callable helper that evaluates residuals during Newton backtracking. """ __slots__ = ["_residual_assembler", "_full_params", "_x_prev", "_dx_prev", "_h_eff", "_x_prev2"] def __init__(self) -> None: """ Build one empty trial-residual evaluator. :return: None :rtype: None """ self._residual_assembler: StructuralCompiledResidualAssembler | StructuralCompiledDirectResidualAssembler | None = None self._full_params: Vec | None = None self._x_prev: Vec | None = None self._dx_prev: Vec | None = None self._h_eff: float = 0.0 self._x_prev2: Vec | None = None
[docs] def set_context( self, residual_assembler: StructuralCompiledResidualAssembler | StructuralCompiledDirectResidualAssembler, full_params: Vec, x_prev: Vec, dx_prev: Vec, h_eff: float, x_prev2: Vec, ) -> None: """ Store the residual-evaluation context of the current Newton step. :param residual_assembler: Residual assembler. :type residual_assembler: StructuralCompiledResidualAssembler | StructuralCompiledDirectResidualAssembler :param full_params: Full parameter vector. :type full_params: Vec :param x_prev: Previous accepted state. :type x_prev: Vec :param dx_prev: Previous derivative vector. :type dx_prev: Vec :param h_eff: Effective local time step. :type h_eff: float :param x_prev2: Secondary state history. :type x_prev2: Vec :return: None :rtype: None """ self._residual_assembler = residual_assembler self._full_params = full_params self._x_prev = x_prev self._dx_prev = dx_prev self._h_eff = float(h_eff) self._x_prev2 = x_prev2
[docs] def evaluate(self, trial_x: Vec, out_res: Vec) -> float: """ Evaluate one trial Newton iterate during backtracking. :param trial_x: Trial iterate. :type trial_x: Vec :param out_res: Destination residual buffer. :type out_res: Vec :return: Residual infinity norm. :rtype: float """ if self._residual_assembler is None: return np.inf else: pass if self._full_params is None or self._x_prev is None or self._dx_prev is None or self._x_prev2 is None: return np.inf else: pass self._residual_assembler.evaluate( trial_x, self._full_params, self._x_prev, self._dx_prev, self._h_eff, self._x_prev2, out_res, ) return _max_abs_value(out_res)
[docs] class StructuralCompiledEquationGroup: """ Structural group of equations that share the same vectorized template. """ __slots__ = ["_index_matrix", "_row_indices", "_template_eq", "_template_vars"] def __init__(self) -> None: """ Create an empty structural group. :return: None :rtype: None """ self._index_matrix: List[List[int]] = list() self._row_indices: List[int] = list() self._template_eq: Expr | None = None self._template_vars: List[Var] | None = None
[docs] def set_template(self, template_eq: Expr, template_vars: List[Var]) -> None: """ Store the canonical template of the group. :param template_eq: Canonical structural equation. :type template_eq: Expr :param template_vars: Ordered runtime variables of the template. :type template_vars: List[Var] :return: None :rtype: None """ self._template_eq = template_eq self._template_vars = list(template_vars)
[docs] def add_indices(self, variable_indices: List[int], row_index: int) -> None: """ Append one grouped equation instance. :param variable_indices: Runtime indices used by this equation instance. :type variable_indices: List[int] :param row_index: Global residual row index. :type row_index: int :return: None :rtype: None """ self._index_matrix.append(list(variable_indices)) self._row_indices.append(row_index)
[docs] def get_index_matrix(self) -> List[List[int]]: """ Return the grouped runtime index matrix. :return: Grouped runtime index matrix. :rtype: List[List[int]] """ return self._index_matrix
[docs] def get_row_indices(self) -> List[int]: """ Return the residual rows covered by the group. :return: Residual row indices. :rtype: List[int] """ return self._row_indices
[docs] def get_template_eq(self) -> Expr | None: """ Return the template equation. :return: Template equation or ``None``. :rtype: Expr | None """ return self._template_eq
[docs] def get_template_vars(self) -> List[Var] | None: """ Return the template runtime variables. :return: Ordered template runtime variables or ``None``. :rtype: List[Var] | None """ return self._template_vars
[docs] class StructuralCompiledVectorKernel: """ Typed container for one vectorized residual kernel. """ __slots__ = ["_kernel", "_indices", "_target_rows", "_row_count"] def __init__(self, kernel: Callable[..., Any], indices: Int32Matrix, target_rows: Int32Vector) -> None: """ Create the vectorized kernel container. :param kernel: Compiled eager kernel. :type kernel: Callable[..., Any] :param indices: Runtime gather matrix. :type indices: Int32Matrix :param target_rows: Residual rows written by the kernel. :type target_rows: Int32Vector :return: None :rtype: None """ self._kernel = kernel self._indices = indices self._target_rows = target_rows self._row_count = int(target_rows.shape[0])
[docs] def get_kernel(self) -> Callable[..., Any]: """ Return the compiled kernel. :return: Compiled kernel. :rtype: Callable[..., Any] """ return self._kernel
[docs] def get_indices(self) -> Int32Matrix: """ Return the runtime gather matrix. :return: Runtime gather matrix. :rtype: Int32Matrix """ return self._indices
[docs] def get_target_rows(self) -> Int32Vector: """ Return the residual target rows. :return: Residual target rows. :rtype: Int32Vector """ return self._target_rows
[docs] def get_row_count(self) -> int: """ Return the number of rows emitted by the kernel. :return: Number of grouped rows. :rtype: int """ return self._row_count
[docs] class StructuralCompiledResidualDispatcher: """ Residual dispatcher that orchestrates compiled group kernels explicitly. The structural group kernels are already eagerly compiled with Numba. Keeping the final scatter loop in Python avoids fragile JIT/global binding behavior in the dispatcher layer and mirrors the residual assembly strategy used by the vectorized solver. """ __slots__ = ["_kernel_specs"] def __init__(self, kernel_specs: List[StructuralCompiledVectorKernel]) -> None: self._kernel_specs = kernel_specs def __call__( self, states: Vec, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec, data_out: Vec, work_buffers: Float64Matrix, ) -> None: data_out[:] = 0.0 kernel_index: int = 0 while kernel_index < len(self._kernel_specs): kernel_spec: StructuralCompiledVectorKernel = self._kernel_specs[kernel_index] row_count: int = kernel_spec.get_row_count() local_out: Float64Vector = work_buffers[kernel_index, :row_count] kernel_spec.get_kernel()( states, params, history, d_history, h, kernel_spec.get_indices(), local_out, history2, ) data_out[kernel_spec.get_target_rows()] = local_out[:row_count] kernel_index += 1
def _compile_master_jacobian_kernel( ad_kernel: Callable[..., Any], n_colors: int, cache_token: str, ) -> Callable[..., Any]: """ Build the eager sparse Jacobian dispatcher. :param ad_kernel: Generic AD kernel reused across colors. :type ad_kernel: Callable[..., Any] :param n_colors: Number of graph colors. :type n_colors: int :return: Compiled master Jacobian dispatcher. :rtype: Callable[..., Any] """ _unused = cache_token return StructuralCompiledMasterJacobianDispatcher(ad_kernel=ad_kernel, n_colors=n_colors)
[docs] class StructuralCompiledMasterJacobianDispatcher: """ Callable dispatcher that evaluates structural AD colors into CSC storage. """ __slots__ = ("_ad_kernel", "_n_colors") def __init__(self, ad_kernel: Callable[..., Any], n_colors: int) -> None: """ Store the generic AD kernel and the number of graph colors. :param ad_kernel: Generic AD kernel reused across colors. :param n_colors: Number of graph colors. :return: None. """ self._ad_kernel: Callable[..., Any] = ad_kernel self._n_colors: int = n_colors def __call__(self, states: Vec, seed_matrix: Float64Matrix, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec, data: Vec, color_ptr: Int32Vector, col_ptr: Int32Vector, row_idx: Int32Vector, data_idx: Int32Vector, work_jvp: Float64Matrix) -> None: """ Evaluate one full structural AD sweep and scatter it into CSC storage. :param states: Current Newton iterate. :param seed_matrix: Coloring seed matrix. :param params: Full parameter vector. :param history: Previous accepted state. :param d_history: Previous derivative vector. :param h: Effective time step. :param history2: Secondary history vector. :param data: CSC numeric data buffer. :param color_ptr: Color-to-column pointer array. :param col_ptr: Column-to-scatter pointer array. :param row_idx: Row indices used by the scatter map. :param data_idx: CSC data positions used by the scatter map. :param work_jvp: Reusable JVP work buffer. :return: None. """ color_index: int = 0 while color_index < self._n_colors: local_jvp: Float64Vector = work_jvp[color_index] self._ad_kernel(states, seed_matrix[color_index], params, history, d_history, h, local_jvp, history2) _scatter_color_jvp_to_csc_data(local_jvp, data, color_ptr, col_ptr, row_idx, data_idx, color_index) color_index += 1
[docs] class StructuralCompiledResidualAssembler: """ Residual dispatcher that reuses preallocated grouped work buffers. """ __slots__ = ["_kernel_specs", "_dispatcher", "_work_buffer", "_build_stats"] def __init__(self, kernel_specs: List[StructuralCompiledVectorKernel], cache_token: str) -> None: """ Create the residual assembler. :param kernel_specs: Vectorized residual kernels. :type kernel_specs: List[StructuralCompiledVectorKernel] :return: None :rtype: None """ t_start: float = time.perf_counter() self._kernel_specs: List[StructuralCompiledVectorKernel] = kernel_specs _unused_cache_token: str = cache_token t_dispatcher_start: float = time.perf_counter() self._dispatcher: Callable[..., Any] = StructuralCompiledResidualDispatcher(kernel_specs) dispatcher_build_s: float = time.perf_counter() - t_dispatcher_start max_kernel_size: int = 0 kernel_spec: StructuralCompiledVectorKernel for kernel_spec in kernel_specs: if kernel_spec.get_row_count() > max_kernel_size: max_kernel_size = kernel_spec.get_row_count() else: pass t_buffer_start: float = time.perf_counter() # The work matrix has one row per structural group and enough columns for the largest group. self._work_buffer: Float64Matrix = np.zeros((len(kernel_specs), max_kernel_size), dtype=np.float64) buffer_alloc_s: float = time.perf_counter() - t_buffer_start self._build_stats: Dict[str, float] = { "dispatcher_build_s": dispatcher_build_s, "buffer_alloc_s": buffer_alloc_s, "total_s": time.perf_counter() - t_start, }
[docs] def evaluate( self, states: Vec, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec, data_out: Vec, ) -> None: """ Evaluate the grouped residual system in place. :param states: Current Newton iterate. :type states: Vec :param params: Full parameter vector. :type params: Vec :param history: Previous state vector. :type history: Vec :param d_history: Previous differential vector. :type d_history: Vec :param h: Local time step. :type h: float :param history2: Secondary history vector. :type history2: Vec :param data_out: Residual output buffer. :type data_out: Vec :return: None :rtype: None """ self._dispatcher(states, params, history, d_history, h, history2, data_out, self._work_buffer)
[docs] def get_work_buffer(self) -> Float64Matrix: """ Return the grouped work buffer. :return: Grouped work buffer. :rtype: Float64Matrix """ return self._work_buffer
[docs] def get_build_stats(self) -> Dict[str, float]: """ Return residual assembler setup timings. :return: Residual assembler build timings. :rtype: Dict[str, float] """ return dict(self._build_stats)
[docs] class StructuralCompiledDirectResidualAssembler: """ Residual assembler backed by one monolithic in-place kernel. """ __slots__ = ["_kernel", "_equations", "_variables", "_parameters", "_method", "_debug_info", "_build_stats"] def __init__(self, kernel: Callable[..., Any], equations: List[Expr], variables: List[Var], parameters: List[Var], method: DynamicIntegrationMethod, debug_info: List[Dict[str, Any]] | None) -> None: """ Create the direct residual assembler. :param kernel: Residual kernel. :type kernel: Callable[..., Any] :param equations: Full residual equation list. :type equations: List[Expr] :param variables: Ordered runtime variables. :type variables: List[Var] :param parameters: Ordered parameter list. :type parameters: List[Var] :param method: Implicit integration method. :type method: DynamicIntegrationMethod :param debug_info: Residual metadata aligned with the kernel ordering. :type debug_info: List[Dict[str, Any]] | None :return: None :rtype: None """ self._kernel = kernel self._equations = equations self._variables = variables self._parameters = parameters self._method = method self._debug_info = debug_info self._build_stats = { "dispatcher_build_s": 0.0, "buffer_alloc_s": 0.0, "total_s": 0.0, }
[docs] def evaluate( self, states: Vec, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec, data_out: Vec, ) -> None: try: self._kernel(states, params, history, d_history, h, data_out, history2) except ZeroDivisionError as exc: failed_index: int | None = _locate_failing_residual_with_single_equation_kernels( equations=self._equations, variables=self._variables, parameters=self._parameters, method=self._method, states=states, params=params, history=history, d_history=d_history, h=h, history2=history2, ) raise ZeroDivisionError( _format_structural_residual_zero_division_message(failed_index, self._debug_info) ) from exc
[docs] def get_work_buffer(self) -> None: return None
[docs] def get_build_stats(self) -> Dict[str, float]: return dict(self._build_stats)
[docs] class StructuralCompiledSparseADJacobian: """ Sparse structural AD Jacobian that writes directly into CSC storage. """ __slots__ = [ "_equations", "_variables", "_parameters", "_method", "_dtype", "_n_rows", "_n_cols", "_column_rows", "_jacobian_matrix", "_colors", "_n_colors", "_color_groups", "_color_ptr", "_col_ptr", "_row_idx", "_data_idx", "_compiler", "_ad_kernel", "_master_dispatcher", "_seed_matrix", "_jvp_work_buffer", "_data_buffer", "_build_stats", ] def __init__( self, equations: List[Expr], variables: List[Var], parameters: List[Var], method: DynamicIntegrationMethod, dtype: Any = np.float64, use_cse: bool = True, eager_machine_code: bool = True, cache_token: str = "default", ) -> None: """ Create the sparse AD Jacobian evaluator. :param equations: Full residual equations. :type equations: List[Expr] :param variables: Runtime variables of the DAE system. :type variables: List[Var] :param parameters: Full parameter list. :type parameters: List[Var] :param method: Implicit integration method. :type method: DynamicIntegrationMethod :param dtype: Numeric dtype of the Jacobian storage. :type dtype: Any :param use_cse: Whether color kernels should emit CSE temporaries. :type use_cse: bool :return: None :rtype: None """ t_start: float = time.perf_counter() self._equations = equations self._variables = variables self._parameters = parameters self._method = method self._dtype = dtype self._n_rows = len(equations) self._n_cols = len(variables) # The sparsity pattern is extracted once from the symbolic graph and then reused forever. t_column_rows_start: float = time.perf_counter() self._column_rows = self._build_column_rows() column_rows_s: float = time.perf_counter() - t_column_rows_start t_csc_start: float = time.perf_counter() self._jacobian_matrix, self._data_buffer = self._build_csc_matrix() csc_shell_s: float = time.perf_counter() - t_csc_start t_coloring_start: float = time.perf_counter() self._colors, self._n_colors, self._color_groups = _greedy_color_columns(self._column_rows, self._n_rows) coloring_s: float = time.perf_counter() - t_coloring_start t_scatter_start: float = time.perf_counter() self._color_ptr, self._col_ptr, self._row_idx, self._data_idx = self._build_scatter_map() scatter_map_s: float = time.perf_counter() - t_scatter_start # The eager compiler emits strictly typed kernels that write into caller-owned buffers. self._compiler = EagerEquationCompiler(variables=variables, parameters=parameters, method=method) t_ad_kernels_start: float = time.perf_counter() self._ad_kernel = self._build_ad_kernel(use_cse, eager_machine_code) ad_kernels_s: float = time.perf_counter() - t_ad_kernels_start t_dispatcher_start: float = time.perf_counter() self._master_dispatcher = _compile_master_jacobian_kernel(self._ad_kernel, self._n_colors, cache_token) dispatcher_build_s: float = time.perf_counter() - t_dispatcher_start self._seed_matrix = np.zeros((self._n_colors, self._n_cols), dtype=self._dtype) color_index = 0 while color_index < self._n_colors: for column_index in self._color_groups[color_index]: self._seed_matrix[color_index, column_index] = 1.0 color_index += 1 # One JVP row per color is enough because colors are executed sequentially. self._jvp_work_buffer = np.zeros((self._n_colors, self._n_rows), dtype=self._dtype) self._build_stats: Dict[str, float] = { "column_rows_s": column_rows_s, "csc_shell_s": csc_shell_s, "coloring_s": coloring_s, "scatter_map_s": scatter_map_s, "ad_kernels_s": ad_kernels_s, "dispatcher_build_s": dispatcher_build_s, "total_s": time.perf_counter() - t_start, } def _build_column_rows(self) -> List[List[int]]: """ Build the symbolic sparsity pattern column by column. :return: Row indices present in each sparse column. :rtype: List[List[int]] """ variable_index_by_uid: Dict[int, int] = dict() column_rows: List[List[int]] = list() row_index: int = 0 variable_index: int = 0 while variable_index < self._n_cols: variable_obj: Var = self._variables[variable_index] variable_index_by_uid[variable_obj.uid] = variable_index column_rows.append(list()) variable_index += 1 while row_index < self._n_rows: equation: Expr = self._equations[row_index] stack: List[Expr] = list() visited_nodes: Set[int] = set() dependent_uids: Set[int] = set() stack.append(equation) while len(stack) > 0: node: Expr = stack.pop() node_identifier: int = id(node) if node_identifier in visited_nodes: pass else: visited_nodes.add(node_identifier) if isinstance(node, Var): dependent_uids.add(node.uid) if node.base_var is None: pass else: dependent_uids.add(node.base_var.uid) elif isinstance(node, BinOp): stack.append(node.left) stack.append(node.right) elif isinstance(node, UnOp): stack.append(node.operand) elif isinstance(node, Func): stack.append(node.arg) else: pass dependent_uid: int for dependent_uid in dependent_uids: column_index: int | None = variable_index_by_uid.get(dependent_uid, None) if column_index is None: pass else: column_rows[column_index].append(row_index) row_index += 1 column_index = 0 while column_index < self._n_cols: column_rows[column_index] = sorted(set(column_rows[column_index])) column_index += 1 return column_rows def _build_csc_matrix(self) -> Tuple[csc_matrix, Vec]: """ Build the reusable CSC matrix shell and numeric data buffer. :return: CSC matrix shell and numeric data buffer. :rtype: Tuple[csc_matrix, Vec] """ indptr: Int32Vector = np.zeros(self._n_cols + 1, dtype=np.int32) column_index: int = 0 nnz: int = 0 while column_index < self._n_cols: nnz += len(self._column_rows[column_index]) indptr[column_index + 1] = nnz column_index += 1 indices: Int32Vector = np.empty(nnz, dtype=np.int32) data: Vec = np.zeros(nnz, dtype=self._dtype) data_position: int = 0 column_index = 0 while column_index < self._n_cols: local_index: int = 0 rows_in_column: List[int] = self._column_rows[column_index] while local_index < len(rows_in_column): indices[data_position] = rows_in_column[local_index] data_position += 1 local_index += 1 column_index += 1 matrix: csc_matrix = csc_matrix((data, indices, indptr), shape=(self._n_rows, self._n_cols)) return matrix, matrix.data def _build_scatter_map(self) -> Tuple[Int32Vector, Int32Vector, Int32Vector, Int32Vector]: """ Precompute the CSC scatter topology for colored JVPs. :return: ``color_ptr``, ``col_ptr``, ``row_idx`` and ``data_idx`` arrays. :rtype: Tuple[Int32Vector, Int32Vector, Int32Vector, Int32Vector] """ color_cols: List[int] = list() color_ptr: Int32Vector = np.zeros(self._n_colors + 1, dtype=np.int32) color_index: int = 0 while color_index < self._n_colors: color_ptr[color_index] = len(color_cols) group_columns: List[int] = self._color_groups[color_index] group_position: int = 0 while group_position < len(group_columns): color_cols.append(group_columns[group_position]) group_position += 1 color_index += 1 color_ptr[self._n_colors] = len(color_cols) col_ptr: Int32Vector = np.zeros(len(color_cols) + 1, dtype=np.int32) total_map: int = 0 color_column_index: int = 0 while color_column_index < len(color_cols): total_map += len(self._column_rows[color_cols[color_column_index]]) col_ptr[color_column_index + 1] = total_map color_column_index += 1 row_idx: Int32Vector = np.empty(total_map, dtype=np.int32) data_idx: Int32Vector = np.empty(total_map, dtype=np.int32) position: int = 0 indptr_array: Int32Vector = self._jacobian_matrix.indptr.astype(np.int32, copy=False) color_column_index = 0 while color_column_index < len(color_cols): column_index: int = color_cols[color_column_index] base_position: int = int(indptr_array[column_index]) rows_in_column: List[int] = self._column_rows[column_index] local_index: int = 0 while local_index < len(rows_in_column): row_idx[position] = rows_in_column[local_index] data_idx[position] = base_position + local_index position += 1 local_index += 1 color_column_index += 1 return color_ptr, col_ptr, row_idx, data_idx def _build_ad_kernel(self, use_cse: bool, eager_machine_code: bool) -> Callable[..., Any]: """ Compile one generic eager AD kernel reused for every graph color. :param use_cse: Whether color kernels should emit CSE temporaries. :type use_cse: bool :return: Compiled generic AD kernel. :rtype: Callable[..., Any] """ kernel_name: str = f"structural_compiled_ad_generic_{self._method.name}_{self._n_rows}_{self._n_cols}" py_func, signature_tpe = self._compiler.compile_ad_kernel( equations=self._equations, func_name=kernel_name, use_cse=use_cse, active_indices=None, ) if eager_machine_code: return _safe_njit(py_func, signature_tpe=signature_tpe, fastmath=True, cache=True) else: return py_func
[docs] def evaluate( self, states: Vec, params: Vec, history: Vec, d_history: Vec, h: float, history2: Vec, ) -> csc_matrix: """ Evaluate the sparse Jacobian into the reusable CSC storage. :param states: Current Newton iterate. :type states: Vec :param params: Full parameter vector. :type params: Vec :param history: Previous state vector. :type history: Vec :param d_history: Previous differential vector. :type d_history: Vec :param h: Local time step. :type h: float :param history2: Secondary history vector. :type history2: Vec :return: Reused CSC Jacobian matrix. :rtype: csc_matrix """ self._master_dispatcher( states, self._seed_matrix, params, history, d_history, h, history2, self._data_buffer, self._color_ptr, self._col_ptr, self._row_idx, self._data_idx, self._jvp_work_buffer, ) return self._jacobian_matrix
[docs] def get_data_buffer(self) -> Vec: """ Return the reusable CSC numeric buffer. :return: CSC numeric buffer. :rtype: Vec """ return self._data_buffer
[docs] def get_matrix(self) -> csc_matrix: """ Return the reusable CSC matrix shell. :return: CSC Jacobian matrix. :rtype: csc_matrix """ return self._jacobian_matrix
[docs] def get_build_stats(self) -> Dict[str, float]: """ Return sparse Jacobian build timings. :return: Sparse Jacobian build timings. :rtype: Dict[str, float] """ return dict(self._build_stats)
[docs] class StructuralCompiledSparseFactorizationManager: """ Sparse linear-solve manager for ``StructuralCompiledSolver``. The manager keeps the current numeric factorization alive across time steps, caches one column ordering once it has been discovered, and reuses persistent numeric buffers so the solver hot loop does not rebuild sparse matrix shells. """ __slots__ = ["_base_matrix", "_backend", "_analysis_handle", "_cached_handle", "_solution_buffer", "_stats"] def __init__(self, base_matrix: csc_matrix, base_data: Vec, backend_provider: SparseLinearSolverBackendProvider | None = None) -> None: """ Create the sparse factorization manager. :param base_matrix: Reusable Jacobian CSC shell in original variable order. :type base_matrix: csc_matrix :param base_data: Reusable CSC numeric buffer of ``base_matrix``. :type base_data: Vec :return: None :rtype: None """ n_cols: int = int(base_matrix.shape[1]) resolved_provider: SparseLinearSolverBackendProvider if backend_provider is None: resolved_provider = SuperLUSparseBackendProvider() else: resolved_provider = backend_provider self._base_matrix: csc_matrix = base_matrix self._backend: SparseLinearSolverBackend = resolved_provider.create_backend(base_matrix, base_data) self._analysis_handle: Any | None = None self._cached_handle: SparseLinearFactorizationHandle | None = None self._solution_buffer: Vec = np.zeros(n_cols, dtype=np.float64) self._stats: Dict[str, float] = { "numeric_factorizations": 0.0, "invalidations": 0.0, "solve_calls": 0.0, "fallback_solves": 0.0, } if self._backend.supports_symbolic_analysis_reuse(): self._analysis_handle = self._backend.analyze(base_matrix) else: self._analysis_handle = None
[docs] def invalidate(self) -> None: """ Drop the current numeric factorization while keeping all persistent shells. :return: None :rtype: None """ self._cached_handle = None self._stats["invalidations"] += 1.0
[docs] def has_factorization(self) -> bool: """ Return whether one numeric factorization is currently cached. :return: ``True`` when a factorization is available. :rtype: bool """ return self._cached_handle is not None
[docs] def factorize(self) -> SparseLinearFactorizationHandle: """ Factorize the current Jacobian numeric values. :return: Active sparse factorization handle. :rtype: SparseLinearFactorizationHandle """ self._cached_handle = self._backend.factorize(self._base_matrix, self._analysis_handle) self._stats["numeric_factorizations"] += 1.0 return self._cached_handle
[docs] def solve(self, rhs: Vec) -> Vec: """ Solve the linear system using the current numeric factorization. :param rhs: Right-hand side vector. :type rhs: Vec :return: Solution buffer in original solver order. :rtype: Vec """ self._stats["solve_calls"] += 1.0 if self._cached_handle is None: self._solution_buffer[:] = 0.0 else: self._cached_handle.solve_into(rhs, self._solution_buffer) return self._solution_buffer
[docs] def solve_fallback(self, rhs: Vec) -> Vec: """ Solve the linear system with an iterative sparse fallback. :param rhs: Right-hand side vector. :type rhs: Vec :return: Solution buffer in original solver order. :rtype: Vec """ active_matrix: csc_matrix = self.get_active_matrix() raw_solution: Vec = np.asarray(sparse_lsqr_fallback(active_matrix, rhs), dtype=np.float64) self._stats["fallback_solves"] += 1.0 self._solution_buffer[:] = raw_solution return self._solution_buffer
[docs] def get_active_matrix(self) -> csc_matrix: """ Return the matrix used by the active numeric factorization path. :return: Active sparse matrix. :rtype: csc_matrix """ if self._cached_handle is None: return self._base_matrix else: return self._cached_handle.get_active_matrix()
[docs] def get_stats(self) -> Dict[str, float]: """ Return cumulative factorization-manager statistics. :return: Cumulative statistics dictionary. :rtype: Dict[str, float] """ merged_stats: Dict[str, float] = dict(self._stats) backend_stats: Dict[str, float] = self._backend.get_backend_stats() for key, value in backend_stats.items(): merged_stats[key] = float(value) return merged_stats
[docs] class StructuralCompiledSolver: """ Standalone structural EMT solver with eager kernels and reusable buffers. The solver keeps the linear algebra in SciPy, but it removes the Python-side allocation churn from residual assembly and sparse Jacobian evaluation. """ __slots__ = [ "_problem", "_t0", "_t_end", "_h", "_method", "_pred_method", "_dense_threshold", "_verbose", "_warmup_policy", "_state_vars", "_algebraic_vars", "_diff_vars", "_state_eqs", "_algebraic_eqs", "_sorted_vars", "_event_parameters", "_constant_parameters", "_parameter_values", "_uid2idx_vars", "_n_state", "_n_vars", "_n_diff", "_n_event_parameters", "_n_constant_parameters", "_residual_assembler", "_jacobian_evaluator", "_residual_buffer", "_linear_rhs_buffer", "_jacobian_data_buffer", "_static_parameter_buffer", "_full_parameter_buffer", "_last_factorization_parameter_snapshot", "_has_factorization_parameter_snapshot", "_delta_buffer", "_trial_state_buffer", "_trial_residual_buffer", "_trial_residual_evaluator", "_vectorized_ready", "_backend_warmup_done", "_last_macro_newton_iterations", "_last_sim_loop_time", "_last_runtime_stats", "_predictor", "_backend_build_stats", "_newton_max_iter", "_newton_diag_config", "_sparse_factorization_manager", "_sparse_solver_backend_provider", ] def __init__( self, problem: EmtProblemTemplate, t0: float, t_end: float, h: float, method: DynamicIntegrationMethod = DynamicIntegrationMethod.DaeTrapezoidal, pred_method: DynamicIntegrationMethod | None = None, dense_threshold: int = 100, verbose: bool = False, newton_max_iter: int = 15, auto_build: bool = True, warmup_policy: StructuralCompiledWarmupPolicy = StructuralCompiledWarmupPolicy.Adaptive, sparse_solver_backend_provider: SparseLinearSolverBackendProvider | None = None, newton_diag_config: NewtonDiagnosticsConfig | None = None, ) -> None: """ Create the structural compiled EMT solver. :param problem: EMT problem wrapper. :type problem: EmtProblemTemplate :param t0: Initial time. :type t0: float :param t_end: End time. :type t_end: float :param h: Nominal time step. :type h: float :param method: Implicit integration method. :type method: DynamicIntegrationMethod :param pred_method: Predictor method used on the first trapezoidal step. :type pred_method: DynamicIntegrationMethod | None :param dense_threshold: Threshold to switch between dense and sparse linear algebra. :type dense_threshold: int :param verbose: Whether to print build and simulation timings. :type verbose: bool :param newton_max_iter: Maximum Newton iterations per local EMT substep. :type newton_max_iter: int :param auto_build: Whether to build the vectorized backend during construction. :type auto_build: bool :param warmup_policy: Warmup policy used during backend build. :type warmup_policy: StructuralCompiledWarmupPolicy :param sparse_solver_backend_provider: Sparse linear solver backend provider. :type sparse_solver_backend_provider: SparseLinearSolverBackendProvider | None :return: None :rtype: None """ self._problem = problem self._t0 = t0 self._t_end = t_end self._h = h self._method = method self._pred_method = pred_method self._dense_threshold = dense_threshold self._verbose = verbose self._newton_max_iter: int = int(newton_max_iter) self._warmup_policy = warmup_policy self._sparse_solver_backend_provider = sparse_solver_backend_provider self._newton_diag_config = newton_diag_config or NewtonDiagnosticsConfig( compute_dense_cond=False, enable_fallback=False, enable_index1_check=False, enable_backtracking=False, ) # The solver captures the problem structure once so all hot loops use local buffers only. self._state_vars = self._problem.get_state_vars() self._algebraic_vars = self._problem.get_algebraic_vars() self._diff_vars = self._problem.get_diff_vars() self._state_eqs = self._problem.get_state_eqs() self._algebraic_eqs = self._problem.get_algebraic_eqs() self._sorted_vars = list() self._sorted_vars.extend(self._state_vars) self._sorted_vars.extend(self._algebraic_vars) self._event_parameters = self._problem.get_variable_parameters() self._constant_parameters = self._problem.get_constant_parameters() self._parameter_values = self._problem.get_parameters_values() self._uid2idx_vars = dict() self._n_state = len(self._state_vars) self._n_vars = len(self._sorted_vars) self._n_diff = len(self._diff_vars) self._n_event_parameters = len(self._event_parameters) self._n_constant_parameters = len(self._constant_parameters) variable_index: int = 0 while variable_index < self._n_vars: variable_obj: Var = self._sorted_vars[variable_index] self._uid2idx_vars[variable_obj.uid] = variable_index variable_index += 1 # The residual and right-hand-side buffers are persistent across all Newton iterations. self._residual_buffer = np.zeros(self._n_vars, dtype=np.float64) self._linear_rhs_buffer = np.zeros(self._n_vars, dtype=np.float64) self._jacobian_data_buffer = np.zeros(0, dtype=np.float64) # Constant parameters are flattened once because they never change during simulation. self._static_parameter_buffer = np.zeros(self._n_constant_parameters, dtype=np.float64) variable_index = 0 while variable_index < self._n_constant_parameters: self._static_parameter_buffer[variable_index] = float(self._parameter_values[variable_index].value) variable_index += 1 # The full parameter vector is reused and refreshed in place at every local substep. self._full_parameter_buffer = np.zeros( self._n_event_parameters + self._n_constant_parameters, dtype=np.float64, ) self._last_factorization_parameter_snapshot = np.zeros( self._n_event_parameters + self._n_constant_parameters, dtype=np.float64, ) self._has_factorization_parameter_snapshot = False self._delta_buffer = np.zeros(self._n_vars, dtype=np.float64) self._trial_state_buffer = np.zeros(self._n_vars, dtype=np.float64) self._trial_residual_buffer = np.zeros(self._n_vars, dtype=np.float64) self._trial_residual_evaluator = StructuralCompiledResidualTrialEvaluator() self._residual_assembler: StructuralCompiledResidualAssembler | None = None self._jacobian_evaluator: StructuralCompiledSparseADJacobian | None = None self._sparse_factorization_manager: StructuralCompiledSparseFactorizationManager | None = None self._vectorized_ready = False self._backend_warmup_done = False self._last_macro_newton_iterations = 0 self._last_sim_loop_time = 0.0 self._last_runtime_stats: Dict[str, float] = dict() self._predictor = StructuralCompiledPredictor(self._n_state) self._backend_build_stats: Dict[str, float] = dict() if auto_build: self._build_vectorized_backend(method) else: pass def _build_vectorized_backend(self, method: DynamicIntegrationMethod) -> None: """ Analyze the symbolic structure and build the eager numerical backend. :param method: Integration method used by the generated kernels. :type method: DynamicIntegrationMethod :return: None :rtype: None """ t_start: float = time.perf_counter() groups: Dict[str, StructuralCompiledEquationGroup] = dict() runtime_uids: Set[int] = set(self._uid2idx_vars.keys()) parameter_uids: Set[int] = _build_parameter_uid_set(self._event_parameters, self._constant_parameters) full_eqs: List[Expr] = _build_full_residual_equations( self._state_vars, self._state_eqs, self._algebraic_eqs, ) equation_index: int = 0 t_grouping_start: float = time.perf_counter() # Structural grouping uses the full canonical string as the lookup key to avoid hash collisions. while equation_index < len(full_eqs): equation: Expr = full_eqs[equation_index] signature_str: str runtime_vars: List[Var] group: StructuralCompiledEquationGroup variable_indices: List[int] = list() runtime_var_index: int = 0 signature_str, runtime_vars = canonicalize_expression(equation, runtime_uids, parameter_uids) if signature_str in groups: group = groups[signature_str] else: group = StructuralCompiledEquationGroup() groups[signature_str] = group while runtime_var_index < len(runtime_vars): runtime_uid: int = _get_runtime_uid(runtime_vars[runtime_var_index]) mapped_index: int | None = self._uid2idx_vars.get(runtime_uid, None) if mapped_index is None: raise RuntimeError(f"Missing runtime variable mapping for uid={runtime_uid}") else: variable_indices.append(mapped_index) runtime_var_index += 1 if group.get_template_eq() is None: group.set_template(equation, runtime_vars) else: template_vars: List[Var] | None = group.get_template_vars() if template_vars is None: raise RuntimeError("Internal structural group without template variables") else: if len(template_vars) == len(variable_indices): pass else: raise RuntimeError("Inconsistent runtime variable count inside structural group") group.add_indices(variable_indices, equation_index) equation_index += 1 grouping_s: float = time.perf_counter() - t_grouping_start if self._verbose: print(f"--- [STRUCT-COMPILED] Detected {len(groups)} structural groups ---", flush=True) else: pass all_parameters: List[Var] = list() all_parameters.extend(self._event_parameters) all_parameters.extend(self._constant_parameters) eager_compiler: EagerEquationCompiler = EagerEquationCompiler( variables=self._sorted_vars, parameters=all_parameters, method=method, ) # Group kernels are compiled in deterministic order so disk cache keys remain stable. group_keys: List[str] = sorted(list(groups.keys())) backend_cache_token: str = _build_backend_cache_token(method, len(full_eqs), self._n_vars, group_keys) avg_group_size: float = float(len(full_eqs) / max(len(group_keys), 1)) use_direct_residual: bool = len(group_keys) >= 20 and avg_group_size <= 3.0 estimated_steps: int = int(max(1.0, round((self._t_end - self._t0) / self._h))) full_warmup_enabled: bool = _should_run_full_backend_warmup( self._warmup_policy, self._n_vars, estimated_steps, use_direct_residual, ) residual_use_cse: bool = True jacobian_use_cse: bool = True # Short runs should avoid paying the full Numba compilation cost up front. In those cases the # generated Python kernels are fast enough and keep the structural backend usable. eager_machine_code: bool = full_warmup_enabled kernel_specs: List[StructuralCompiledVectorKernel] = list() group_key_index: int = 0 if self._verbose: print( f"--- [STRUCT-COMPILED] Grouping ready in {grouping_s:.4f}s | " f"n_vars={self._n_vars} | n_eqs={len(full_eqs)} | " f"direct_residual={use_direct_residual} | warmup={self._warmup_policy.value} ---", flush=True, ) else: pass t_vec_kernel_compile_start: float = time.perf_counter() if use_direct_residual: direct_name: str = f"structural_compiled_residual_{backend_cache_token}" py_func, signature_tpe = eager_compiler.compile( equations=full_eqs, func_name=direct_name, use_cse=residual_use_cse, offset=0, inplace=True, ) if eager_machine_code: compiled_residual = _safe_njit(py_func, signature_tpe=signature_tpe, fastmath=True, cache=True) else: compiled_residual = py_func else: while group_key_index < len(group_keys): group_key: str = group_keys[group_key_index] group: StructuralCompiledEquationGroup = groups[group_key] template_eq: Expr | None = group.get_template_eq() template_vars: List[Var] | None = group.get_template_vars() if template_eq is None or template_vars is None: raise RuntimeError("Invalid structural group without template information") else: pass kernel_name: str = f"structural_compiled_vec_{method.name}_{group_key_index}" py_func: Callable[..., Any] signature_tpe: Any compiled_kernel: Callable[..., Any] indices: Int32Matrix = np.asarray(group.get_index_matrix(), dtype=np.int32) target_rows: Int32Vector = np.asarray(group.get_row_indices(), dtype=np.int32) py_func, signature_tpe = eager_compiler.compile_matrix_kernel( template_eq=template_eq, func_name=kernel_name, template_vars=template_vars, ) if eager_machine_code: compiled_kernel = _safe_njit(py_func, signature_tpe=signature_tpe, fastmath=True, cache=True) else: compiled_kernel = py_func kernel_specs.append(StructuralCompiledVectorKernel(compiled_kernel, indices, target_rows)) group_key_index += 1 vec_kernel_compile_s: float = time.perf_counter() - t_vec_kernel_compile_start if self._verbose: print(f"--- [STRUCT-COMPILED] Residual kernels ready in {vec_kernel_compile_s:.4f}s ---", flush=True) else: pass # The residual assembler owns the reusable grouped work buffers. t_residual_assembler_start: float = time.perf_counter() residual_debug_info: List[Dict[str, Any]] = _build_structural_residual_debug_info( self._state_vars, self._algebraic_vars, self._state_eqs, self._algebraic_eqs, self._uid2idx_vars, ) if use_direct_residual: self._residual_assembler = StructuralCompiledDirectResidualAssembler( compiled_residual, full_eqs, self._sorted_vars, all_parameters, method, residual_debug_info, ) else: self._residual_assembler = StructuralCompiledResidualAssembler(kernel_specs, backend_cache_token) residual_assembler_s: float = time.perf_counter() - t_residual_assembler_start if self._verbose: print(f"--- [STRUCT-COMPILED] Residual assembler ready in {residual_assembler_s:.4f}s ---", flush=True) else: pass # The sparse Jacobian evaluator owns the reusable CSC numeric storage. t_jacobian_evaluator_start: float = time.perf_counter() self._jacobian_evaluator = StructuralCompiledSparseADJacobian( equations=full_eqs, variables=self._sorted_vars, parameters=all_parameters, method=method, dtype=np.float64, use_cse=jacobian_use_cse, eager_machine_code=eager_machine_code, cache_token=backend_cache_token, ) jacobian_evaluator_s: float = time.perf_counter() - t_jacobian_evaluator_start self._jacobian_data_buffer = self._jacobian_evaluator.get_data_buffer() self._sparse_factorization_manager = StructuralCompiledSparseFactorizationManager( self._jacobian_evaluator.get_matrix(), self._jacobian_data_buffer, self._sparse_solver_backend_provider, ) if self._verbose: print(f"--- [STRUCT-COMPILED] Jacobian evaluator ready in {jacobian_evaluator_s:.4f}s ---", flush=True) else: pass # The helper kernels are always warmed because they are tiny and their # first-use compilation would otherwise show up as jitter inside the loop. helper_warmup_t0: float = time.perf_counter() _warmup_compiled_numba_helpers() helper_warmup_s: float = time.perf_counter() - helper_warmup_t0 residual_warmup_s: float = 0.0 jacobian_warmup_s: float = 0.0 if full_warmup_enabled: warmup_x: Vec = self._problem.get_x0().copy() warmup_dx: Vec = self._problem.get_dx0().copy() warmup_x2: Vec = warmup_x.copy() warmup_runtime_params: Vec = self._problem.event_params_values.copy() warmup_full_params: Vec = self._full_parameter_buffer # The warmup only needs stable shapes and valid memory layouts, so the # raw event parameter vector is enough here and avoids one extra Python # parameter-refresh pass during backend build. _fill_full_parameter_buffer(warmup_runtime_params, self._static_parameter_buffer, warmup_full_params) residual_warmup_t0: float = time.perf_counter() self._residual_assembler.evaluate(warmup_x, warmup_full_params, warmup_x, warmup_dx, self._h, warmup_x2, self._residual_buffer) residual_warmup_s = time.perf_counter() - residual_warmup_t0 jacobian_warmup_t0: float = time.perf_counter() self._jacobian_evaluator.evaluate(warmup_x, warmup_full_params, warmup_x, warmup_dx, self._h, warmup_x2) jacobian_warmup_s = time.perf_counter() - jacobian_warmup_t0 else: pass warmup_s: float = helper_warmup_s + residual_warmup_s + jacobian_warmup_s if self._verbose: print( f"--- [STRUCT-COMPILED] Warmup ready in {warmup_s:.4f}s | " f"helpers={helper_warmup_s:.4f}s | residual={residual_warmup_s:.4f}s | " f"jacobian={jacobian_warmup_s:.4f}s ---", flush=True, ) else: pass self._vectorized_ready = True self._backend_warmup_done = full_warmup_enabled residual_stats: Dict[str, float] = self._residual_assembler.get_build_stats() jacobian_stats: Dict[str, float] = self._jacobian_evaluator.get_build_stats() self._backend_build_stats = { "grouping_s": grouping_s, "vec_kernel_compile_s": vec_kernel_compile_s, "residual_assembler_s": residual_assembler_s, "residual_dispatcher_s": residual_stats.get("dispatcher_build_s", 0.0), "residual_buffer_alloc_s": residual_stats.get("buffer_alloc_s", 0.0), "jacobian_evaluator_s": jacobian_evaluator_s, "warmup_s": warmup_s, "helper_warmup_s": helper_warmup_s, "residual_warmup_s": residual_warmup_s, "jacobian_warmup_s": jacobian_warmup_s, "full_warmup_enabled": 1.0 if full_warmup_enabled else 0.0, "eager_machine_code": 1.0 if eager_machine_code else 0.0, "residual_use_cse": 1.0 if residual_use_cse else 0.0, "jacobian_use_cse": 1.0 if jacobian_use_cse else 0.0, "estimated_steps": float(estimated_steps), "jac_column_rows_s": jacobian_stats.get("column_rows_s", 0.0), "jac_csc_shell_s": jacobian_stats.get("csc_shell_s", 0.0), "jac_coloring_s": jacobian_stats.get("coloring_s", 0.0), "jac_scatter_map_s": jacobian_stats.get("scatter_map_s", 0.0), "jac_ad_kernels_s": jacobian_stats.get("ad_kernels_s", 0.0), "jac_dispatcher_s": jacobian_stats.get("dispatcher_build_s", 0.0), "total_s": time.perf_counter() - t_start, } if self._verbose: elapsed: float = self._backend_build_stats["total_s"] print(f"--- [STRUCT-COMPILED] Backend ready in {elapsed:.4f}s ---", flush=True) else: pass
[docs] def get_last_sim_loop_time(self) -> float: """ Return the last measured integration loop wall time. :return: Last measured loop time. :rtype: float """ return self._last_sim_loop_time
[docs] def is_vectorized_ready(self) -> bool: """ Return whether the eager backend has already been built. :return: ``True`` when the backend is ready. :rtype: bool """ return self._vectorized_ready
[docs] def get_residual_buffer(self) -> Vec: """ Return the persistent residual buffer. :return: Persistent residual buffer. :rtype: Vec """ return self._residual_buffer
[docs] def get_jacobian_data_buffer(self) -> Vec: """ Return the persistent CSC numeric buffer. :return: Persistent CSC numeric buffer. :rtype: Vec """ return self._jacobian_data_buffer
[docs] def get_backend_build_stats(self) -> Dict[str, float]: """ Return setup timings for the structural compiled backend build. :return: Setup timings grouped by build phase. :rtype: Dict[str, float] """ return dict(self._backend_build_stats)
[docs] def get_last_runtime_stats(self) -> Dict[str, float]: """ Return runtime statistics collected during the latest simulation. :return: Latest runtime statistics. :rtype: Dict[str, float] """ return dict(self._last_runtime_stats)
[docs] def simulate( self, x0: Optional[Vec] = None, dx0: Optional[Vec] = None, params0: Optional[Vec] = None, boundary_updater: EmtBoundaryUpdateProtocol | None = None, ) -> Tuple[Vec, Mat, Mat, bool, bool]: """ Run the implicit EMT simulation with eager structural kernels. :param x0: Optional initial state vector. :type x0: Vec | None :param dx0: Optional initial differential vector. :type dx0: Vec | None :param params0: Optional initial runtime parameters. :type params0: Vec | None :param boundary_updater: Optional override for the problem boundary updater. :type boundary_updater: EmtBoundaryUpdateProtocol | None :return: Time vector, state trajectory and differential trajectory. :rtype: Tuple[Vec, Mat, Mat] """ converged: bool = True well_initialized: bool = True if self._vectorized_ready: pass else: self._build_vectorized_backend(self._method) if self._residual_assembler is None or self._jacobian_evaluator is None: raise RuntimeError("Structural compiled backend was not built correctly") else: pass steps: int = int(np.ceil((self._t_end - self._t0) / self._h)) t: Vec = self._t0 + self._h * np.arange(steps + 1, dtype=np.float64) y: Mat = np.zeros((steps + 1, self._n_vars), dtype=np.float64) dy: Mat = np.zeros((steps + 1, self._n_diff), dtype=np.float64) if x0 is None: x_prev: Vec = self._problem.get_x0().copy() else: x_prev = np.array(x0, dtype=np.float64, copy=True) if dx0 is None: dx_prev: Vec = self._problem.get_dx0().copy() else: dx_prev = np.array(dx0, dtype=np.float64, copy=True) active_boundary_updater = resolve_solver_boundary_updater(self._problem, boundary_updater, float(self._t0)) x_prev2: Vec = x_prev.copy() x_iter: Vec = x_prev.copy() trial_state: Vec = self._trial_state_buffer trial_residual: Vec = self._trial_residual_buffer if params0 is None: runtime_params = self._problem.event_params_values.copy() else: runtime_params = np.array(params0, dtype=np.float64, copy=True) y[0, :] = x_prev dy[0, :] = dx_prev full_params: Vec = self._full_parameter_buffer if self._n_event_parameters > 0: runtime_params[:] = self._problem.def_event_params_fn(runtime_params, float(self._t0)) else: runtime_params = runtime_params _fill_full_parameter_buffer(runtime_params, self._static_parameter_buffer, full_params) if active_boundary_updater is None: pass else: active_boundary_updater.update(float(self._t0), x_prev, full_params) if self._n_event_parameters > 0: runtime_params[:] = full_params[:self._n_event_parameters] else: pass use_dense_solver: bool = self._n_vars <= self._dense_threshold sparse_factor_manager: StructuralCompiledSparseFactorizationManager | None if use_dense_solver: sparse_factor_manager = None else: sparse_factor_manager = self._sparse_factorization_manager trace_collector = self._problem.get_newton_trace_collector() diag_cfg = self._newton_diag_config diagnostics_enabled: bool = ( trace_collector is not None or diag_cfg.compute_dense_cond or diag_cfg.enable_fallback or diag_cfg.enable_index1_check or diag_cfg.enable_backtracking ) dense_solve: Callable[..., Vec] | None = None sparse_solve: Callable[..., Vec] | None = None if sparse_factor_manager is None: sparse_stats_start: Dict[str, float] = dict() else: sparse_stats_start = sparse_factor_manager.get_stats() jacobian_eval_count: int = 0 dense_factorization_count: int = 0 factorization_reuse_hits: int = 0 forced_factorization_invalidations: int = 0 if diagnostics_enabled: dense_solve = with_newton_diagnostics( _dense_lu_bundle_solve, fallback_solve=_dense_lu_bundle_fallback, collector=trace_collector, config=diag_cfg, solver_name="dense_lu", matrix_getter=_dense_lu_bundle_matrix, ) sparse_solve = with_newton_diagnostics( _sparse_factor_manager_solve, fallback_solve=_sparse_factor_manager_fallback, collector=trace_collector, config=diag_cfg, solver_name="superlu", matrix_getter=_sparse_factor_manager_matrix, ) else: pass # The eager kernels only need one warmup per solver lifetime. Skipping # repeated warmups removes avoidable overhead from successive benchmark runs. if self._backend_warmup_done: pass else: self._residual_assembler.evaluate(x_prev, full_params, x_prev, dx_prev, self._h, x_prev2, self._residual_buffer) self._jacobian_evaluator.evaluate(x_prev, full_params, x_prev, dx_prev, self._h, x_prev2) self._backend_warmup_done = True loop_start: float = time.perf_counter() if self._verbose: print("-> StructuralCompiled simulation started") else: pass step_index: int = 0 while step_index < steps: t_step_start: float = float(t[step_index]) t_step_target: float = float(t[step_index + 1]) t_local_prev: float = t_step_start is_first_local_step: bool = True while t_local_prev < (t_step_target - 1e-15): forced_event_time: float | None forced_event_time = get_solver_forced_event_time( active_boundary_updater, float(t_local_prev), float(t_step_target), ) if forced_event_time is None: t_curr: float = t_step_target else: t_curr = min(float(forced_event_time), t_step_target) if t_curr <= t_local_prev + 1e-15: t_curr = t_step_target else: pass is_aligned_substep: bool = t_curr < (t_step_target - 1e-15) if self._method == DynamicIntegrationMethod.DaeBDF2 and is_aligned_substep: raise NotImplementedError( "force_step_alignment is not implemented for DaeBDF2 in StructuralCompiledSolver" ) else: pass h_eff: float = float(t_curr - t_local_prev) # Runtime parameters are refreshed before every Newton solve because events may move them. if self._n_event_parameters > 0: runtime_params[:] = self._problem.def_event_params_fn(runtime_params, float(t_curr)) else: runtime_params = runtime_params _fill_full_parameter_buffer(runtime_params, self._static_parameter_buffer, full_params) if active_boundary_updater is None: pass else: active_boundary_updater.update(float(t_curr), x_prev, full_params) if self._n_event_parameters > 0: runtime_params[:] = full_params[:self._n_event_parameters] else: pass x_iter[:] = x_prev last_res_norm: float = 1.0 cached_lu_dense: DenseSolveBundle | None = None if sparse_factor_manager is None: pass else: if is_aligned_substep and sparse_factor_manager.has_factorization(): params_changed: bool = True if self._has_factorization_parameter_snapshot: params_changed = _max_abs_diff_vectors( full_params, self._last_factorization_parameter_snapshot, ) > 1.0e-14 else: params_changed = True if params_changed: sparse_factor_manager.invalidate() forced_factorization_invalidations += 1 else: forced_factorization_invalidations = forced_factorization_invalidations else: pass # The first trapezoidal step benefits from a cheap explicit predictor. if step_index == 0 and is_first_local_step and self._method == DynamicIntegrationMethod.DaeTrapezoidal: self._predictor.predict(x_iter, x_prev, dx_prev, h_eff, self._pred_method) else: pass substep_converged: bool = False newton_index: int = 0 while newton_index < self._newton_max_iter: ctx: NewtonSolveContext | None = None # Residual assembly is fully in-place and reuses the grouped work buffers. self._residual_assembler.evaluate( x_iter, full_params, x_prev, dx_prev, h_eff, x_prev2, self._residual_buffer, ) res_norm: float = _max_abs_value(self._residual_buffer) if res_norm < 1e-6: substep_converged = True break else: pass if diagnostics_enabled: ctx = NewtonSolveContext( t=float(t_curr), step_idx=int(step_index), newton_iter=int(newton_index), phase="jit_structural_compiled", method=str(self._method), ) ctx.res_norm_inf = res_norm else: pass if self._last_macro_newton_iterations <= 2: adaptive_recompute_threshold: float = 0.8 elif self._last_macro_newton_iterations <= 4: adaptive_recompute_threshold = 0.6 else: adaptive_recompute_threshold = 0.5 recompute_factorization: bool = ( (cached_lu_dense is None and (sparse_factor_manager is None or not sparse_factor_manager.has_factorization())) or (newton_index > 0 and (res_norm / (last_res_norm + 1e-16)) > adaptive_recompute_threshold) ) if recompute_factorization: # The Jacobian numeric values are written directly into CSC storage. cached_jacobian: csc_matrix = self._jacobian_evaluator.evaluate( x_iter, full_params, x_prev, dx_prev, h_eff, x_prev2, ) jacobian_eval_count += 1 if use_dense_solver: dense_jacobian = cached_jacobian.toarray() if diagnostics_enabled and ctx is not None: maybe_check_index1(dense_jacobian, self._n_state, ctx=ctx, config=diag_cfg) else: pass cached_lu_dense = (lu_factor(dense_jacobian), dense_jacobian) dense_factorization_count += 1 else: if diagnostics_enabled and ctx is not None: maybe_check_index1(cached_jacobian, self._n_state, ctx=ctx, config=diag_cfg) else: pass if sparse_factor_manager is None: raise RuntimeError("Missing sparse factorization manager in sparse StructuralCompiled path") else: sparse_factor_manager.factorize() self._last_factorization_parameter_snapshot[:] = full_params self._has_factorization_parameter_snapshot = True cached_lu_dense = None else: factorization_reuse_hits += 1 # The right-hand side is also written in place to avoid creating -res arrays. _copy_negated_vector(self._residual_buffer, self._linear_rhs_buffer) if use_dense_solver: if cached_lu_dense is None: self._delta_buffer[:] = 0.0 delta: Vec = self._delta_buffer else: if diagnostics_enabled and ctx is not None: assert dense_solve is not None self._delta_buffer[:] = dense_solve(cached_lu_dense, self._linear_rhs_buffer, ctx) delta = self._delta_buffer else: self._delta_buffer[:] = lu_solve(cached_lu_dense[0], self._linear_rhs_buffer) delta = self._delta_buffer else: if sparse_factor_manager is None or not sparse_factor_manager.has_factorization(): self._delta_buffer[:] = 0.0 delta = self._delta_buffer else: if diagnostics_enabled and ctx is not None: assert sparse_solve is not None delta = sparse_solve(sparse_factor_manager, self._linear_rhs_buffer, ctx) else: delta = sparse_factor_manager.solve(self._linear_rhs_buffer) if diag_cfg.enable_backtracking: self._trial_residual_evaluator.set_context( self._residual_assembler, full_params, x_prev, dx_prev, h_eff, x_prev2, ) maybe_apply_backtracking( x_iter, delta, res_norm, trial_state, trial_residual, evaluate_residual=self._trial_residual_evaluator.evaluate, config=diag_cfg, ) else: x_iter += delta last_res_norm = res_norm newton_index += 1 if not substep_converged: converged = False if step_index == 0 and is_first_local_step: well_initialized = False # Differential history is updated according to the selected integration rule. self._last_macro_newton_iterations = int(newton_index) if self._method == DynamicIntegrationMethod.DaeTrapezoidal: dx_prev[:self._n_state] = ( (2.0 / h_eff) * (x_iter[:self._n_state] - x_prev[:self._n_state]) - dx_prev[:self._n_state] ) elif self._method == DynamicIntegrationMethod.DaeBDF2: x_prev2[:] = x_prev dx_prev[:self._n_state] = ( 1.5 * x_iter[:self._n_state] - 2.0 * x_prev[:self._n_state] + 0.5 * x_prev2[:self._n_state] ) / h_eff else: dx_prev[:self._n_state] = ( x_iter[:self._n_state] - x_prev[:self._n_state] ) / h_eff if self._method == DynamicIntegrationMethod.DaeBDF2: pass else: x_prev2[:] = x_prev x_prev[:] = x_iter t_local_prev = t_curr is_first_local_step = False y[step_index + 1, :] = x_prev dy[step_index + 1, :] = dx_prev step_index += 1 self._last_sim_loop_time = time.perf_counter() - loop_start if self._verbose: print(f"-> StructuralCompiled simulation finished in {self._last_sim_loop_time:.4f}s") else: pass if sparse_factor_manager is None: sparse_stats_end: Dict[str, float] = dict() else: sparse_stats_end = sparse_factor_manager.get_stats() self._last_runtime_stats = { "jacobian_evaluations": float(jacobian_eval_count), "dense_factorizations": float(dense_factorization_count), "factorization_reuse_hits": float(factorization_reuse_hits), "forced_factorization_invalidations": float(forced_factorization_invalidations), "sparse_numeric_factorizations": float( sparse_stats_end.get("numeric_factorizations", 0.0) - sparse_stats_start.get("numeric_factorizations", 0.0) ), "sparse_ordering_builds": float( sparse_stats_end.get("ordering_builds", 0.0) - sparse_stats_start.get("ordering_builds", 0.0) ), "sparse_ordering_reuses": float( sparse_stats_end.get("ordering_reuses", 0.0) - sparse_stats_start.get("ordering_reuses", 0.0) ), "sparse_invalidations": float( sparse_stats_end.get("invalidations", 0.0) - sparse_stats_start.get("invalidations", 0.0) ), "sparse_solve_calls": float( sparse_stats_end.get("solve_calls", 0.0) - sparse_stats_start.get("solve_calls", 0.0) ), "sparse_fallback_solves": float( sparse_stats_end.get("fallback_solves", 0.0) - sparse_stats_start.get("fallback_solves", 0.0) ), } return t, y, dy,well_initialized, converged