Source code for VeraGridEngine.Simulations.Rms.numerical.pseudo_transient

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

import numpy as np
import scipy.sparse as sp
import time
import warnings
import sys
import os
from scipy.sparse import csc_matrix
from scipy.sparse import linalg as spla
import matplotlib.pyplot as plt

from VeraGridEngine.Simulations.Rms.problems.rms_problem_dae import RmsProblemDae
from VeraGridEngine.Utils.Sparse.csc import pack_4_by_4_scipy
from VeraGridEngine.Utils.Symbolic.symbolic import get_expression_vars, Var, Const, BinOp
from VeraGridEngine.basic_structures import Vec, Mat
from VeraGridEngine.Simulations.Rms.problems.rms_problem_template import RmsProblemTemplate


[docs] class PseudoTransient: def __init__(self, problem: RmsProblemDae, h: float, dtau0: float, dtau_max: float = 1e2, dtau_min: float = 1e-5, tol: float = 1e-6, reference_error_tol: float = 3.0, max_iter: int = 1000, verbose: bool = True, fixed_var_uids: list[int] | None = None, debug_check_x_new: bool = False, debug_x_new_abs_max: float = 1e6): """ :param problem: """ self.problem = problem self.h = h self.dtau0 = dtau0 self.dtau_min = dtau_min self.dtau_max = dtau_max self.steps = 1000 self.max_iter_0 = max_iter self.tol = tol self.reference_error_tol = float(reference_error_tol) self.verbose = verbose self.use_weighted_residual = os.getenv("VERAGRID_PSEUDO_WEIGHTED_RESIDUAL", "0").lower() in {"1", "true", "yes", "on"} self.weight_network = float(os.getenv("VERAGRID_PSEUDO_WEIGHT_NETWORK", "3.0")) self.weight_power = float(os.getenv("VERAGRID_PSEUDO_WEIGHT_POWER", "2.0")) self.allow_lsqr_fallback = os.getenv("PSEUDO_ALLOW_LSQR", "1").lower() in {"1", "true", "yes", "on"} self.fixed_var_uids = set(fixed_var_uids or []) uid2idx = getattr(self.problem, "uid2idx_vars", {}) self._fixed_var_indices = sorted([uid2idx[uid] for uid in self.fixed_var_uids if uid in uid2idx]) self.debug_check_x_new = debug_check_x_new self.debug_x_new_abs_max = float(debug_x_new_abs_max) self.t: Vec = np.empty(self.steps + 1) self.y: Mat = np.empty((self.steps + 1, self.problem.get_all_vars_number())) self._singular_report_count = 0 def _apply_fixed_mask(self, x: Vec, x_ref: Vec) -> Vec: if len(self._fixed_var_indices) == 0: return x for idx in self._fixed_var_indices: if 0 <= idx < x.size and 0 <= idx < x_ref.size: x[idx] = x_ref[idx] return x def _check_fixed_drift(self, x: Vec, x_ref: Vec, where: str) -> None: if len(self._fixed_var_indices) == 0: return bad = [] for idx in self._fixed_var_indices: if 0 <= idx < x.size and 0 <= idx < x_ref.size: d = float(abs(x[idx] - x_ref[idx])) if d > 1e-14: bad.append((idx, d, float(x[idx]), float(x_ref[idx]))) if bad: preview = [f"{self._var_index_name(i)}: x={vx:+.6e}, ref={vr:+.6e}, drift={d:.3e}" for i, d, vx, vr in bad[:6]] raise ValueError(f"Fixed variable drift detected at {where}: {preview}") def _dbg(self, msg: str) -> None: if self.verbose: print(f"[PseudoTransient] {msg}") def _build_residual_weights(self, n_rhs: int) -> np.ndarray: w = np.ones(n_rhs, dtype=float) if not self.use_weighted_residual: return w n_states = int(self.problem.get_states_number()) # Prioritize key network and power-balance equations in algebraic block. # rhs index = n_states + algebraic_index for aidx in (0, 1, 4, 5): ridx = n_states + aidx if 0 <= ridx < n_rhs: w[ridx] = self.weight_network for aidx in (2, 3): ridx = n_states + aidx if 0 <= ridx < n_rhs: w[ridx] = max(w[ridx], self.weight_power) return w @staticmethod def _weighted_norm(v: np.ndarray, w: np.ndarray) -> float: return float(np.linalg.norm(w * v)) def _get_problem_state_eqs(self): if hasattr(self.problem, "_state_eqs"): return list(self.problem._state_eqs) if hasattr(self.problem, "block") and hasattr(self.problem.block, "state_eqs"): return list(self.problem.block.state_eqs) return list() def _get_problem_algebraic_eqs(self): if hasattr(self.problem, "_algebraic_eqs"): return list(self.problem._algebraic_eqs) if hasattr(self.problem, "block") and hasattr(self.problem.block, "algebraic_eqs"): return list(self.problem.block.algebraic_eqs) return list() def _get_problem_state_vars(self): if hasattr(self.problem, "_state_vars"): return list(self.problem._state_vars) if hasattr(self.problem, "block") and hasattr(self.problem.block, "state_vars"): return list(self.problem.block.state_vars) return list() def _rhs_index_equation_repr(self, rhs_idx: int) -> str: n_states = int(self.problem.get_states_number()) state_eqs = self._get_problem_state_eqs() algebraic_eqs = self._get_problem_algebraic_eqs() state_vars = self._get_problem_state_vars() if rhs_idx < n_states: base_eq = state_eqs[rhs_idx] if rhs_idx < len(state_eqs) else "<unknown state eq>" state_name = state_vars[rhs_idx].name if rhs_idx < len(state_vars) else f"x[{rhs_idx}]" return f"state_update({state_name}): x - x_prev - h*({base_eq})" alg_idx = rhs_idx - n_states if 0 <= alg_idx < len(algebraic_eqs): return f"algebraic[{alg_idx}]: {algebraic_eqs[alg_idx]}" return f"rhs[{rhs_idx}]: <unknown equation>" def _expr_begin_repr(self, expr) -> str: begin_fn = getattr(expr, "begin", None) if callable(begin_fn): try: return str(begin_fn()) except Exception: pass if isinstance(expr, BinOp) and expr.op == "-": return str(expr.left) if isinstance(expr, Var): return expr.name return str(expr) def _rhs_index_equation_begin(self, rhs_idx: int) -> str: n_states = int(self.problem.get_states_number()) state_eqs = self._get_problem_state_eqs() algebraic_eqs = self._get_problem_algebraic_eqs() state_vars = self._get_problem_state_vars() if rhs_idx < n_states: state_name = state_vars[rhs_idx].name if rhs_idx < len(state_vars) else f"x[{rhs_idx}]" if rhs_idx < len(state_eqs): return f"state_update({state_name})" return f"state_update[{rhs_idx}]" alg_idx = rhs_idx - n_states if 0 <= alg_idx < len(algebraic_eqs): head = self._expr_begin_repr(algebraic_eqs[alg_idx]) return f"algebraic[{alg_idx}]: {head}" return f"rhs[{rhs_idx}]" def _var_value_debug(self, var: Var, x: Vec) -> str: local_vars = self._get_problem_state_vars() + self._problem_algebraic_vars() local_uid2idx = {v.uid: i for i, v in enumerate(local_vars)} if var.uid in local_uid2idx: i = local_uid2idx[var.uid] if 0 <= i < len(x): return f"{var.name}={x[i]}(x_local[{i}])" uid2idx_vars = getattr(self.problem, "uid2idx_vars", None) if isinstance(uid2idx_vars, dict) and var.uid in uid2idx_vars: i = uid2idx_vars[var.uid] if 0 <= i < len(x): return f"{var.name}={x[i]}(x_global[{i}])" uid2idx_evt = getattr(self.problem, "_uid2idx_event_params", None) vparams = getattr(self.problem, "_variable_parameters_values", None) if isinstance(uid2idx_evt, dict) and var.uid in uid2idx_evt and vparams is not None: i = uid2idx_evt[var.uid] if 0 <= i < len(vparams): return f"{var.name}={vparams[i]}(vprms[{i}])" uid2idx_params = getattr(self.problem, "_uid2idx_params", None) cparams = getattr(self.problem, "_constant_params", None) if isinstance(uid2idx_params, dict) and var.uid in uid2idx_params and cparams is not None: i = uid2idx_params[var.uid] if 0 <= i < len(cparams): return f"{var.name}={cparams[i]}(cprms[{i}])" return f"{var.name}=<unresolved>" def _rhs_index_equation_vars_debug(self, rhs_idx: int, x: Vec) -> list[str]: n_states = int(self.problem.get_states_number()) eq = None if rhs_idx < n_states: state_eqs = self._get_problem_state_eqs() if rhs_idx < len(state_eqs): eq = state_eqs[rhs_idx] else: alg_idx = rhs_idx - n_states algebraic_eqs = self._get_problem_algebraic_eqs() if 0 <= alg_idx < len(algebraic_eqs): eq = algebraic_eqs[alg_idx] if eq is None or isinstance(eq, Const): return list() try: vars_in_eq = get_expression_vars(eq) except Exception: return list() debug_items: list[str] = list() seen = set() for vr in vars_in_eq: if not isinstance(vr, Var): continue if vr.uid in seen: continue seen.add(vr.uid) debug_items.append(self._var_value_debug(vr, x)) return debug_items def _debug_check_x_new(self, x_new: Vec, step_idx: int, tries: int, dtau: float) -> None: if not self.debug_check_x_new: return bad_idx = np.where(~np.isfinite(x_new))[0] if bad_idx.size > 0: bad_vals = x_new[bad_idx] raise ValueError( "NaN or Inf in x_new" f" (step={step_idx}, try={tries}, dtau={dtau:.3e}, " f"bad_idx={bad_idx.tolist()}, bad_vals={bad_vals.tolist()})" ) large_idx = np.where(np.abs(x_new) > self.debug_x_new_abs_max)[0] if large_idx.size > 0: large_vals = x_new[large_idx] raise ValueError( "x_new magnitude too large" f" (step={step_idx}, try={tries}, dtau={dtau:.3e}, abs_max={self.debug_x_new_abs_max:.3e}, " f"large_idx={large_idx.tolist()}, large_vals={large_vals.tolist()})" ) def _problem_algebraic_vars(self): algeb = getattr(self.problem, "algebraic_vars", None) if callable(algeb): return list(algeb()) if algeb is not None: return list(algeb) getter = getattr(self.problem, "get_algebraic_vars", None) if callable(getter): return list(getter()) return list() def _problem_state_vars(self): states = getattr(self.problem, "state_vars", None) if callable(states): return list(states()) if states is not None: return list(states) return list() def _problem_diff_vars(self): diff_vars = getattr(self.problem, "_diff_vars", None) if diff_vars is not None: return list(diff_vars) getter = getattr(self.problem, "get_diff_vars", None) if callable(getter): return list(getter()) return list() def _find_reference_pin_indices(self) -> list[int]: n_vars = int(self.problem.get_all_vars_number()) groups = ( ("Pm_ref", "Pref", "P_ref"), ("UsRefPu", "Vref", "V_ref", "U_ref"), ("u_exciter3", "y_exciter3"), ) found: list[int] = list() used = set(self._fixed_var_indices) for group in groups: idx = None for i in range(n_vars): if i in used: continue nm = self._var_index_name(i) if any(tok in nm for tok in group): idx = i break if idx is not None: found.append(idx) used.add(idx) return found def _predictor_stable_algebraic_indices(self) -> list[int]: """ Keep algebraic equations that do not explicitly depend on diff vars. Mirrors old `_algebraic_eqs_stable` idea. """ algebraic_eqs = self._get_problem_algebraic_eqs() diff_vars = self._problem_diff_vars() diff_uids = {v.uid for v in diff_vars if isinstance(v, Var)} if len(algebraic_eqs) == 0: return list() if len(diff_uids) == 0: return list(range(len(algebraic_eqs))) stable = list() for i, eq in enumerate(algebraic_eqs): vars_in_eq = get_expression_vars(eq) uses_diff = any(v.base_var is not None for v in vars_in_eq) if not uses_diff: stable.append(i) return stable def _diff_free_equation_indices(self) -> list[int]: n_states = int(self.problem.get_states_number()) algebraic_eqs = self._get_problem_algebraic_eqs() diff_vars = self._problem_diff_vars() diff_uids = {v.uid for v in diff_vars if isinstance(v, Var)} selected = list() for j, eq in enumerate(algebraic_eqs): vars_in_eq = get_expression_vars(eq) uses_diff = any(v.base_var is not None for v in vars_in_eq) if not uses_diff: selected.append(n_states + j) if self.verbose: self._dbg( f"diff-free equations: selected={len(selected)} / algebraic={len(algebraic_eqs)} " f"(n_states_offset={n_states})" ) for rhs_idx in selected: self._dbg(f" diff-free rhs[{rhs_idx}]: {self._rhs_index_equation_repr(int(rhs_idx))}") return selected def _find_initial_diff_free_feasible_point(self, x0: Vec, max_iter: int = 10) -> Vec: x = np.array(x0, dtype=float, copy=True) x_ref = np.array(x0, dtype=float, copy=True) dx0 = np.zeros(self.problem.get_diff_var_number(), dtype=float) eq_idx = self._diff_free_equation_indices() if len(eq_idx) == 0: self._dbg("diff-free init: no equations selected") return self._apply_fixed_mask(x, x_ref) n_vars = int(self.problem.get_all_vars_number()) fixed_cols = set(self._fixed_var_indices) # Temporary initialization pins to break near-nullspace directions. # This is local to this method (unpins automatically when we return). # Prefer reference-like variables (Pref, Vref), then fallback options. pin_priority_groups = ( ("Pm_ref", "Pref", "P_ref"), ("UsRefPu", "Vref", "V_ref", "U_ref"), ("u_gov1", "dt_1_y_gov1"), ) pinned_indices = list() for group in pin_priority_groups: pin_idx = None for i in range(n_vars): if i in fixed_cols: continue nm = self._var_index_name(i) if any(tok in nm for tok in group): pin_idx = i break if pin_idx is not None: fixed_cols.add(pin_idx) pinned_indices.append(pin_idx) if len(pinned_indices) > 0: labels = [f"{i}:{self._var_index_name(i)}" for i in pinned_indices] self._dbg(f"diff-free init: temporary pins enabled: {labels}") else: self._dbg("diff-free init: temporary pins not found (Pref/Vref groups)") free_cols = [i for i in range(n_vars) if i not in fixed_cols] if len(free_cols) == 0: self._dbg("diff-free init: no free variables") return self._apply_fixed_mask(x, x_ref) tol_proj = max(self.tol * 10.0, 1e-8) converged = False rhs0 = self._rhs_steady(x, dx0) if rhs0.size > 0 and np.all(np.isfinite(rhs0)): r0 = rhs0[eq_idx] r0_inf = float(np.linalg.norm(r0, np.inf)) if r0.size > 0 else 0.0 self._dbg( f"diff-free init start: n_eq={len(eq_idx)}, n_free={len(free_cols)}, residual_inf={r0_inf:.3e}" ) for k in range(max_iter): rhs_full = self._rhs_steady(x, dx0) if rhs_full.size == 0 or not np.all(np.isfinite(rhs_full)): self._dbg(f"diff-free init abort: non-finite rhs at iter={k + 1}") break r = rhs_full[eq_idx] r_inf = float(np.linalg.norm(r, np.inf)) if r.size > 0 else 0.0 if r_inf <= tol_proj: self._dbg(f"diff-free init converged at iter={k + 1}, residual_inf={r_inf:.3e}") converged = True break J_full = self._jacobian_steady(x, dx0).tocsc() J_sel = J_full[eq_idx, :][:, free_cols] delta_free, *_ = spla.lsqr( J_sel, -r, atol=1e-10, btol=1e-10, iter_lim=max(200, 2 * len(free_cols)) ) delta_free = np.asarray(delta_free, dtype=float) if delta_free.size != len(free_cols) or not np.all(np.isfinite(delta_free)): self._report_singularity_diagnostics(J_sel.tocsc(), r, x=x, context=f"diff-free init iter={k + 1}") self._dbg(f"diff-free init abort: invalid correction at iter={k + 1}") break best_x = None best_r = np.inf accepted_alpha = 1.0 tie_tol = max(1e-12, 1e-10 * max(1.0, r_inf)) for alpha in (1.0, 0.5, 0.25, 0.1): xt = np.array(x, copy=True) xt[np.array(free_cols, dtype=int)] += alpha * delta_free xt = self._apply_fixed_mask(xt, x_ref) rt_full = self._rhs_steady(xt, dx0) if not np.all(np.isfinite(rt_full)): continue rt = rt_full[eq_idx] rt_inf = float(np.linalg.norm(rt, np.inf)) if rt.size > 0 else 0.0 if best_x is None: best_x = xt best_r = rt_inf accepted_alpha = alpha if rt_inf <= r_inf * (1.0 - 1e-4 * alpha): break continue if rt_inf < best_r: best_r = rt_inf best_x = xt accepted_alpha = alpha elif abs(rt_inf - best_r) <= tie_tol and alpha > accepted_alpha: # Allow movement along flat/null directions when residual is unchanged. best_x = xt accepted_alpha = alpha if rt_inf <= r_inf * (1.0 - 1e-4 * alpha): break if best_x is None: # No finite trial point; fall back to full step update. best_x = np.array(x, copy=True) best_x[np.array(free_cols, dtype=int)] += delta_free best_x = self._apply_fixed_mask(best_x, x_ref) best_r = r_inf accepted_alpha = 1.0 x = best_x self._dbg( f"diff-free init iter={k + 1}: residual_inf={r_inf:.3e}->{best_r:.3e}, " f"|delta|_2={np.linalg.norm(delta_free):.3e}, alpha={accepted_alpha:.2f}" ) rhsf = self._rhs_steady(x, dx0) if rhsf.size > 0 and np.all(np.isfinite(rhsf)): rf = rhsf[eq_idx] rf_inf = float(np.linalg.norm(rf, np.inf)) if rf.size > 0 else 0.0 self._dbg(f"diff-free init finish: residual_inf={rf_inf:.3e}") if not converged and rf_inf > tol_proj: abs_rf = np.abs(rf) ranked_local = np.argsort(-abs_rf) n_show = min(20, ranked_local.size) self._dbg( f"diff-free init failed: top {n_show} residual equations " f"(threshold={tol_proj:.3e})" ) for kk in ranked_local[:n_show]: rhs_idx = int(eq_idx[int(kk)]) val = float(rf[int(kk)]) self._dbg(f" rhs[{rhs_idx}]={val:+.6e} | {self._rhs_index_equation_repr(rhs_idx)}") return self._apply_fixed_mask(x, x_ref) def _predictor_project_initial_guess(self, x0: Vec) -> Vec: n_states = int(self.problem.get_states_number()) n_alg = int(self.problem.get_algebraic_var_number()) if n_alg == 0: return x0 x_proj = np.array(x0, dtype=float, copy=True) dx_proj = np.zeros(self.problem.get_diff_var_number(), dtype=float) proj_tol = max(self.tol * 10.0, 1e-8) stable_idx = self._predictor_stable_algebraic_indices() if len(stable_idx) == 0: stable_idx = list(range(n_alg)) self._dbg(f"predictor setup: n_alg={n_alg}, n_stable={len(stable_idx)}") for k in range(8): r_alg_full = self.problem.rhs_algebraic(x_proj, dx_proj) if r_alg_full.size == 0: break if not np.all(np.isfinite(r_alg_full)): self._dbg(f"predictor abort: non-finite algebraic rhs at iter={k + 1}") break r_alg = r_alg_full[stable_idx] rinf = float(np.linalg.norm(r_alg, np.inf)) if r_alg.size > 0 else 0.0 if rinf <= proj_tol: self._dbg(f"predictor converged at iter={k + 1}, residual_inf={rinf:.3e}") break J22_full = self.problem.get_j22(x_proj, dx_proj, max(abs(self.dtau0), 1e-6)).tocsc() Jp = J22_full[stable_idx, :] # Predictor Jacobian is generally rectangular on reduced stable set. delta_alg, *_ = spla.lsqr(Jp, -r_alg, atol=1e-10, btol=1e-10, iter_lim=max(200, 2 * n_alg)) delta_alg = np.asarray(delta_alg, dtype=float) if delta_alg.size != n_alg or not np.all(np.isfinite(delta_alg)): self._report_singularity_diagnostics(Jp.tocsc(), r_alg, x=x_proj, context=f"predictor iter={k + 1}") self._dbg( f"predictor abort: invalid correction at iter={k + 1}, " f"delta_size={delta_alg.size}, n_alg={n_alg}" ) break best_x = x_proj best_rinf = rinf for alpha in (1.0, 0.5, 0.25, 0.1): xt = np.array(x_proj, copy=True) xt[n_states:n_states + n_alg] += alpha * delta_alg rt_full = self.problem.rhs_algebraic(xt, dx_proj) if not np.all(np.isfinite(rt_full)): continue rt = rt_full[stable_idx] rt_inf = float(np.linalg.norm(rt, np.inf)) if rt.size > 0 else 0.0 if rt_inf < best_rinf: best_rinf = rt_inf best_x = xt if rt_inf <= rinf * (1.0 - 1e-4 * alpha): break x_proj = best_x self._dbg( f"predictor iter={k + 1}: residual_inf={rinf:.3e}->{best_rinf:.3e}, " f"|delta_alg|_2={np.linalg.norm(delta_alg):.3e}" ) return self._apply_fixed_mask(x_proj, x0) def _project_feasible_x(self, x: Vec, dx: Vec, x_ref: Vec | None = None, max_iter: int = 4) -> Vec: n_states = int(self.problem.get_states_number()) n_alg = int(self.problem.get_algebraic_var_number()) if n_alg == 0: return np.array(x, dtype=float, copy=True) x_proj = np.array(x, dtype=float, copy=True) if x_ref is None: x_ref = x_proj stable_idx = self._predictor_stable_algebraic_indices() if len(stable_idx) == 0: stable_idx = list(range(n_alg)) proj_tol = max(self.tol * 5.0, 1e-8) dt_jac = max(abs(self.dtau0), 1e-6) # Feasibility projection is based on algebraic/stability equations, # so derivative terms are neutralized. dx_feasible = np.zeros_like(dx) for _ in range(max_iter): rhs_alg_full = self.problem.rhs_algebraic(x_proj, dx_feasible) if rhs_alg_full.size == 0 or not np.all(np.isfinite(rhs_alg_full)): break rhs_alg = rhs_alg_full[stable_idx] residual_inf = float(np.linalg.norm(rhs_alg, np.inf)) if rhs_alg.size > 0 else 0.0 if residual_inf <= proj_tol: break J22_full = self.problem.get_j22(x_proj, dx_feasible, dt_jac).tocsc() J_proj = J22_full[stable_idx, :] delta_alg, *_ = spla.lsqr(J_proj, -rhs_alg, atol=1e-10, btol=1e-10, iter_lim=max(200, 2 * n_alg)) delta_alg = np.asarray(delta_alg, dtype=float) if delta_alg.size != n_alg or not np.all(np.isfinite(delta_alg)): break best_x = x_proj best_residual = residual_inf for alpha in (1.0, 0.5, 0.25, 0.1): xt = np.array(x_proj, copy=True) xt[n_states:n_states + n_alg] += alpha * delta_alg xt = self._apply_fixed_mask(xt, x_ref) rt_full = self.problem.rhs_algebraic(xt, dx_feasible) if not np.all(np.isfinite(rt_full)): continue rt = rt_full[stable_idx] rt_inf = float(np.linalg.norm(rt, np.inf)) if rt.size > 0 else 0.0 if rt_inf < best_residual: best_residual = rt_inf best_x = xt if rt_inf <= residual_inf * (1.0 - 1e-4 * alpha): break x_proj = best_x return self._apply_fixed_mask(x_proj, x_ref) def _var_index_name(self, idx: int) -> str: vars_all = self._problem_state_vars() + self._problem_algebraic_vars() if 0 <= idx < len(vars_all): return vars_all[idx].name return f"x[{idx}]" def _report_selected_var_values(self, x: Vec, labels: tuple[str, ...]) -> None: vars_all = self._problem_state_vars() + self._problem_algebraic_vars() if len(vars_all) == 0 or x.size == 0: return matches: list[str] = [] used_idx: set[int] = set() for token in labels: token_l = token.lower() hit_idx = None # Prefer exact/canonical prefix match (e.g. Id*), avoid accidental matches like Psid*. for i, var in enumerate(vars_all): name = (getattr(var, "name", "") or "").strip() if i in used_idx: continue if name.lower().startswith(token_l): hit_idx = i break # Fallback: contains token when prefix is unavailable. if hit_idx is None: for i, var in enumerate(vars_all): name = (getattr(var, "name", "") or "").strip() if i in used_idx: continue if token_l in name.lower(): hit_idx = i break if hit_idx is not None: used_idx.add(hit_idx) name = getattr(vars_all[hit_idx], "name", "") or f"x[{hit_idx}]" val = float(x[hit_idx]) if 0 <= hit_idx < x.size else float("nan") matches.append(f"{name}={val:+.6e}") else: matches.append(f"{token}=<not found>") if matches: print( f"[PseudoTransient][Singular] selected vars: {matches}", file=sys.stderr, ) def _report_piecewise_activity(self, rhs: Vec) -> None: if rhs.size == 0 or not np.all(np.isfinite(rhs)): return n_states = int(self.problem.get_states_number()) n_rhs = int(rhs.size) piecewise_rows: list[tuple[int, float, str]] = [] for i in range(n_states, n_rhs): eq_txt = self._rhs_index_equation_repr(i) if "heaviside" in eq_txt: piecewise_rows.append((i, float(rhs[i]), eq_txt)) if len(piecewise_rows) == 0: return piecewise_rows.sort(key=lambda t: abs(t[1]), reverse=True) top = piecewise_rows[:12] payload = [f"rhs[{i}]={v:+.3e} | {eq}" for i, v, eq in top] print( f"[PseudoTransient][Singular] piecewise/heaviside residuals (top {len(top)}): {payload}", file=sys.stderr, ) def _report_key_equation_tracking(self, rhs: Vec, x_new: Vec, x_prev: Vec, tag: str) -> None: if rhs.size == 0 or x_new.size == 0 or x_prev.size == 0: return n_states = int(self.problem.get_states_number()) key_alg = (0, 1, 2, 3, 4, 5, 14) entries: list[str] = [] for aidx in key_alg: ridx = n_states + aidx if 0 <= ridx < rhs.size: entries.append(f"rhs[{ridx}]={rhs[ridx]:+.3e} (alg[{aidx}])") if entries: print(f"[PseudoTransient][Track] {tag} key equations: {entries}", file=sys.stderr) watch_tokens = ( "deltaGen", "omegaGen", "VdGen", "VqGen", "IdGen", "IqGen", "Psid_prime", "Psiq_prime", "Eq_prime", "Ed_prime", "SaGen", "Psi_ag", ) vars_all = self._problem_state_vars() + self._problem_algebraic_vars() deltas: list[str] = [] used = set() for tok in watch_tokens: tok_l = tok.lower() hit = None for i, var in enumerate(vars_all): nm = (getattr(var, "name", "") or "").lower() if i in used: continue if tok_l in nm: hit = i break if hit is None or hit >= x_new.size or hit >= x_prev.size: continue used.add(hit) dv = float(x_new[hit] - x_prev[hit]) if abs(dv) > 0.0: name = getattr(vars_all[hit], "name", f"x[{hit}]") deltas.append(f"{name}: dx={dv:+.3e}") if deltas: print(f"[PseudoTransient][Track] {tag} variable deltas: {deltas}", file=sys.stderr) def _report_singularity_diagnostics(self, J: sp.csc_matrix, rhs: Vec, x: Vec, context: str) -> None: # Print diagnostics at every failing step/try during debugging. self._singular_report_count += 1 try: row_abs = np.asarray(np.abs(J).sum(axis=1)).ravel() col_abs = np.asarray(np.abs(J).sum(axis=0)).ravel() near_zero_rows = np.where(row_abs < 1e-14)[0] near_zero_cols = np.where(col_abs < 1e-14)[0] print(f"[PseudoTransient][Singular] context={context}", file=sys.stderr) print( f"[PseudoTransient][Singular] shape={J.shape}, nnz={J.nnz}, " f"near_zero_rows={int(near_zero_rows.size)}, near_zero_cols={int(near_zero_cols.size)}", file=sys.stderr, ) if near_zero_rows.size > 0: preview = near_zero_rows[:8] row_labels = [self._rhs_index_equation_begin(int(i)) for i in preview] print( f"[PseudoTransient][Singular] zero-like rows (first {len(preview)}): {row_labels}", file=sys.stderr, ) if near_zero_cols.size > 0: preview = near_zero_cols[:8] col_labels = [self._var_index_name(int(i)) for i in preview] print( f"[PseudoTransient][Singular] zero-like cols (first {len(preview)}): {col_labels}", file=sys.stderr, ) if rhs.size > 0 and np.all(np.isfinite(rhs)): idx = np.argsort(np.abs(rhs))[::-1][:8] top_rhs = [f"{self._rhs_index_equation_repr(int(i))}: {rhs[int(i)]:+.3e}" for i in idx] print(f"[PseudoTransient][Singular] largest rhs entries: {top_rhs}", file=sys.stderr) self._report_piecewise_activity(rhs) self._report_selected_var_values(x, labels=("Vd", "Vq", "Id", "Iq")) # Show equations most directly coupled to Vf-like variables. vf_cols = [i for i in range(J.shape[1]) if "vf" in self._var_index_name(i).lower()] if len(vf_cols) > 0: for c in vf_cols[:3]: col = J.getcol(c) if col.nnz == 0: print( f"[PseudoTransient][Singular] Vf-coupled column {self._var_index_name(c)} has no nonzeros", file=sys.stderr, ) continue rows = col.indices vals = col.data order = np.argsort(np.abs(vals))[::-1] top = order[:8] eqs = [ f"{self._rhs_index_equation_repr(int(rows[k]))}: d/d{self._var_index_name(c)}={vals[k]:+.3e}" for k in top ] print( f"[PseudoTransient][Singular] equations coupled to {self._var_index_name(c)}: {eqs}", file=sys.stderr, ) # SVD-based diagnostics for moderate size systems. n = J.shape[0] if n <= 220: dense = J.toarray() _, s, vh = np.linalg.svd(dense, full_matrices=False) smin = float(s[-1]) if s.size > 0 else float("nan") smax = float(s[0]) if s.size > 0 else float("nan") cond = float(smax / smin) if s.size > 0 and smin > 0 else float("inf") print( f"[PseudoTransient][Singular] svd sigma_max={smax:.3e}, sigma_min={smin:.3e}, cond~={cond:.3e}", file=sys.stderr, ) if s.size > 0: cut = max(1e-12, 1e-10 * smax) singular_dirs = np.where(s < cut)[0] if singular_dirs.size > 0: i = int(singular_dirs[0]) v = vh.T[:, i] dom = np.argsort(np.abs(v))[::-1][:8] dom_vars = [f"{self._var_index_name(int(j))}: {v[int(j)]:+.3e}" for j in dom] print( f"[PseudoTransient][Singular] dominant variables in null-like direction: {dom_vars}", file=sys.stderr, ) # Quantify residual component aligned with left null-like direction. # For SVD J = U S V^T, near-null row-space direction is u_i. if rhs.size == J.shape[0]: u_i = dense @ v n_u = float(np.linalg.norm(u_i)) if n_u > 0 and np.all(np.isfinite(u_i)) and np.all(np.isfinite(rhs)): u_i = u_i / n_u rhs_norm = float(np.linalg.norm(rhs)) coeff = float(np.dot(u_i, rhs)) rhs_along = abs(coeff) rhs_orth = float(np.sqrt(max(rhs_norm * rhs_norm - rhs_along * rhs_along, 0.0))) frac = rhs_along / max(rhs_norm, 1e-30) print( "[PseudoTransient][Singular] rhs projection: " f"|rhs|_2={rhs_norm:.3e}, |along_null_left|={rhs_along:.3e}, " f"|orthogonal|={rhs_orth:.3e}, along_frac={frac:.3e}", file=sys.stderr, ) except Exception as e: print(f"[PseudoTransient][Singular] diagnostics failed: {e}", file=sys.stderr) def _rhs_implicit(self, x: Vec, dx: Vec, xn: Vec, h: float) -> Vec: """ Return 𝑑x/dt given the current *state* vector. :param x: get the right-hand-side give a state vector :param dx: :param xn: :return f_state_update or f_algeb """ f_algeb = self.problem.rhs_algebraic(x, dx) if self.problem.get_states_number() > 0: f_state = self.problem.rhs_state(x, dx) f_state_update = x[:self.problem.get_states_number()] - xn[:self.problem.get_states_number()] - h * f_state return np.r_[f_state_update, f_algeb] else: return f_algeb def _jacobian_implicit(self, x: Vec, dx: Vec, h: float) -> sp.csc_matrix: """ :param x: vector or variables' values :param dx: vector of diff values :param h: step :return: """ """ state Var algeb var state eq |I - h * J11 | - h* J12 | | βˆ† state var| | βˆ† state eq | | | | | | | | -------------------------- x |------------| = |------------| algeb eq |J21 | J22 | | βˆ† algeb var| | βˆ† algeb eq | | | | | | | | """ # returns only j22 if no states, returns J if states if self.problem.get_states_number() == 0: j22: sp.csc_matrix = self.problem.get_j22(x, dx, h) return j22 j11_val: csc_matrix = self.problem.get_j11(x, dx, h) j12_val: csc_matrix = self.problem.get_j12(x, dx, h) j21_val: csc_matrix = self.problem.get_j21(x, dx, h) j22_val: csc_matrix = self.problem.get_j22(x, dx, h) I = sp.eye(m=self.problem.get_states_number(), n=self.problem.get_states_number()) j11: sp.csc_matrix = (I - h * j11_val).tocsc() j12: sp.csc_matrix = - h * j12_val j21: sp.csc_matrix = j21_val j22: sp.csc_matrix = j22_val J = pack_4_by_4_scipy(j11, j12, j21, j22) return J def _jacobian_pseudo_transient(self, x: Vec, dx: Vec, h: float) -> sp.csc_matrix: """ Jacobian for pseudo-transient residual: r_state = (x - x_prev) / h - f(x, y) r_alg = g(x, y) therefore: J11 = (1/h) I - d f / d x J12 = - d f / d y J21 = d g / d x J22 = d g / d y """ n_states = self.problem.get_states_number() if n_states == 0: return self.problem.get_j22(x, dx, h) # For initialization problems with numerical Jacobian, compute once and split. # This avoids recomputing 4 separate finite-difference Jacobians and mixing them. jac_full_fn = getattr(self.problem, "_compute_numerical_jacobian", None) if callable(jac_full_fn): j_full = jac_full_fn(x, dx, h).tocsc() n_total = j_full.shape[0] n_alg = n_total - n_states I = sp.eye(m=n_states, n=n_states, format="csc") j11 = ((1.0 / h) * I - j_full[:n_states, :n_states]).tocsc() j12 = (-j_full[:n_states, n_states:n_states + n_alg]).tocsc() j21 = j_full[n_states:n_states + n_alg, :n_states].tocsc() j22 = j_full[n_states:n_states + n_alg, n_states:n_states + n_alg].tocsc() return pack_4_by_4_scipy(j11, j12, j21, j22) j11_val: csc_matrix = self.problem.get_j11(x, dx, h) j12_val: csc_matrix = self.problem.get_j12(x, dx, h) j21_val: csc_matrix = self.problem.get_j21(x, dx, h) j22_val: csc_matrix = self.problem.get_j22(x, dx, h) I = sp.eye(m=n_states, n=n_states, format="csc") j11 = ((1.0 / h) * I - j11_val).tocsc() j12 = (-j12_val).tocsc() j21 = j21_val.tocsc() j22 = j22_val.tocsc() return pack_4_by_4_scipy(j11, j12, j21, j22) def _rhs_steady(self, x: Vec, dx: Vec) -> Vec: f_algeb = self.problem.rhs_algebraic(x, dx) if self.problem.get_states_number() > 0: f_state = self.problem.rhs_state(x, dx) return np.r_[f_state, f_algeb] return f_algeb def _jacobian_steady(self, x: Vec, dx: Vec) -> sp.csc_matrix: jac_full_fn = getattr(self.problem, "_compute_numerical_jacobian", None) if callable(jac_full_fn): return jac_full_fn(x, dx, self.h).tocsc() if self.problem.get_states_number() == 0: return self.problem.get_j22(x, dx, self.h) j11 = self.problem.get_j11(x, dx, self.h).tocsc() j12 = self.problem.get_j12(x, dx, self.h).tocsc() j21 = self.problem.get_j21(x, dx, self.h).tocsc() j22 = self.problem.get_j22(x, dx, self.h).tocsc() return pack_4_by_4_scipy(j11, j12, j21, j22) def _solve_linear_system(self, J: sp.csc_matrix, rhs: Vec, x: Vec, context: str) -> np.ndarray: m = int(J.shape[0]) n = int(J.shape[1]) if m == 0 or n == 0: return np.array([], dtype=float) def solve_lsqr(reason: str) -> np.ndarray: if not self.allow_lsqr_fallback: raise RuntimeError(f"{context}: singular linear system in pseudo-transient ({reason})") self._dbg(f"{context}: {reason}; trying lsqr fallback") delta_lsqr, *_ = spla.lsqr( J, -rhs, damp=1e-8, atol=1e-10, btol=1e-10, iter_lim=max(200, 2 * max(m, n)), ) if np.all(np.isfinite(delta_lsqr)): self._dbg(f"{context}: lsqr fallback succeeded") return np.asarray(delta_lsqr, dtype=float) raise RuntimeError(f"{context}: lsqr fallback failed after {reason}") # Rectangular systems cannot be solved by spsolve; use least-squares. if m != n: return solve_lsqr(f"rectangular Jacobian {J.shape}") with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Matrix is exactly singular") try: delta = spla.spsolve(J, -rhs) except Exception as exc: self._report_singularity_diagnostics(J, rhs, x=x, context=context) try: return solve_lsqr(f"spsolve failed ({exc})") except RuntimeError as fallback_exc: raise fallback_exc from exc if not np.all(np.isfinite(delta)): self._report_singularity_diagnostics(J, rhs, x=x, context=context) return solve_lsqr("non-finite spsolve result") return np.asarray(delta, dtype=float) def _newton_polish(self, x: Vec, tol: float, max_iter: int = 8) -> tuple[Vec, float]: dx = np.zeros(self.problem.get_diff_var_number(), dtype=float) for k in range(max_iter): rhs = self._rhs_steady(x, dx) if not np.all(np.isfinite(rhs)): self._dbg(f"newton polish abort: non-finite rhs at iter={k + 1}") break res_inf = float(np.linalg.norm(rhs, np.inf)) if rhs.size > 0 else 0.0 if res_inf <= tol: self._dbg(f"newton polish converged at iter={k + 1}, residual_inf={res_inf:.3e}") return x, res_inf J = self._jacobian_steady(x, dx) delta = self._solve_linear_system(J, rhs, x=x, context=f"newton_polish iter={k + 1}") solved = np.all(np.isfinite(delta)) if not solved: self._dbg(f"newton polish abort: linear solve failed at iter={k + 1}") break trial_scales = (1.0) best_x = x best_res = np.inf for scale in trial_scales: xt = x + scale * delta rt = self._rhs_steady(xt, dx) if not np.all(np.isfinite(rt)): continue rinf = float(np.linalg.norm(rt, np.inf)) if rt.size > 0 else 0.0 if rinf < best_res: best_res = rinf best_x = xt if rinf <= res_inf * (1.0 - 1e-4 * scale): break x = best_x self._dbg( f"newton polish iter={k + 1}: residual_inf={res_inf:.3e}->{best_res:.3e}, " f"|delta|_2={np.linalg.norm(delta):.3e}" ) rhs_final = self._rhs_steady(x, dx) final_inf = float(np.linalg.norm(rhs_final, np.inf)) if rhs_final.size > 0 and np.all(np.isfinite(rhs_final)) else np.inf return x, final_inf def _rhs_pseudo_transient(self, x: Vec, xn: Vec, dx: Vec, h: float) -> Vec: """ Return 𝑑x/dt given the current *state* vector. :param x: get the right-hand-side give a state vector :param xn: :param h: simulation step :return [f_state_update, f_algeb] """ f_algeb = self.problem.rhs_algebraic(x, dx) if self.problem.get_states_number() > 0: f_state = self.problem.rhs_state(x, dx) f_state_update = (x[:self.problem.get_states_number()] - xn[:self.problem.get_states_number()]) / h - f_state return np.r_[f_state_update, f_algeb] else: return f_algeb def _plot_diagnostics(self, dtau_hist: list[float], dx_error_hist: list[float], residual_hist: list[float], x_hist: list[np.ndarray], state_eq_hist: list[np.ndarray]) -> None: fig, axs = plt.subplots(3, 1, figsize=(8, 10), sharex=True) if len(dx_error_hist) > 0: axs[0].semilogy(dx_error_hist, label="||dx||") axs[0].set_ylabel("dx error (log)") axs[0].legend() if len(residual_hist) > 0: axs[1].semilogy(residual_hist, label="Residual norm") axs[1].set_ylabel("Residual (log)") axs[1].legend() if len(dtau_hist) > 0: axs[2].semilogy(dtau_hist, label="dtau") axs[2].set_ylabel("dtau") axs[2].set_xlabel("Step index") axs[2].legend() x_hist_arr = np.array(x_hist) if x_hist_arr.size == 0 or x_hist_arr.ndim < 2: return state_vars = self._problem_state_vars() n_state_vars = len(state_vars) algeb_vars = self._problem_algebraic_vars() n_algeb_vars = len(algeb_vars) vars_per_plot = 5 if n_state_vars > 0: nplots_state = (n_state_vars + vars_per_plot - 1) // vars_per_plot fig_state, axs_state = plt.subplots(nplots_state, 1, figsize=(10, 2.5 * nplots_state), sharex=True) if nplots_state == 1: axs_state = [axs_state] for i in range(nplots_state): start = i * vars_per_plot end = min((i + 1) * vars_per_plot, n_state_vars) for var in state_vars[start:end]: axs_state[i].plot(x_hist_arr[:, self.problem.uid2idx_vars[var.uid]], label=var.name) axs_state[i].set_ylabel("Value") axs_state[i].legend(loc="best", fontsize="x-small", ncol=2, frameon=False) axs_state[-1].set_xlabel("Step index") if n_algeb_vars > 0: nplots_algeb = (n_algeb_vars + vars_per_plot - 1) // vars_per_plot fig_algeb, axs_algeb = plt.subplots(nplots_algeb, 1, figsize=(10, 2.5 * nplots_algeb), sharex=True) if nplots_algeb == 1: axs_algeb = [axs_algeb] for i in range(nplots_algeb): start = i * vars_per_plot end = min((i + 1) * vars_per_plot, n_algeb_vars) for var in algeb_vars[start:end]: axs_algeb[i].plot(x_hist_arr[:, self.problem.uid2idx_vars[var.uid]], label=var.name) axs_algeb[i].set_ylabel("Value") axs_algeb[i].legend(loc="best", fontsize="x-small", ncol=2, frameon=False) axs_algeb[-1].set_xlabel("Step index") state_eq_hist_arr = np.array(state_eq_hist) n_state_eqs = int(self.problem.get_states_number()) if n_state_eqs > 0 and state_eq_hist_arr.size > 0 and state_eq_hist_arr.ndim == 2: nplots_state_eq = (n_state_eqs + vars_per_plot - 1) // vars_per_plot fig_state_eq, axs_state_eq = plt.subplots(nplots_state_eq, 1, figsize=(10, 2.5 * nplots_state_eq), sharex=True) if nplots_state_eq == 1: axs_state_eq = [axs_state_eq] state_vars = self._problem_state_vars() labels = list() for i in range(n_state_eqs): if i < len(state_vars): labels.append(f"state_update({state_vars[i].name})") else: labels.append(f"state_update[{i}]") for i in range(nplots_state_eq): start = i * vars_per_plot end = min((i + 1) * vars_per_plot, n_state_eqs) for j in range(start, end): axs_state_eq[i].plot(state_eq_hist_arr[:, j], label=labels[j]) axs_state_eq[i].axhline(0.0, color="k", linewidth=0.8, alpha=0.4) axs_state_eq[i].set_ylabel("Eq value") axs_state_eq[i].legend(loc="best", fontsize="x-small", ncol=2, frameon=False) axs_state_eq[-1].set_xlabel("Step index") plt.show() def _report_rhs_offenders(self, x: Vec, xn: Vec, dx: Vec, dtau: float, top_n: int = 20) -> None: rhs = self._rhs_pseudo_transient(x, xn, 0*dx, dtau) if rhs.size == 0: print("[PseudoTransient] Final RHS offenders: none (empty RHS)") return if not np.all(np.isfinite(rhs)): bad_idx = np.where(~np.isfinite(rhs))[0] print(f"[PseudoTransient] Final RHS has NaN/Inf at indices: {bad_idx.tolist()}") return threshold = max(self.tol, 1e-12) abs_rhs = np.abs(rhs) offenders = np.where(abs_rhs > threshold)[0] if offenders.size == 0: print(f"[PseudoTransient] Final RHS offenders: none above threshold={threshold:.3e}") return ranked = offenders[np.argsort(-abs_rhs[offenders])] n_show = min(int(top_n), ranked.size) print(f"[PseudoTransient] Final RHS offenders (top {n_show}, threshold={threshold:.3e}):") for i in ranked[:n_show]: eq_repr = self._rhs_index_equation_repr(int(i)) print(f" - rhs[{int(i)}] = {rhs[int(i)]:+.6e} | {eq_repr}")
[docs] def simulate(self, plot=True, x0: Vec | None = None): original_fixed_indices = list(self._fixed_var_indices) n_vars = self.problem.get_all_vars_number() if x0 is None: get_x0 = getattr(self.problem, "get_x0", None) if callable(get_x0): x0 = np.array(get_x0(), dtype=float, copy=True) else: x0 = np.random.rand(n_vars, dtype=float) else: x0 = np.array(x0, dtype=float, copy=True) if x0.size != n_vars: raise ValueError(f"Invalid x0 size for pseudo-transient: got {x0.size}, expected {n_vars}") #x0 = self._find_initial_diff_free_feasible_point(x0, max_iter=50) #x0 = self._predictor_project_initial_guess(x0) #x0 = self._apply_fixed_mask(x0, x0) n_x0_tries = int(os.getenv("PSEUDO_X0_TRIES", "1")) seed_text = os.getenv("PSEUDO_X0_SEED", "") rng = np.random.default_rng(None if seed_text.strip() == "" else int(seed_text)) if n_x0_tries <= 1: x0 = np.random.rand(n_vars) else: best_x0 = None best_res = np.inf dx_probe = np.zeros(self.problem.get_diff_var_number(), dtype=float) chosen_idx = -1 for k in range(n_x0_tries): x_try = rng.random(n_vars) x_try = self._apply_fixed_mask(x_try, x_try) rhs_try = self._rhs_pseudo_transient(x_try, x_try, dx_probe, self.dtau0) if not np.all(np.isfinite(rhs_try)): continue res_try = float(np.linalg.norm(rhs_try, np.inf)) if rhs_try.size > 0 else 0.0 J_try = self._jacobian_pseudo_transient(x_try, dx_probe, self.dtau0) square_ok = J_try.shape[0] == J_try.shape[1] solvable = False if square_ok: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="Matrix is exactly singular") try: d_try = spla.spsolve(J_try, -rhs_try) solvable = np.all(np.isfinite(d_try)) except Exception: solvable = False if res_try < best_res: best_res = res_try best_x0 = x_try if solvable: chosen_idx = k x0 = x_try break if chosen_idx >= 0: self._dbg(f"x0 multi-try selected candidate={chosen_idx + 1}/{n_x0_tries}") else: x0 = best_x0 if best_x0 is not None else rng.random(n_vars) self._dbg(f"x0 multi-try found no solvable Jacobian; using best residual candidate with ||rhs||_inf={best_res:.3e}") if len(self._fixed_var_indices) > 0: fixed_preview = [ f"{self._var_index_name(i)}={x0[i]:+.6e}" for i in self._fixed_var_indices[:8] if 0 <= i < x0.size ] self._dbg(f"fixed mask active: n_fixed={len(self._fixed_var_indices)}, x0={fixed_preview}") self._dbg( f"start: n_vars={n_vars}, n_states={self.problem.get_states_number()}, " f"n_algebraic={self.problem.get_algebraic_var_number()}, n_diff={self.problem.get_diff_var_number()}, " f"dtau0={self.dtau0:.3e}, tol={self.tol:.3e}, max_iter={self.max_iter_0}" ) dtau = self.dtau0 dtau_max = self.dtau_max dtau_min = self.dtau_min dx0 = np.zeros(self.problem.get_diff_var_number()) y = np.tile(x0, (5, 1)) step_idx = 0 x_new = x0.copy() xn = x0.copy() x_fixed_ref = x0.copy() released_fixed_refs = False tries = 0 # Update variable parameters at t=0 self.problem.update_variable_params(0.0) dx_error = 1 residual = 10 old_residual = 10 good_step_streak = 0 dtau_stall_streak = 0 rhs_weights: np.ndarray | None = None # history containers dtau_hist = list() dx_error_hist = list() residual_hist = list() x_hist = list() dx_hist = list() state_eq_hist = list() try: while step_idx < self.max_iter_0: tries += 1 solved = False run_newton_raphson = (tries % 10 == 0) or (residual > 10 * self.tol and tries > 1) if step_idx == 0: xlast = xn xn = x_new.copy() dx = np.zeros(self.problem.get_diff_var_number()) else: dx = self.problem.get_dx(xn, xlast, dx0, dtau) # or 1e-3 and fixed? try: xlast = y[-2].copy() except: xlast = xn xn = y[-1].copy() rhs = self._rhs_pseudo_transient(x_new, xn, dx, dtau) rhs2 = self._rhs_pseudo_transient(x_new, xn, dx, dtau) if rhs_weights is None or rhs_weights.size != rhs.size: rhs_weights = self._build_residual_weights(rhs.size) if self.use_weighted_residual: self._dbg( f"weighted residual enabled: network={self.weight_network:.3g}, " f"power={self.weight_power:.3g}" ) residual2 = self._weighted_norm(rhs2, rhs_weights) if not np.all(np.isfinite(rhs)): bad_idx = np.where(~np.isfinite(rhs))[0] bad_vals = rhs[bad_idx] if bad_idx.size > 0 else np.array([]) bad_eqs = [self._rhs_index_equation_repr(int(i)) for i in bad_idx] bad_eq_vars = [self._rhs_index_equation_vars_debug(int(i), x_new) for i in bad_idx] raise ValueError( "NaN or Inf in RHS" f" (step={step_idx}, try={tries}, dtau={dtau:.3e}, " f"bad_idx={bad_idx.tolist()}, bad_vals={bad_vals.tolist()}, " f"bad_eqs={bad_eqs}, bad_eq_vars={bad_eq_vars})" ) Jf = self._jacobian_pseudo_transient(x_new, dx, dtau) residual = self._weighted_norm(rhs, rhs_weights) # Structural guard: ensure linear solve dimensions match state vector. if Jf.shape[0] != rhs.size or Jf.shape[1] != x_new.size: eq_preview = [self._rhs_index_equation_begin(i) for i in range(min(8, rhs.size))] var_preview = [self._var_index_name(i) for i in range(min(8, x_new.size))] unmatched = [] if Jf.shape[1] < x_new.size: unmatched = [self._var_index_name(i) for i in range(Jf.shape[1], x_new.size)] raise ValueError( "PseudoTransient dimension mismatch before linear solve: " f"J={Jf.shape}, rhs={rhs.size}, x={x_new.size}, " f"n_states={self.problem.get_states_number()}, " f"n_algebraic={self.problem.get_algebraic_var_number()}, " f"n_vars={self.problem.get_all_vars_number()}, " f"eq_preview={eq_preview}, var_preview={var_preview}, " f"unmatched_var_tail={unmatched}" ) delta = self._solve_linear_system(Jf, rhs, x=x_new, context=f"pseudo step={step_idx + 1} try={tries}") if delta.size != x_new.size: raise ValueError( "PseudoTransient linear step size mismatch: " f"delta={delta.size}, x={x_new.size}, J={Jf.shape}, rhs={rhs.size}" ) n_states = int(self.problem.get_states_number()) g = rhs[n_states:] Jg = Jf[n_states:, :] alg_lin_res = Jg @ delta + g print( "[PseudoTransient diagnostics] algebraic linear solve check: " f"|g|_inf={np.linalg.norm(g, np.inf):.6e}, " f"|Jg*delta+g|_inf={np.linalg.norm(alg_lin_res, np.inf):.6e}" ) solved = np.all(np.isfinite(delta)) if not solved: # or not np.all(np.isfinite(delta)): if self.verbose: print(f'jacobian is {Jf.toarray()}') print(f'delta is {delta}') print(f'x_new is {x_new}') print(f'rhs is {rhs}') print(f'residual is {np.linalg.norm(rhs)} try is {tries} and step is {step_idx}') raise ValueError( f"Newton step failed at try {tries} and step {step_idx}: delta has NaN/Inf values with dtau {dtau}") dx0 = dx base_residual = residual trial_scales = (1.0, 0.5, 0.25, 0.1, 0.05) best_scale = 0.0 best_x = None best_residual = np.inf best_rhs = None for scale in trial_scales: x_trial = x_new + scale * delta x_trial = self._apply_fixed_mask(x_trial, x_fixed_ref) rhs_trial = self._rhs_pseudo_transient(x_trial, xn, dx, dtau) if not np.all(np.isfinite(rhs_trial)): continue residual_trial = self._weighted_norm(rhs_trial, rhs_weights) if residual_trial < best_residual: best_residual = residual_trial best_scale = scale best_x = x_trial best_rhs = rhs_trial if residual_trial <= base_residual * (1.0 - 1e-4 * scale): break if best_x is None: raise ValueError( f"Line-search failed at try {tries} and step {step_idx}: no finite trial residual" ) x_new = self._apply_fixed_mask(best_x, x_fixed_ref) self._check_fixed_drift(x_new, x_fixed_ref, where=f"step={step_idx + 1} try={tries} accepted") rhs = best_rhs residual = best_residual if best_scale < 1.0: self._dbg( f"step damping: scale={best_scale:.2f}, residual2={base_residual:.3e}->{best_residual:.3e}" ) newton_residual = np.linalg.norm(rhs, np.inf) self._dbg( f"step={step_idx + 1} try={tries}: residual2={residual:.3e}, " f"residual_inf={newton_residual:.3e}, |delta|_2={np.linalg.norm(delta):.3e}, dtau={dtau:.3e}" ) if self.verbose: self._report_key_equation_tracking(rhs, x_new, xn, tag=f"step={step_idx + 1} try={tries}") if ( (not released_fixed_refs) and len(self._fixed_var_indices) > 0 and np.isfinite(newton_residual) and newton_residual < self.reference_error_tol ): released_fixed_refs = True self._dbg( f"releasing fixed reference mask at step={step_idx + 1} " f"(residual_inf={newton_residual:.3e} < {self.reference_error_tol:.3e})" ) self._fixed_var_indices = [] if step_idx == 0 and tries % 10 == 1: i = np.argmax(rhs) if solved: step_idx += 1 tries = 0 y = np.roll(y, shift=-1, axis=0) alpha = 1.0 if step_idx > 2: y[-1] = alpha * x_new + (1 - alpha) * y[-1] else: y[-1] = x_new x_new = self._apply_fixed_mask(y[-1], x_fixed_ref) self._check_fixed_drift(x_new, x_fixed_ref, where=f"step={step_idx} rollout") xn = x_new.copy() dx = self.problem.get_dx(xn, xlast, dx, dtau) dx_error = np.linalg.norm(dx) rhs = self._rhs_pseudo_transient(x_new, xn, dx, dtau) residual = self._weighted_norm(rhs, rhs_weights) # save history dtau_hist.append(dtau) dx_error_hist.append(dx_error) residual_hist.append(residual) x_hist.append(x_new.copy()) dx_hist.append(dx.copy()) n_states = int(self.problem.get_states_number()) if n_states > 0: state_eq_hist.append(rhs[:n_states].copy()) if residual < self.tol: break self._dbg( f"accepted step={step_idx}: residual2={residual:.3e}, dx_error={dx_error:.3e}, dtau={dtau:.3e}" ) eps = 1e-14 avg_residual = 0.8 * old_residual + 0.2 * residual ratio = (avg_residual + eps) / (residual + eps) # Hysteresis prevents 2x/0.5x ping-pong when ratio hovers near 1. if ratio > 1.20: beta = 1.35 elif ratio > 1.05: beta = 1.15 elif ratio < 0.80: beta = 0.70 elif ratio < 0.95: beta = 0.87 else: beta = 1.0 # If progress is consistently good, force a gentle dtau ramp-up. # This avoids stalling when ratio remains near 1 due to smoothing. good_step = (residual < 0.995 * old_residual) and (best_scale >= 0.25) if good_step: good_step_streak += 1 else: good_step_streak = 0 if good_step_streak >= 3 and residual > self.tol: beta = max(beta, 1.20) self._dbg( f"adaptive dtau boost: streak={good_step_streak}, " f"best_scale={best_scale:.2f}, forced_beta={beta:.3e}" ) good_step_streak = 0 self._dbg(f"adaptive dtau: old={dtau:.3e}, beta={beta:.3e}") dtau_prev = dtau if dtau > 0: dtau = min(dtau_max, max(dtau_min, dtau * beta)) else: dtau = -min(dtau_max, max(dtau_min, -dtau * beta)) if np.isclose(dtau, dtau_prev, rtol=1e-12, atol=1e-15): dtau_stall_streak += 1 else: dtau_stall_streak = 0 abs_dtau_prev = abs(dtau_prev) if ( dtau_stall_streak >= 3 and abs_dtau_prev < 0.999 * dtau_max and residual > self.tol and best_scale >= 0.25 ): forced = min(dtau_max, max(dtau_min, abs_dtau_prev * 1.25)) dtau = forced if dtau_prev >= 0 else -forced self._dbg( f"adaptive dtau anti-stall: streak={dtau_stall_streak}, " f"best_scale={best_scale:.2f}, forced_dtau={dtau:.3e}" ) dtau_stall_streak = 0 if abs(dtau) >= 0.999 * dtau_max: self._dbg(f"adaptive dtau capped at dtau_max={dtau_max:.3e}") self._dbg(f"adaptive dtau: new={dtau:.3e}") old_residual = residual elif tries > self.max_iter_0: if self.verbose: print(f'delta is {delta}') print(f'failed with dtau = {dtau}') self._dbg( f"max tries reached at step={step_idx}, dtau={dtau:.2e}, residual2={residual:.2e}; " f"ending pseudo-transient loop" ) break except Exception: if plot: self._plot_diagnostics(dtau_hist, dx_error_hist, residual_hist, x_hist, state_eq_hist) raise finally: self._fixed_var_indices = original_fixed_indices self._dbg( f"finish: steps={step_idx}, final_residual2={residual:.3e}, final_dtau={dtau:.3e}, " f"x_inf={np.linalg.norm(x_new, np.inf):.3e}" ) self._report_rhs_offenders(x=x_new, xn=xn, dx=dx, dtau=dtau) init_guess = dict() for var in self._problem_state_vars() + self._problem_algebraic_vars(): if var.uid not in self.problem.uid2idx_vars: continue idx = self.problem.uid2idx_vars[var.uid] if 0 <= idx < x_new.size: init_guess[var] = x_new[idx] if plot: self._plot_diagnostics(dtau_hist, dx_error_hist, residual_hist, x_hist, state_eq_hist) return x_new, init_guess