Source code for VeraGridEngine.Simulations.Rms.numerical.back_euler_mti

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations

import os
from itertools import product
import warnings
import time

import numpy as np
import scipy.sparse as sp
from scipy.sparse.linalg import MatrixRankWarning
from scipy.optimize import least_squares

from VeraGridEngine.Simulations.Rms.numerical.back_euler_fx import BackEulerImplicitIntegration
from VeraGridEngine.Simulations.Rms.problems.rms_problem_MTI import RmsProblemMTI
from VeraGridEngine.basic_structures import Mat, Vec
from VeraGridEngine.Utils.Symbolic.symbolic import get_expression_vars


[docs] class BackEulerImplicitIntegrationMTI(BackEulerImplicitIntegration): """ Backward Euler variant with MTI inequality/mode checks. This solver keeps the same Newton/Jacobian structure as ``BackEulerImplicitIntegration`` and adds a post-Newton MTI feasibility gate based on inequality residuals ``G <= tol``. """ def __init__( self, problem: RmsProblemMTI, t0: float, t_end: float, h: float, max_iter: int, tolerance: float = 1e-7, inequality_tolerance: float = 1e-9, ): super().__init__( problem=problem, t0=t0, t_end=t_end, h=h, max_iter=max_iter, tolerance=tolerance, ) self.problem: RmsProblemMTI = problem self.inequality_tolerance = inequality_tolerance self.z: Mat = np.empty((self.steps + 1, 0), dtype=float) self.debug = os.getenv("RMS_MTI_DEBUG", "0").strip() in ("1", "true", "True", "yes", "on") self.debug_newton = os.getenv("RMS_MTI_DEBUG_NEWTON", "0").strip() in ("1", "true", "True", "yes", "on") self.debug_residuals = os.getenv("RMS_MTI_DEBUG_RESIDUALS", "0").strip() in ("1", "true", "True", "yes", "on") self.compare = os.getenv("RMS_MTI_COMPARE", "0").strip() in ("1", "true", "True", "yes", "on") try: # 0 means: no candidate cap (toolbox-like all-candidates check) self.debug_max_cands = int(os.getenv("RMS_MTI_DEBUG_MAX_CANDS", "0")) except Exception: self.debug_max_cands = 0 self.polish_candidates = os.getenv("RMS_MTI_POLISH_CANDIDATES", "0").strip() in ( "1", "true", "True", "yes", "on" ) self.use_nonlinear_subproblem_lsq = os.getenv("RMS_MTI_NONLINEAR_SUBPROBLEM_LSQ", "0").strip() in ( "1", "true", "True", "yes", "on" ) self._debug_x0_ref: Vec | None = None self._debug_first_eval_done = False self._last_subproblem_fail_reason = "solve_fail" self._last_subproblem_context = "" self.mti_newton_tol = max(self.tol, float(os.getenv("RMS_MTI_NEWTON_TOL", "1e-6"))) try: self.mti_solve_tol = self.mti_newton_tol * max(1.0, float(os.getenv("RMS_MTI_SOLVE_TOL_FACTOR", "200.0"))) except Exception: self.mti_solve_tol = 200.0 * self.mti_newton_tol try: default_mti_max_iter = max(int(self.max_iter_0), 300) self.mti_max_iter = max(default_mti_max_iter, int(os.getenv("RMS_MTI_MAX_ITER", str(default_mti_max_iter)))) except Exception: self.mti_max_iter = max(int(self.max_iter_0), 300) try: self.mti_lsqr_iter_lim = max(1, int(os.getenv("RMS_MTI_LSQR_ITER", "100"))) except Exception: self.mti_lsqr_iter_lim = 100 def _debug_print_top_residuals( self, rhs: Vec, top_k: int = 10, tag: str = "", x_eval: Vec | None = None, dx_eval: Vec | None = None, ) -> None: if top_k <= 0: return if not self.debug: return arr = np.asarray(rhs, dtype=float) if arr.size == 0: return n_state = int(self.problem.get_states_number()) x_bind = np.asarray(self.y[0], dtype=float) if x_eval is None else np.asarray(x_eval, dtype=float) dx_bind = np.zeros(self.problem.get_diff_var_number(), dtype=float) if dx_eval is None else np.asarray(dx_eval, dtype=float) state_eqs = getattr(self.problem, "_state_eqs", []) algeb_eqs = getattr(self.problem, "_algebraic_eqs", []) def _eq_text(i: int) -> str: try: if i < n_state: if i < len(state_eqs): return f"state_update[{i}] from f_state: {state_eqs[i]}" return f"state_update[{i}]" j = i - n_state if j < len(algeb_eqs): return f"algeb[{j}]: {algeb_eqs[j]}" return f"algeb[{j}]" except Exception: return "<eq unavailable>" order = np.argsort(np.abs(arr))[::-1] k = min(top_k, arr.size) print(f"[MTI-RHS] {tag} top {k} residual entries (idx: value):") for i in order[:k]: ii = int(i) print(f" {ii}: {arr[ii]:+.6e}") print(f" eq: {_eq_text(ii)}") # Binding sanity check: compare compiled residual entry vs direct # symbolic eval with current uid bindings. try: if ii >= n_state: j = ii - n_state if 0 <= j < len(algeb_eqs): eq = algeb_eqs[j] uid_bindings: dict[int, float] = {} for vr in get_expression_vars(eq): v_idx = self.problem.uid2idx_vars.get(vr.uid, None) if v_idx is not None: uid_bindings[vr.uid] = float(x_bind[v_idx]) continue d_idx = self.problem._uid2idx_diff.get(vr.uid, None) if d_idx is not None: uid_bindings[vr.uid] = float(dx_bind[d_idx]) continue p_idx = self.problem._uid2idx_params.get(vr.uid, None) if p_idx is not None: uid_bindings[vr.uid] = float(self.problem._constant_params[p_idx]) continue e_idx = self.problem._uid2idx_event_params.get(vr.uid, None) if e_idx is not None: uid_bindings[vr.uid] = float(self.problem._variable_parameters_values[e_idx]) sym_val = float(eq.eval_uid(uid_bindings)) print(f" compiled={arr[ii]:+.6e} symbolic={sym_val:+.6e} diff={(arr[ii]-sym_val):+.6e}") vars_preview = [] for vr in get_expression_vars(eq): v_idx = self.problem.uid2idx_vars.get(vr.uid, None) if v_idx is not None: vars_preview.append(f"{vr.name}=vars[{v_idx}]={x_bind[v_idx]:+.3e}") continue d_idx = self.problem._uid2idx_diff.get(vr.uid, None) if d_idx is not None: vars_preview.append(f"{vr.name}=diff[{d_idx}]={dx_bind[d_idx]:+.3e}") continue p_idx = self.problem._uid2idx_params.get(vr.uid, None) if p_idx is not None: vars_preview.append(f"{vr.name}=cprms[{p_idx}]={self.problem._constant_params[p_idx]:+.3e}") continue e_idx = self.problem._uid2idx_event_params.get(vr.uid, None) if e_idx is not None: vars_preview.append(f"{vr.name}=vprms[{e_idx}]={self.problem._variable_parameters_values[e_idx]:+.3e}") if len(vars_preview) > 0: print(" bindings: " + ", ".join(vars_preview[:12])) except Exception as ex: print(f" symbolic-check-failed: {ex}") def _newton_step(self, x_new: Vec, x_prev: Vec, dx: Vec, h_eff: float) -> tuple[Vec, float]: rhs = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs.size == 0: return x_new, 0.0 if not np.all(np.isfinite(rhs)): return x_new, np.inf residual = float(np.linalg.norm(rhs, np.inf)) if residual < self.tol: return x_new, residual jf = self._jacobian_implicit(x_new, dx, h_eff) try: with warnings.catch_warnings(): warnings.filterwarnings("error", category=MatrixRankWarning) delta = sp.linalg.spsolve(jf, -rhs) except (MatrixRankWarning, RuntimeError, ValueError): delta = np.full_like(x_new, np.nan, dtype=float) if not np.all(np.isfinite(delta)): delta, *_ = sp.linalg.lsqr(jf, -rhs, iter_lim=self.mti_lsqr_iter_lim, atol=self.mti_solve_tol, btol=self.mti_solve_tol) if not np.all(np.isfinite(delta)): return x_new, np.inf x_next = x_new + delta if not np.all(np.isfinite(x_next)): return x_new, np.inf return x_next, residual def _solve_continuous_for_fixed_boolean( self, x_seed: Vec, x_prev: Vec, dx_last: Vec, h_eff: float, force_zero_dx: bool = False, ) -> tuple[bool, Vec, Vec, float]: x_new = x_seed.copy() residual = np.inf dx_out = dx_last.copy() for it in range(int(self.mti_max_iter)): it_t0 = time.perf_counter() if force_zero_dx: dx = np.zeros_like(dx_last) else: dx = self.problem.get_dx(x_new, x_prev, dx_last, h_eff) if self.debug and not self._debug_first_eval_done and self._debug_x0_ref is not None: x0_ref = np.asarray(self._debug_x0_ref, dtype=float) xnew_arr = np.asarray(x_new, dtype=float) xprev_arr = np.asarray(x_prev, dtype=float) d_xnew_x0 = float(np.linalg.norm(xnew_arr - x0_ref, np.inf)) if xnew_arr.size == x0_ref.size else np.inf d_xprev_x0 = float(np.linalg.norm(xprev_arr - x0_ref, np.inf)) if xprev_arr.size == x0_ref.size else np.inf d_xnew_xprev = float(np.linalg.norm(xnew_arr - xprev_arr, np.inf)) if xnew_arr.size == xprev_arr.size else np.inf print( "[INIT-CHECK-STEP0] first compute_mti_equalities input " f"||x_new-x0||inf={d_xnew_x0:.6e} " f"||x_prev-x0||inf={d_xprev_x0:.6e} " f"||x_new-x_prev||inf={d_xnew_xprev:.6e}" ) self._debug_first_eval_done = True rhs = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) residual = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 if self.debug_newton: it_ms = (time.perf_counter() - it_t0) * 1e3 print(f"[MTI-NEWTON] it={it + 1}/{self.mti_max_iter} res={residual:.6e} iter_ms={it_ms:.3f}") if self.debug_residuals and residual >= max(self.tol, 1e-6): self._debug_print_top_residuals(rhs, top_k=12, tag="fixed-z", x_eval=x_new, dx_eval=dx) if residual < self.mti_newton_tol: if self.debug_newton: print(f"[MTI-NEWTON] converged in {it + 1} iterations") return True, x_new, dx, residual x_new, _ = self._newton_step(x_new=x_new, x_prev=x_prev, dx=dx, h_eff=h_eff) if not np.all(np.isfinite(x_new)): return False, x_new, dx, np.inf dx_out = dx.copy() if self.debug_newton: print(f"[MTI-NEWTON] failed after {int(self.mti_max_iter)} iterations; last_res={residual:.6e}") dx = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs.size > 0 and np.all(np.isfinite(rhs)): residual = float(np.linalg.norm(rhs, np.inf)) if residual < self.mti_newton_tol: return True, x_new, dx, residual dx_out = dx.copy() if self.debug: print(f"[MTI-NEWTON] fixed-boolean failed final_residual={residual:.6e}") return False, x_new, dx_out, residual def _solve_continuous_with_fallback( self, x_prev: Vec, dx_last: Vec, h_eff: float, force_zero_dx: bool = False, ) -> tuple[bool, Vec, Vec, float]: seeds = [x_prev.copy()] best = (False, x_prev.copy(), dx_last.copy(), np.inf) for seed in seeds: ok, x_try, dx_try, res = self._solve_continuous_for_fixed_boolean( seed, x_prev, dx_last, h_eff, force_zero_dx=force_zero_dx, ) if ok: return True, x_try, dx_try, res if res < best[3]: best = (ok, x_try, dx_try, res) return best def _detect_inequality_event( self, g_end: Vec, dg_end: Vec, ) -> tuple[bool, int]: g_arr = np.asarray(g_end, dtype=float) if g_arr.size == 0 or not np.all(np.isfinite(g_arr)): return False, -1 violated = np.where(g_arr > self.inequality_tolerance)[0] if violated.size > 0: idx = int(violated[np.argmax(g_arr[violated])]) return True, idx dg_arr = np.asarray(dg_end, dtype=float) if dg_arr.size == g_arr.size and np.all(np.isfinite(dg_arr)): active = np.where(g_arr >= -1e-9)[0] outgoing = active[dg_arr[active] > 0.0] if active.size > 0 else np.zeros(0, dtype=int) if outgoing.size > 0: idx = int(outgoing[np.argmax(dg_arr[outgoing])]) return True, idx return False, -1 def _locate_inequality_event( self, x_prev: Vec, dx_last: Vec, h_eff: float, z_prev: Vec, event_idx: int, x_end: Vec, dx_end: Vec, force_zero_dx: bool = False, ) -> tuple[bool, float, Vec, Vec, int]: """ Locate the first fixed-z inequality crossing inside a BackEuler segment. This approximates MATLAB's ODE event localization for the implicit Euler path: solve the fixed-z DAE at intermediate step lengths and bisect the scalar event function G[event_idx]. Boolean candidates are deliberately not generated here; this only finds the event time that will trigger MTI event handling. """ if event_idx < 0: return False, h_eff, x_end, dx_end, event_idx self.problem.set_mti_boolean_state(z_prev) g0 = self.problem.compute_mti_inequalities(x_prev, dx_last, x_prev, 0.0) g0_arr = np.asarray(g0, dtype=float) if event_idx >= g0_arr.size: return False, h_eff, x_end, dx_end, event_idx g0_val = float(g0_arr[event_idx]) if g0_val >= -self.inequality_tolerance: return True, 0.0, np.asarray(x_prev, dtype=float).copy(), np.asarray(dx_last, dtype=float).copy(), event_idx g1 = self.problem.compute_mti_inequalities(x_end, dx_end, x_prev, h_eff) g1_arr = np.asarray(g1, dtype=float) if event_idx >= g1_arr.size: return False, h_eff, x_end, dx_end, event_idx if float(g1_arr[event_idx]) < -self.inequality_tolerance: return True, h_eff, np.asarray(x_end, dtype=float).copy(), np.asarray(dx_end, dtype=float).copy(), event_idx try: n_iter = max(1, int(os.getenv("RMS_MTI_EVENT_BISECT_ITER", "20"))) except Exception: n_iter = 20 lo = 0.0 hi = float(h_eff) x_hi = np.asarray(x_end, dtype=float).copy() dx_hi = np.asarray(dx_end, dtype=float).copy() for _ in range(n_iter): hm = 0.5 * (lo + hi) if hm <= 0.0: break self.problem.set_mti_boolean_state(z_prev) ok_mid, x_mid, dx_mid, _ = self._solve_continuous_with_fallback( x_prev=x_prev, dx_last=dx_last, h_eff=hm, force_zero_dx=force_zero_dx, ) if not ok_mid: hi = hm continue g_mid = self.problem.compute_mti_inequalities(x_mid, dx_mid, x_prev, hm) g_mid_arr = np.asarray(g_mid, dtype=float) if event_idx >= g_mid_arr.size: return False, h_eff, x_end, dx_end, event_idx if float(g_mid_arr[event_idx]) >= -self.inequality_tolerance: hi = hm x_hi = np.asarray(x_mid, dtype=float).copy() dx_hi = np.asarray(dx_mid, dtype=float).copy() else: lo = hm if self.debug: print( f"[MTI-EVENT-LOC] ineq={event_idx} h={h_eff:.6e} " f"h_event={hi:.6e}" ) return True, hi, x_hi, dx_hi, event_idx def _event_total_derivative( self, x_prev: Vec, dx_last: Vec, h_eff: float, z_prev: Vec, force_zero_dx: bool, forced_ineq_idx: int | None = None, ) -> tuple[bool, Vec, Vec, Vec, float, Vec, Vec]: """ MATLAB eventTotalDerivative-style event handler. Flow mirrors the toolbox control structure: previous fixed-Z subproblems -> event candidate-Z subproblem -> following fixed/variable-Z subproblems -> G/DG acceptance. """ g_now = self.problem.compute_mti_inequalities(x_prev, dx_last, x_prev, h_eff) if forced_ineq_idx is not None: ie = int(forced_ineq_idx) elif g_now is not None and len(g_now) > 0: g_now_arr = np.asarray(g_now, dtype=float) active_idx = np.where(g_now_arr >= -self.inequality_tolerance)[0] if active_idx.size > 0: ie = int(active_idx[int(np.argmax(g_now_arr[active_idx]))]) else: ie = int(np.argmax(g_now_arr)) else: ie = 0 candidates = self.problem.get_event_local_boolean_candidates(ie, z_prev) candidates.sort(key=lambda zc: self._hamming_distance(np.asarray(zc, dtype=float), z_prev)) if self.debug: n_bool = len(z_prev) full_count = (2 ** n_bool) if n_bool > 0 else 1 n_cands = len(candidates) n_local_bools = -1 if n_cands > 0 and (n_cands & (n_cands - 1)) == 0: n_local_bools = int(np.log2(n_cands)) scope = "full" if n_cands == full_count else "local" print( f"[MTI-CAND] source={scope} ie={ie} n_cands={n_cands} " f"n_local_bools={n_local_bools} n_total_bools={n_bool}" ) event_subset_ids = self.problem.get_event_subset_ids(ie) prev_groups, event_groups, foll_groups = self.problem.get_event_solving_stages(ie) x_stage = x_prev.copy() dx_stage = dx_last.copy() for eqg, varg in prev_groups: okg, x_stage, dx_stage, _ = self._solve_subproblem_fix_z( x_seed=x_stage, x_prev=x_prev, dx_last=dx_stage, h_eff=h_eff, eq_idx=np.asarray(eqg, dtype=int), var_idx=np.asarray(varg, dtype=int), force_zero_dx=force_zero_dx, ) if not okg: break best_fail_res = np.inf best_fail_x = x_prev.copy() best_fail_dx = dx_last.copy() first_fail_dumped = False def try_candidates(cands: list[np.ndarray]) -> tuple[bool, Vec, Vec, Vec]: nonlocal best_fail_res, best_fail_x, best_fail_dx, first_fail_dumped max_iter_cands = len(cands) if self.debug and self.debug_max_cands > 0: max_iter_cands = min(max_iter_cands, int(self.debug_max_cands)) for cand_idx, z_candidate in enumerate(cands): if cand_idx >= max_iter_cands: if self.debug: print(f"[MTI-CAND] debug_limit_reached={max_iter_cands}") break self.problem.set_mti_boolean_state(z_candidate) x_try = x_stage.copy() dx_try = dx_stage.copy() ok = True res_try = np.inf for group_i, (eqg, varg) in enumerate(event_groups): ok, x_try, dx_try, res_try = self._solve_subproblem_fix_z( x_seed=x_try, x_prev=x_prev, dx_last=dx_try, h_eff=h_eff, eq_idx=np.asarray(eqg, dtype=int), var_idx=np.asarray(varg, dtype=int), force_zero_dx=force_zero_dx, check_inequalities=False, ) if not ok: if self.debug: print( f"[MTI-SUBFAIL] stage=event group={group_i} " f"rows={len(np.asarray(eqg, dtype=int))} vars={len(np.asarray(varg, dtype=int))} " f"reason={getattr(self, '_last_subproblem_fail_reason', 'solve_fail')} res={res_try:.6e}" ) break if ok: z_follow = np.asarray(z_candidate, dtype=float).copy() for group_i, (eqg, varg) in enumerate(foll_groups): eq_arr = np.asarray(eqg, dtype=int) var_arr = np.asarray(varg, dtype=int) bool_positions = self.problem.get_subproblem_boolean_positions(eq_arr, var_arr) if len(bool_positions) > 0: ok, x_try, dx_try, z_follow, res_try = self._solve_subproblem_variable_z( x_seed=x_try, x_prev=x_prev, dx_last=dx_try, h_eff=h_eff, z_seed=z_follow, eq_idx=eq_arr, var_idx=var_arr, force_zero_dx=force_zero_dx, ) else: self.problem.set_mti_boolean_state(z_follow) ok, x_try, dx_try, res_try = self._solve_subproblem_fix_z( x_seed=x_try, x_prev=x_prev, dx_last=dx_try, h_eff=h_eff, eq_idx=eq_arr, var_idx=var_arr, force_zero_dx=force_zero_dx, ) if not ok: if self.debug: print( f"[MTI-SUBFAIL] stage=following group={group_i} " f"rows={eq_arr.size} vars={var_arr.size} local_bools={len(bool_positions)} " f"reason={getattr(self, '_last_subproblem_fail_reason', 'solve_fail')} res={res_try:.6e}" ) break if ok: self.problem.set_mti_boolean_state(z_follow) z_candidate = z_follow if ok and self.polish_candidates: ok, x_try, dx_try, res_try = self._solve_continuous_with_fallback( x_prev=x_try, dx_last=dx_try, h_eff=h_eff, force_zero_dx=force_zero_dx, ) elif ok: rhs_try = self.problem.compute_mti_equalities(x_try, dx_try, x_prev, h_eff) res_try = float(np.linalg.norm(rhs_try, np.inf)) if rhs_try.size > 0 else 0.0 if not ok: if self.debug: fail_reason = getattr(self, "_last_subproblem_fail_reason", "solve_fail") print( f"[MTI-CAND] idx={cand_idx} z={np.asarray(z_candidate, dtype=int)} " f"decision={fail_reason} res={res_try:.3e}" ) if not first_fail_dumped: first_fail_dumped = True rhs_dbg = self.problem.compute_mti_equalities(x_try, dx_try, x_prev, h_eff) if self.debug_residuals: self._debug_print_top_residuals(rhs_dbg, top_k=12, tag="first-candidate-fail", x_eval=x_try, dx_eval=dx_try) if np.isfinite(res_try) and res_try < best_fail_res: best_fail_res = res_try best_fail_x = x_try best_fail_dx = dx_try continue g_try = self.problem.compute_mti_inequalities(x_try, dx_try, x_prev, h_eff) if not self.problem.inequalities_satisfied(g_try, tol=self.inequality_tolerance): if self.debug: g_arr = np.asarray(g_try, dtype=float) g_max = float(np.max(g_arr)) if g_arr.size > 0 else -np.inf g_idx = int(np.argmax(g_arr)) if g_arr.size > 0 else -1 ineq_txt = "<unavailable>" if g_idx >= 0: ineq_list = getattr(self.problem, "_mti_inequalities_raw", None) if ineq_list is None or len(ineq_list) == 0: ineq_list = getattr(self.problem, "_mti_inequalities_compiled", None) try: if ineq_list is not None and g_idx < len(ineq_list): ineq_txt = str(ineq_list[g_idx]) except Exception: ineq_txt = "<stringify-failed>" print( f"[MTI-CAND] idx={cand_idx} z={np.asarray(z_candidate, dtype=int)} " f"decision=ineq_fail maxG={g_max:.3e} at ineq[{g_idx}]" ) if g_idx >= 0: print(f" ineq[{g_idx}] expr: {ineq_txt}") continue h_safe = max(float(h_eff), 1e-12) xpp_try = (np.asarray(dx_try, dtype=float) - np.asarray(dx_last, dtype=float)) / h_safe dg_try = self.problem.total_derivative_inequalities(x_try, dx_try, xpp=xpp_try) active = np.asarray(g_try) >= -1e-9 if dg_try.size > 0 and np.any(active) and np.any(np.asarray(dg_try)[active] > 0.0): if self.debug: dg_active_max = float(np.max(np.asarray(dg_try, dtype=float)[active])) print( f"[MTI-CAND] idx={cand_idx} z={np.asarray(z_candidate, dtype=int)} " f"decision=dg_fail maxDGactive={dg_active_max:.3e}" ) continue if self.debug: g_arr = np.asarray(g_try, dtype=float) g_max = float(np.max(g_arr)) if g_arr.size > 0 else -np.inf dg_active_max = float(np.max(np.asarray(dg_try, dtype=float)[active])) if (dg_try.size > 0 and np.any(active)) else -np.inf print( f"[MTI-CAND] idx={cand_idx} z={np.asarray(z_candidate, dtype=int)} " f"decision=accepted maxG={g_max:.3e} maxDGactive={dg_active_max:.3e}" ) return True, x_try, dx_try, np.asarray(z_candidate, dtype=float) return False, best_fail_x, best_fail_dx, z_prev ok_evt, x_evt, dx_evt, z_evt = try_candidates(candidates) if ok_evt: return True, x_evt, dx_evt, z_evt, best_fail_res, best_fail_x, best_fail_dx if self.debug: print("[MTI-CAND] no local candidate accepted; skipping full boolean enumeration") return False, x_prev, dx_last, z_prev, best_fail_res, best_fail_x, best_fail_dx def _try_event_total_derivative_candidates( self, x_prev: Vec, dx_last: Vec, h_eff: float, z_prev: Vec, force_zero_dx: bool, ) -> tuple[bool, Vec, Vec, Vec, float, Vec, Vec]: return self._event_total_derivative( x_prev=x_prev, dx_last=dx_last, h_eff=h_eff, z_prev=z_prev, force_zero_dx=force_zero_dx, ) def _solve_plain_dae_for_seed( self, x_seed: Vec, x_prev: Vec, dx_last: Vec, h_eff: float, force_zero_dx: bool = False, ) -> tuple[bool, Vec, Vec, float]: x_new = x_seed.copy() residual = np.inf dx_out = dx_last.copy() for _ in range(self.mti_max_iter): if force_zero_dx: dx = np.zeros_like(dx_last) else: dx = self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs = self._rhs_implicit(x_new, dx, x_prev, h_eff) residual = float(np.linalg.norm(rhs, np.inf)) if residual <= self.mti_solve_tol: return True, x_new, dx, residual jf = self._jacobian_implicit(x_new, dx, h_eff) m, n = jf.shape if rhs.size != m: raise ValueError( f"Dimension mismatch: jf has shape {jf.shape}, " f"but rhs has length {rhs.size}. " f"Need len(rhs) == jf.shape[0]." ) if m == n: # Standard square sparse solve delta = sp.linalg.spsolve(jf, -rhs) else: # Rectangular system: solve least-squares min ||jf @ delta + rhs|| delta = sp.linalg.lsqr(jf, -rhs, iter_lim=self.mti_lsqr_iter_lim, atol=self.mti_solve_tol, btol=self.mti_solve_tol)[0] if not np.all(np.isfinite(delta)): delta, *_ = sp.linalg.lsqr(jf, -rhs, iter_lim=self.mti_lsqr_iter_lim, atol=self.mti_solve_tol, btol=self.mti_solve_tol) if not np.all(np.isfinite(delta)): if self.debug: print("[MTI] plain DAE linear solve returned non-finite delta") return False, x_new, dx, np.inf x_new = x_new + delta if not np.all(np.isfinite(x_new)): if self.debug: print("[MTI] plain DAE update produced non-finite x") return False, x_new, dx, np.inf dx_out = dx.copy() dx = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs_full = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs_full.size > 0 and np.all(np.isfinite(rhs_full)): rhs = np.asarray(rhs_full, dtype=float) residual = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 if residual <= self.mti_solve_tol: return True, x_new, dx, residual dx_out = dx.copy() return False, x_new, dx_out, residual def _solve_plain_dae( self, x_prev: Vec, dx_last: Vec, h_eff: float, force_zero_dx: bool = False, ) -> tuple[bool, Vec, Vec, float]: # Keep behavior aligned with BackEulerImplicitIntegration: # start Newton from previous converged point only. return self._solve_plain_dae_for_seed( x_prev.copy(), x_prev, dx_last, h_eff, force_zero_dx=force_zero_dx, ) def _solve_subproblem_fix_z( self, x_seed: Vec, x_prev: Vec, dx_last: Vec, h_eff: float, eq_idx: np.ndarray, var_idx: np.ndarray, force_zero_dx: bool = False, check_inequalities: bool = False, ) -> tuple[bool, Vec, Vec, float]: self._last_subproblem_fail_reason = "solve_fail" self._last_subproblem_context = "fix-z" raw_eq_idx = np.asarray(eq_idx, dtype=int) ineq_idx = self.problem.get_inequality_row_indices(raw_eq_idx) fixed_bool_eq_idx = self.problem.get_fixed_boolean_equality_row_indices(raw_eq_idx) eq_idx = self.problem.get_continuous_equality_row_indices(raw_eq_idx) var_idx = np.asarray(var_idx, dtype=int) if self.debug: print( f"[MTI-SUB] kind=fix-z raw_rows={raw_eq_idx.size} eq_rows={eq_idx.size} " f"fixed_bool_eq={fixed_bool_eq_idx.size} ineq_rows={ineq_idx.size} " f"vars={var_idx.size} checkG={int(check_inequalities)}" ) if eq_idx.size == 0 or var_idx.size == 0: dx = self.problem.get_dx(x_seed, x_prev, dx_last, h_eff) if not force_zero_dx else np.zeros_like(dx_last) if check_inequalities: g_max = self._subproblem_inequality_violation(x_seed, dx, x_prev, h_eff, ineq_idx) if g_max > self.inequality_tolerance: self._last_subproblem_fail_reason = "ineq_fail" return False, x_seed.copy(), dx, g_max bool_res = self._fixed_boolean_equality_residual(x_seed, dx, x_prev, h_eff, fixed_bool_eq_idx) if bool_res > self.mti_solve_tol: self._last_subproblem_fail_reason = "bool_eq_fail" return False, x_seed.copy(), dx, bool_res return True, x_seed.copy(), dx, 0.0 x_new = x_seed.copy() dx_out = dx_last.copy() residual = np.inf explicit_pairs, eq_idx, var_idx = self.problem.split_explicit_subproblem_pairs(eq_idx=eq_idx, var_idx=var_idx) if self.debug: print( f"[MTI-SUBSPLIT] explicit_pairs={len(explicit_pairs)} " f"rest_eq={eq_idx.size} rest_var={var_idx.size}" ) for eq0, var0 in explicit_pairs: for it in range(self.mti_max_iter): dx = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs_full = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs_full.size == 0 or not np.all(np.isfinite(rhs_full)): return False, x_new, dx, np.inf rhs = np.asarray([float(rhs_full[int(eq0)])], dtype=float) residual = abs(float(rhs[0])) if self.debug: g_max = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) print( f"[MTI-SUBERR] phase=explicit it={it + 1}/{self.mti_max_iter} " f"eq=1 var=1 res={residual:.6e} localG={g_max:.6e}" ) if residual <= self.mti_solve_tol: dx_out = dx.copy() break j_full = self._jacobian_implicit(x_new, dx, h_eff).tocsr() deriv = float(j_full[int(eq0), int(var0)]) if abs(deriv) <= 1e-14 or not np.isfinite(deriv): delta_sub, *_ = sp.linalg.lsqr(j_full[[int(eq0)], :][:, [int(var0)]], -rhs, iter_lim=self.mti_lsqr_iter_lim, atol=self.mti_solve_tol, btol=self.mti_solve_tol) delta = float(delta_sub[0]) if len(delta_sub) > 0 else np.nan else: delta = -float(rhs[0]) / deriv if not np.isfinite(delta): return False, x_new, dx, np.inf x_new[int(var0)] += delta if not np.all(np.isfinite(x_new)): return False, x_new, dx, np.inf dx_out = dx.copy() else: return False, x_new, dx_out, residual if eq_idx.size == 0 or var_idx.size == 0: dx = self.problem.get_dx(x_new, x_prev, dx_last, h_eff) if not force_zero_dx else np.zeros_like(dx_last) rhs_full = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs_full.size > 0 and np.all(np.isfinite(rhs_full)): eq_eval = self.problem.get_equality_row_indices(raw_eq_idx) rhs = np.asarray(rhs_full, dtype=float)[eq_eval] if eq_eval.size > 0 else np.zeros(0, dtype=float) residual = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 if self.debug: g_max = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) print( f"[MTI-SUBERR] phase=early-return eq={eq_idx.size} var={var_idx.size} " f"res={residual:.6e} localG={g_max:.6e}" ) if check_inequalities: g_max = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) if g_max > self.inequality_tolerance: self._last_subproblem_fail_reason = "ineq_fail" return False, x_new, dx, max(residual if np.isfinite(residual) else 0.0, g_max) bool_res = self._fixed_boolean_equality_residual(x_new, dx, x_prev, h_eff, fixed_bool_eq_idx) if bool_res > self.mti_solve_tol: self._last_subproblem_fail_reason = "bool_eq_fail" return False, x_new, dx, max(residual if np.isfinite(residual) else 0.0, bool_res) return True, x_new, dx, residual if np.isfinite(residual) else 0.0 if self.use_nonlinear_subproblem_lsq and eq_idx.size != var_idx.size: ok_lsq, x_lsq, dx_lsq, res_lsq = self._solve_subproblem_nonlinear_lsq( x_seed=x_new, x_prev=x_prev, dx_last=dx_last, h_eff=h_eff, eq_idx=eq_idx, var_idx=var_idx, ineq_idx=ineq_idx, force_zero_dx=force_zero_dx, check_inequalities=check_inequalities, ) if ok_lsq or np.isfinite(res_lsq): return ok_lsq, x_lsq, dx_lsq, res_lsq for it in range(self.mti_max_iter): dx = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs_full = self.problem.compute_mti_equalities(x_new, dx, x_prev, h_eff) if rhs_full.size == 0 or not np.all(np.isfinite(rhs_full)): return False, x_new, dx, np.inf rhs = np.asarray(rhs_full, dtype=float)[eq_idx] residual = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 g_max_dbg = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) print( f"[MTI-SUBERR] phase=implicit it={it + 1}/{self.mti_max_iter} " f"eq={eq_idx.size} var={var_idx.size} res={residual:.6e} localG={g_max_dbg:.6e}" ) if residual <= self.mti_solve_tol: bool_res = self._fixed_boolean_equality_residual(x_new, dx, x_prev, h_eff, fixed_bool_eq_idx) if bool_res > self.mti_solve_tol: self._last_subproblem_fail_reason = "bool_eq_fail" return False, x_new, dx, max(residual, bool_res) if check_inequalities: g_max = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) if g_max > self.inequality_tolerance: self._last_subproblem_fail_reason = "ineq_fail" return False, x_new, dx, max(residual, g_max) return True, x_new, dx, residual j_full = self._jacobian_implicit(x_new, dx, h_eff).tocsr() j_sub = j_full[eq_idx, :][:, var_idx] try: with warnings.catch_warnings(): warnings.filterwarnings("error", category=MatrixRankWarning) delta_sub = sp.linalg.spsolve(j_sub, -rhs) except Exception: delta_sub, *_ = sp.linalg.lsqr(j_sub, -rhs, iter_lim=self.mti_lsqr_iter_lim, atol=self.mti_solve_tol, btol=self.mti_solve_tol) if not np.all(np.isfinite(delta_sub)): return False, x_new, dx, np.inf x_trial = x_new.copy() x_trial[var_idx] = x_trial[var_idx] + np.asarray(delta_sub, dtype=float) if not np.all(np.isfinite(x_trial)): return False, x_new, dx, np.inf x_new = x_trial dx_out = dx.copy() print(f"[MTI-SUBFAIL] kind=fix-z final_residual={residual:.6e}") return False, x_new, dx_out, residual def _solve_subproblem_nonlinear_lsq( self, x_seed: Vec, x_prev: Vec, dx_last: Vec, h_eff: float, eq_idx: np.ndarray, var_idx: np.ndarray, ineq_idx: np.ndarray, force_zero_dx: bool, check_inequalities: bool, ) -> tuple[bool, Vec, Vec, float]: eq_idx = np.asarray(eq_idx, dtype=int) var_idx = np.asarray(var_idx, dtype=int) if eq_idx.size == 0 or var_idx.size == 0: dx = self.problem.get_dx(x_seed, x_prev, dx_last, h_eff) if not force_zero_dx else np.zeros_like(dx_last) return True, x_seed.copy(), dx, 0.0 x0_sub = np.asarray(x_seed[var_idx], dtype=float) def residual_fn(x_sub: Vec) -> Vec: x_eval = x_seed.copy() x_eval[var_idx] = np.asarray(x_sub, dtype=float) dx_eval = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_eval, x_prev, dx_last, h_eff) rhs_full = self.problem.compute_mti_equalities(x_eval, dx_eval, x_prev, h_eff) if rhs_full.size == 0 or not np.all(np.isfinite(rhs_full)): return np.full(eq_idx.size, 1e20, dtype=float) rhs = np.asarray(rhs_full, dtype=float)[eq_idx] if not np.all(np.isfinite(rhs)): return np.full(eq_idx.size, 1e20, dtype=float) return rhs def jac_fn(x_sub: Vec): x_eval = x_seed.copy() x_eval[var_idx] = np.asarray(x_sub, dtype=float) dx_eval = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_eval, x_prev, dx_last, h_eff) return self._jacobian_implicit(x_eval, dx_eval, h_eff).tocsr()[eq_idx, :][:, var_idx] try: max_nfev = max(int(var_idx.size) * 1000, 4000) result = least_squares( residual_fn, x0_sub, jac=jac_fn, method="trf", ftol=np.finfo(float).eps, xtol=np.sqrt(np.finfo(float).eps), gtol=np.sqrt(np.finfo(float).eps), max_nfev=max_nfev, ) except Exception: return False, x_seed.copy(), dx_last.copy(), np.inf x_new = x_seed.copy() x_new[var_idx] = np.asarray(result.x, dtype=float) dx = np.zeros_like(dx_last) if force_zero_dx else self.problem.get_dx(x_new, x_prev, dx_last, h_eff) rhs = residual_fn(result.x) residual = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 if not np.all(np.isfinite(x_new)) or not np.all(np.isfinite(dx)) or not np.isfinite(residual): return False, x_new, dx, np.inf if residual <= self.mti_solve_tol: if check_inequalities: g_max = self._subproblem_inequality_violation(x_new, dx, x_prev, h_eff, ineq_idx) if g_max > self.inequality_tolerance: return False, x_new, dx, max(residual, g_max) return True, x_new, dx, residual return False, x_new, dx, residual def _subproblem_inequality_violation( self, x: Vec, dx: Vec, x_prev: Vec, h_eff: float, ineq_idx: np.ndarray, ) -> float: ineq_idx = np.asarray(ineq_idx, dtype=int) if ineq_idx.size == 0: return -np.inf g = self.problem.compute_mti_inequalities(x, dx, x_prev, h_eff) if g is None or len(g) == 0: return -np.inf g_arr = np.asarray(g, dtype=float) valid = ineq_idx[(ineq_idx >= 0) & (ineq_idx < g_arr.size)] if valid.size == 0: return -np.inf return float(np.max(g_arr[valid])) def _fixed_boolean_equality_residual( self, x: Vec, dx: Vec, x_prev: Vec, h_eff: float, eq_idx: np.ndarray, ) -> float: eq_idx = np.asarray(eq_idx, dtype=int) if eq_idx.size == 0: return 0.0 rhs_full = self.problem.compute_mti_equalities(x, dx, x_prev, h_eff) if rhs_full.size == 0 or not np.all(np.isfinite(rhs_full)): return np.inf valid = eq_idx[(eq_idx >= 0) & (eq_idx < rhs_full.size)] if valid.size == 0: return 0.0 return float(np.linalg.norm(np.asarray(rhs_full, dtype=float)[valid], np.inf)) def _solve_subproblem_variable_z( self, x_seed: Vec, x_prev: Vec, dx_last: Vec, h_eff: float, z_seed: Vec, eq_idx: np.ndarray, var_idx: np.ndarray, force_zero_dx: bool = False, ) -> tuple[bool, Vec, Vec, Vec, float]: self._last_subproblem_context = "variable-z" bool_positions = self.problem.get_subproblem_boolean_positions(eq_idx=eq_idx, var_idx=var_idx) if self.debug: eq_rows = self.problem.get_equality_row_indices(np.asarray(eq_idx, dtype=int)) ineq_rows = self.problem.get_inequality_row_indices(np.asarray(eq_idx, dtype=int)) print( f"[MTI-SUB] kind=variable-z raw_rows={len(np.asarray(eq_idx, dtype=int))} " f"eq_rows={eq_rows.size} ineq_rows={ineq_rows.size} vars={len(np.asarray(var_idx, dtype=int))} " f"local_bools={len(bool_positions)}" ) if len(bool_positions) == 0: ok, x_try, dx_try, res = self._solve_subproblem_fix_z( x_seed=x_seed, x_prev=x_prev, dx_last=dx_last, h_eff=h_eff, eq_idx=eq_idx, var_idx=var_idx, force_zero_dx=force_zero_dx, check_inequalities=True, ) return ok, x_try, dx_try, np.asarray(z_seed, dtype=float).copy(), res z0 = np.asarray(z_seed, dtype=float).copy() combinations = [] for bits in product((0.0, 1.0), repeat=len(bool_positions)): z = z0.copy() for k, pos in enumerate(bool_positions): z[int(pos)] = float(bits[k]) combinations.append(z) combinations.sort(key=lambda zc: self._hamming_distance(zc, z0)) best = (False, x_seed.copy(), dx_last.copy(), z0.copy(), np.inf) local_ineq_idx = self.problem.get_inequality_row_indices(np.asarray(eq_idx, dtype=int)) for z_candidate in combinations: self._last_subproblem_fail_reason = "solve_fail" self.problem.set_mti_boolean_state(z_candidate) ok, x_try, dx_try, res = self._solve_subproblem_fix_z( x_seed=x_seed, x_prev=x_prev, dx_last=dx_last, h_eff=h_eff, eq_idx=eq_idx, var_idx=var_idx, force_zero_dx=force_zero_dx, check_inequalities=False, ) if not ok: if np.isfinite(res) and res < best[4]: best = (False, x_try, dx_try, z_candidate.copy(), res) continue g_max = self._subproblem_inequality_violation(x_try, dx_try, x_prev, h_eff, local_ineq_idx) if g_max <= max(self.inequality_tolerance, 2.0 * np.finfo(float).eps): return True, x_try, dx_try, z_candidate.copy(), res self._last_subproblem_fail_reason = "ineq_fail" if np.isfinite(res) and (res + max(0.0, g_max)) < best[4]: best = (False, x_try, dx_try, z_candidate.copy(), res + max(0.0, g_max)) return best @staticmethod def _hamming_distance(a: Vec, b: Vec) -> int: return int(np.sum(np.asarray(a, dtype=int) != np.asarray(b, dtype=int))) def _enumerate_boolean_candidates(self, z_prev: Vec) -> list[np.ndarray]: n_bool = len(z_prev) if n_bool == 0: return [np.zeros(0, dtype=float)] if n_bool == 1: return [np.array([0.0]), np.array([1.0])] candidates = [np.asarray(bits, dtype=float) for bits in product((0.0, 1.0), repeat=n_bool)] candidates.sort(key=lambda zc: self._hamming_distance(zc, z_prev)) return candidates @staticmethod def _merge_boolean_vectors(base_z: Vec, positions: list[int], values: Vec) -> np.ndarray: z = np.asarray(base_z, dtype=float).copy() for k, pos in enumerate(positions): z[pos] = float(values[k]) return z @staticmethod def _to_binary_boolean_state(z: Vec) -> np.ndarray: z_arr = np.asarray(z, dtype=float) if z_arr.size == 0: return z_arr.copy() return (z_arr >= 0.5).astype(float) def _direct_boolean_test_single(self, x_prev: Vec, dx_last: Vec) -> np.ndarray: z0 = np.array([0.0], dtype=float) self.problem.set_mti_boolean_state(z0) guard0 = self.problem.evaluate_boolean_guard(bool_position=0, x=x_prev, dx=dx_last) if guard0 is not None: if guard0 <= self.inequality_tolerance: return np.array([1.0], dtype=float) return z0 g0 = self.problem.compute_mti_inequalities(x_prev, dx_last, x_prev, self.h) if g0.size == 0 or float(np.max(g0)) <= self.inequality_tolerance: return np.array([1.0], dtype=float) return z0 def _resolve_direct_booleans(self, z_prev: Vec, x_prev: Vec, dx_last: Vec, direct_positions: list[int]) -> np.ndarray: z = np.asarray(z_prev, dtype=float).copy() for pos in direct_positions: g0 = None z_try0 = z.copy() z_try0[pos] = 0.0 self.problem.set_mti_boolean_state(z_try0) guard0 = self.problem.evaluate_boolean_guard(bool_position=pos, x=x_prev, dx=dx_last) if guard0 is None: g0 = self.problem.compute_mti_inequalities(x_prev, dx_last, x_prev, self.h) if guard0 is not None: if guard0 <= self.inequality_tolerance: z[pos] = 1.0 else: z[pos] = 0.0 else: if g0.size == 0 or float(np.max(g0)) <= self.inequality_tolerance: z[pos] = 1.0 else: z[pos] = 0.0 return z
[docs] def simulate(self): converged = False well_initialized = True x0: Vec = self.problem.get_x0() dx0: Vec = np.zeros(self.problem.get_diff_var_number(), dtype=float) self._debug_x0_ref = x0.copy() self._debug_first_eval_done = False self.t[0] = self.t0 self.y[0, :] = x0.copy() # Initialization quality is assessed from algebraic consistency at t0, # not by whether the first dynamic integration step converges. try: f0_alg = np.asarray(self.problem.rhs_algebraic(x0, dx0), dtype=float) if f0_alg.size > 0: r0 = float(np.linalg.norm(f0_alg, np.inf)) well_initialized = bool(np.isfinite(r0) and r0 <= max(self.tol, 1e-6)) except Exception: well_initialized = False # Re-apply MTI boolean initialization at simulation start to avoid # later parameter refreshes collapsing boolean mode values to defaults. try: self.problem._initialize_mti_booleans_at_t0() except Exception: pass n_bool_from_problem = len(self.problem.get_mti_boolean_parameter_indices) z0 = np.zeros(n_bool_from_problem, dtype=float) if n_bool_from_problem > 0 and self.problem._variable_parameters_values is not None: for k, idx in enumerate(self.problem.get_mti_boolean_parameter_indices): z0[k] = float(self.problem._variable_parameters_values[idx]) elif n_bool_from_problem > 0: z0 = self.problem.update_mti_boolean_state(x0, dx0, x0, self.h) if z0 is None: z0 = np.zeros(n_bool_from_problem, dtype=float) z0 = self._to_binary_boolean_state(z0) print(f"[MTI] Initial z0 ({len(z0)} booleans): {z0}") if len(self.problem.get_mti_boolean_parameter_indices) > 0: try: self.problem.build_mti_incidence_and_order(x0, dx0, self.h) except Exception as ex: raise RuntimeError(f"Failed to build MTI incidence/solving order: {ex}") from ex self.z = np.full((self.steps + 1, len(z0)), np.nan, dtype=float) if len(z0) > 0: self.z[0, :] = z0 dx_last = dx0.copy() z_prev = z0.copy() last_completed_idx = 0 for step_idx in range(self.steps): self.problem.report_progress2(step_idx, self.steps) t_prev = self.t[step_idx] t_macro_target = t_prev + self.h x_prev = self.y[step_idx, :].copy() x_local = x_prev.copy() t_local_prev = t_prev converged = False is_first_local_step = True force_zero_dx = bool(step_idx == 0) while t_local_prev < (t_macro_target - 1e-15): forced_event_time: float | None = self.problem.get_next_forced_event_time( t_local_prev, t_macro_target, ) if forced_event_time is None: t_curr = t_macro_target else: t_curr = forced_event_time h_eff = t_curr - t_local_prev if h_eff <= 0.0: break try: self.problem.update_variable_params(t=t_local_prev, x_snapshot=x_local) except TypeError: self.problem.update_variable_params(t_local_prev) try: self.problem.update(t_curr, x_local, self.problem._variable_parameters_values) except Exception: pass bool_indices = self.problem.get_mti_boolean_parameter_indices n_bool = len(bool_indices) accepted_x: Vec | None = None accepted_dx: Vec | None = None accepted_z: Vec | None = None # MATLAB-aligned intent: keep current z unless event handling is needed. self.problem.set_mti_boolean_state(z_prev) ok_fix, x_fix, dx_fix, res_fix = self._solve_continuous_with_fallback( x_prev=x_local, dx_last=dx_last, h_eff=h_eff, force_zero_dx=force_zero_dx and is_first_local_step, ) best_fail_res = np.inf best_fail_x = x_local.copy() best_fail_dx = dx_last.copy() if np.isfinite(res_fix): best_fail_res = float(res_fix) best_fail_x = x_fix best_fail_dx = dx_fix event_needed = False event_idx = -1 if ok_fix: g_fix = self.problem.compute_mti_inequalities(x_fix, dx_fix, x_local, h_eff) h_safe = max(float(h_eff), 1e-12) xpp_fix = (np.asarray(dx_fix, dtype=float) - np.asarray(dx_last, dtype=float)) / h_safe dg_fix = self.problem.total_derivative_inequalities(x_fix, dx_fix, xpp=xpp_fix) event_needed, event_idx = self._detect_inequality_event(g_fix, dg_fix) if not event_needed: accepted_x = x_fix accepted_dx = dx_fix accepted_z = z_prev.copy() converged = True elif n_bool > 0: # If fixed-z Newton cannot solve the continuous equations, # the current boolean assignment may be inconsistent. In # that case restore the previous MTI behavior and let the # event handler enumerate local candidates immediately. event_needed = True if event_needed: if ok_fix: ok_loc, h_event, x_event, dx_event, event_idx = self._locate_inequality_event( x_prev=x_local, dx_last=dx_last, h_eff=h_eff, z_prev=z_prev, event_idx=event_idx, x_end=x_fix, dx_end=dx_fix, force_zero_dx=force_zero_dx and is_first_local_step, ) if self.debug: print( f"[MTI-EVENT-SPLIT] mode=localized ineq={event_idx} " f"h_eff={float(h_eff):.6e} h_event={float(h_event):.6e} " f"remaining={max(0.0, float(h_eff) - float(h_event)):.6e}" ) else: ok_loc = True remaining_h = float(h_eff) x_event = x_local dx_event = dx_last if self.debug: print( f"[MTI-EVENT-SPLIT] mode=unlocalized-recovery " f"h_eff={float(h_eff):.6e} reason=fixed_z_solve_failed" ) if ok_loc: if ok_fix: remaining_h = max(0.0, float(h_eff) - float(h_event)) # Match MATLAB's split/restart flow: eventTotalDerivative # resolves the mode and derivatives at the event instant; # the post-event continuous segment is solved separately. event_solve_h = max(min(float(h_eff), 1e-9), 1e-12) else: # No fixed-z trajectory exists to localize against; # keep the prior candidate solve semantics for this # recovery path rather than inventing an event time. event_solve_h = float(h_eff) if self.debug: print( f"[MTI-EVENT-SOLVE] ineq={event_idx if ok_fix else 'auto'} " f"event_solve_h={float(event_solve_h):.6e} " f"post_remaining={float(remaining_h):.6e}" ) ok_evt, x_evt, dx_evt, z_evt, best_fail_res_evt, best_fail_x_evt, best_fail_dx_evt = self._event_total_derivative( x_prev=x_event, dx_last=dx_event, h_eff=event_solve_h, z_prev=z_prev, force_zero_dx=False, forced_ineq_idx=event_idx if ok_fix else None, ) else: ok_evt = False x_evt = x_fix dx_evt = dx_fix z_evt = z_prev.copy() best_fail_res_evt = best_fail_res best_fail_x_evt = x_fix best_fail_dx_evt = dx_fix if ok_evt and ok_loc: if (not ok_fix) or remaining_h <= 1e-15: accepted_x = x_evt accepted_dx = dx_evt converged = True else: self.problem.set_mti_boolean_state(z_evt) ok_post, x_post, dx_post, res_post = self._solve_continuous_with_fallback( x_prev=x_evt, dx_last=dx_evt, h_eff=remaining_h, force_zero_dx=False, ) if self.debug: print( f"[MTI-POST-EVENT] ok={int(ok_post)} " f"remaining={float(remaining_h):.6e} res={float(res_post):.6e}" ) if ok_post: g_post = self.problem.compute_mti_inequalities(x_post, dx_post, x_event, remaining_h) h_safe_post = max(float(remaining_h), 1e-12) xpp_post = (np.asarray(dx_post, dtype=float) - np.asarray(dx_evt, dtype=float)) / h_safe_post dg_post = self.problem.total_derivative_inequalities(x_post, dx_post, xpp=xpp_post) post_event_needed, _ = self._detect_inequality_event(g_post, dg_post) if not post_event_needed: accepted_x = x_post accepted_dx = dx_post converged = True elif np.isfinite(res_post) and res_post < best_fail_res: best_fail_res = float(res_post) best_fail_x = x_post best_fail_dx = dx_post elif np.isfinite(res_post) and res_post < best_fail_res: best_fail_res = float(res_post) best_fail_x = x_post best_fail_dx = dx_post if converged: accepted_z = z_evt if np.isfinite(best_fail_res_evt) and best_fail_res_evt < best_fail_res: best_fail_res = best_fail_res_evt best_fail_x = best_fail_x_evt best_fail_dx = best_fail_dx_evt if not converged or accepted_x is None or accepted_dx is None: if self.debug: print(f"[MTI-FALLBACK] mode=plain_dae h_eff={float(h_eff):.6e}") ok_dae, x_dae, dx_dae, res_dae = self._solve_plain_dae( x_prev=x_local, dx_last=dx_last, h_eff=h_eff, force_zero_dx=force_zero_dx and is_first_local_step, ) if ok_dae: accepted_x = x_dae accepted_dx = dx_dae accepted_z = z_prev.copy() converged = True if np.isfinite(res_dae) and res_dae < best_fail_res: best_fail_res = res_dae best_fail_x = x_dae best_fail_dx = dx_dae if not converged or accepted_x is None or accepted_dx is None: if self.compare: f_mti = self.problem.compute_mti_equalities(best_fail_x, best_fail_dx, x_local, h_eff) f_dae = self._rhs_implicit(best_fail_x, best_fail_dx, x_local, h_eff) n_mti = float(np.linalg.norm(f_mti, np.inf)) if f_mti.size > 0 else 0.0 n_dae = float(np.linalg.norm(f_dae, np.inf)) if f_dae.size > 0 else 0.0 diff = float(np.linalg.norm(np.asarray(f_mti) - np.asarray(f_dae), np.inf)) if f_mti.size == f_dae.size else np.inf j = self._jacobian_implicit(best_fail_x, best_fail_dx, h_eff) j_dense = j.toarray() if sp.issparse(j) else np.asarray(j) rank = int(np.linalg.matrix_rank(j_dense)) cond = float(np.linalg.cond(j_dense)) if j_dense.size > 0 else np.nan print( f"[MTI-COMPARE] step={step_idx} t={t_curr:.6f} " f"||F_mti||inf={n_mti:.3e} ||F_dae||inf={n_dae:.3e} " f"||F_mti-F_dae||inf={diff:.3e} rank(J)={rank}/{j_dense.shape[0]} cond(J)={cond:.3e}" ) break x_local = accepted_x dx_last = accepted_dx t_local_prev = t_curr is_first_local_step = False if accepted_z is not None and len(accepted_z) > 0: z_prev = np.asarray(accepted_z, dtype=float) else: z_prev = np.zeros(0, dtype=float) if t_local_prev < (t_macro_target - 1e-15): break self.y[step_idx + 1, :] = x_local self.t[step_idx + 1] = t_macro_target last_completed_idx = step_idx + 1 if len(z_prev) > 0: self.z[step_idx + 1, :] = z_prev else: z_prev = np.zeros(0, dtype=float) return self.t[: last_completed_idx + 1], self.y[: last_completed_idx + 1, :], well_initialized, converged