# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import logging
import warnings
from typing import Any, Callable, Optional
import numpy as np
try:
import scipy.sparse as sp
import scipy.sparse.linalg as spla
import scipy.linalg as sla
from scipy.linalg import LinAlgWarning
from scipy.sparse.linalg import MatrixRankWarning
except ImportError: # pragma: no cover
sp = None
spla = None
sla = None
LinAlgWarning = Warning
MatrixRankWarning = Warning
Array1D = np.ndarray
'''
Per-solve context containing state, metrics, and diagnostics for the Newton solver.
---
DEV NOTE (Architecture & C++ Porting):
1. PYTHON IMPLEMENTATION:
We use @dataclass(slots=True) here to optimize memory footprint (eliminating
internal `__dict__` per instance) and strictly define the schema. This is critical
when generating thousands of context objects during simulation steps.
2. C++ MIGRATION GUIDE:
If porting this structure to C++, treat it as a POD `struct`.
- Nullables: `Optional[float]` must be converted to `std::optional<double>`.
- Logging: You will lose the auto-generated `__repr__`. You MUST manually
implement `friend std::ostream& operator <<` to support logging.
- Equality: Implement `operator==` (or use C++20 `default`) to match Python's behavior.
- Initialization: Use C++20 designated initializers (`.field = val`) to mimic
Python's keyword arguments.
---
'''
[docs]
class NewtonDiagnosticsConfig:
"""
Configuration for Newton linear-solve diagnostics.
Attributes:
step_norm_explode: Threshold on ||dx||_2 to flag an exploding Newton step.
dense_cond_warn: Condition number threshold to warn about ill-conditioning (dense only).
compute_dense_cond: If True, compute np.linalg.cond(A) for dense matrices.
dense_cond_max_n: Avoid cond(A) if matrix dimension > this value (O(n^3) cost).
enable_fallback: If True, attempt a least-squares fallback when the primary solve fails.
enable_index1_check: If True, validate the algebraic Jacobian block on refresh.
index1_max_block_n: Skip explicit index-1 checks for algebraic blocks larger than this size.
index1_warn_pivot_ratio: Warn threshold for the minimum pivot ratio of the algebraic block.
index1_fail_pivot_ratio: Failure threshold for the minimum pivot ratio of the algebraic block.
enable_backtracking: If True, apply backtracking when a full Newton step fails to reduce the residual.
backtracking_beta: Shrink factor used during backtracking.
backtracking_min_alpha: Minimum accepted Newton step scaling.
backtracking_max_iter: Maximum number of backtracking reductions.
log_level: Logging level for warnings.
"""
__slots__ = (
"step_norm_explode",
"dense_cond_warn",
"compute_dense_cond",
"dense_cond_max_n",
"enable_fallback",
"enable_index1_check",
"index1_max_block_n",
"index1_warn_pivot_ratio",
"index1_fail_pivot_ratio",
"enable_backtracking",
"backtracking_beta",
"backtracking_min_alpha",
"backtracking_max_iter",
"log_level",
)
def __init__(
self,
step_norm_explode: float = 1e6,
dense_cond_warn: float = 1e12,
compute_dense_cond: bool = True,
dense_cond_max_n: int = 600,
enable_fallback: bool = True,
enable_index1_check: bool = False,
index1_max_block_n: int = 128,
index1_warn_pivot_ratio: float = 1e-10,
index1_fail_pivot_ratio: float = 1e-14,
enable_backtracking: bool = False,
backtracking_beta: float = 0.5,
backtracking_min_alpha: float = 1e-4,
backtracking_max_iter: int = 6,
log_level: int = logging.WARNING,
) -> None:
self.step_norm_explode = step_norm_explode
self.dense_cond_warn = dense_cond_warn
self.compute_dense_cond = compute_dense_cond
self.dense_cond_max_n = dense_cond_max_n
self.enable_fallback = enable_fallback
self.enable_index1_check = enable_index1_check
self.index1_max_block_n = index1_max_block_n
self.index1_warn_pivot_ratio = index1_warn_pivot_ratio
self.index1_fail_pivot_ratio = index1_fail_pivot_ratio
self.enable_backtracking = enable_backtracking
self.backtracking_beta = backtracking_beta
self.backtracking_min_alpha = backtracking_min_alpha
self.backtracking_max_iter = backtracking_max_iter
self.log_level = log_level
[docs]
class NewtonSolveContext:
"""
Per-solve context, passed by the caller.
The decorator updates diagnostic fields (used_fallback, cond_est, step_norm, failure_msg).
Attributes:
t: Current simulation time.
step_idx: Time-step index.
newton_iter: Newton iteration index within the time-step.
phase: Text label (e.g. "jit", "implicit", "pseudo", "init").
method: Integration method label, if available.
solver: Text label (e.g. "dense", "sparse").
used_fallback: Set True if fallback was used.
cond_est: Estimated condition number (dense only, optional).
step_norm2: ||dx||_2
step_norm_inf: ||dx||_inf
failure_msg: If the primary solver failed, store a short error description.
"""
__slots__ = (
"t",
"step_idx",
"newton_iter",
"phase",
"method",
"solver",
"used_fallback",
"cond_est",
"step_norm2",
"step_norm_inf",
"failure_msg",
"res_norm_inf",
"index1_pivot_ratio",
)
def __init__(
self,
t: float,
step_idx: int,
newton_iter: int,
phase: str,
method: str = "",
solver: str = "",
used_fallback: bool = False,
cond_est: Optional[float] = None,
step_norm2: Optional[float] = None,
step_norm_inf: Optional[float] = None,
failure_msg: Optional[str] = None,
res_norm_inf: Optional[float] = None,
index1_pivot_ratio: Optional[float] = None,
) -> None:
self.t = t
self.step_idx = step_idx
self.newton_iter = newton_iter
self.phase = phase
self.method = method
self.solver = solver
self.used_fallback = used_fallback
self.cond_est = cond_est
self.step_norm2 = step_norm2
self.step_norm_inf = step_norm_inf
self.failure_msg = failure_msg
self.res_norm_inf = res_norm_inf
self.index1_pivot_ratio = index1_pivot_ratio
def _emit(logger: Optional[logging.Logger], level: int, msg: str) -> None:
"""
Emit a message either via logger (if configured) or via print as a fallback.
"""
if logger is not None and (logger.handlers or logging.getLogger().handlers):
logger.log(level, msg)
else:
# Console fallback if no logging handlers exist.
print(msg)
def _format_ctx(ctx: NewtonSolveContext) -> str:
return (
f"t={ctx.t:.6g}, step={ctx.step_idx}, it={ctx.newton_iter}, "
f"phase={ctx.phase}, method={ctx.method}, solver={ctx.solver}"
)
[docs]
def with_newton_diagnostics(
primary_solve: Callable[[Any, Array1D], Array1D],
*,
fallback_solve: Optional[Callable[[Any, Array1D], Array1D]] = None,
collector: Optional["NewtonTraceCollector"] = None,
config: Optional[NewtonDiagnosticsConfig] = None,
logger: Optional[logging.Logger] = None,
solver_name: str = "",
matrix_getter: Optional[Callable[[Any], Any]] = None,
) -> Callable[[Any, Array1D, NewtonSolveContext], Array1D]:
"""
Decorate a linear solver with Jacobian conditioning diagnostics and fallback LS solve.
Args:
primary_solve: Function implementing the primary solve (e.g. np.linalg.solve, spla.spsolve).
fallback_solve: Least-squares fallback (e.g. np.linalg.lstsq(...)[0], spla.lsqr(...)[0]).
collector: Optional Newton trace collector receiving per-iteration records.
config: Diagnostics configuration.
logger: Optional logger (uses print fallback if not configured).
solver_name: Human-readable solver label.
matrix_getter: Optional callback to extract the diagnostic matrix from a bundled solver object.
Returns:
A callable solve(A, b, ctx) -> x, which updates ctx with diagnostics info.
"""
cfg = config or NewtonDiagnosticsConfig()
log = logger or logging.getLogger(__name__)
def wrapped(A: Any, b: Array1D, ctx: NewtonSolveContext) -> Array1D:
ctx.solver = solver_name or ctx.solver
diag_matrix: Any = matrix_getter(A) if matrix_getter is not None else A
# ---- Optional dense conditioning estimate ----
# Only do this for dense ndarray matrices to avoid sparse->dense blowups.
if cfg.compute_dense_cond and isinstance(diag_matrix, np.ndarray):
n = diag_matrix.shape[0]
if n <= cfg.dense_cond_max_n:
try:
cond = float(np.linalg.cond(diag_matrix))
ctx.cond_est = cond
if cond > cfg.dense_cond_warn:
_emit(
log,
cfg.log_level,
f"[NewtonDiag] Ill-conditioned dense Jacobian (cond={cond:.3e}). {_format_ctx(ctx)}",
)
except Exception as e:
_emit(
log,
cfg.log_level,
f"[NewtonDiag] cond(A) failed ({type(e).__name__}: {e}). {_format_ctx(ctx)}",
)
# ---- Primary solve (with rank-warning promotion for sparse) ----
ctx.used_fallback = False
ctx.failure_msg = None
try:
if sp is not None and spla is not None and sp.issparse(diag_matrix):
# SciPy may emit MatrixRankWarning instead of raising; promote it to exception.
with warnings.catch_warnings():
warnings.simplefilter("error", MatrixRankWarning)
x = primary_solve(A, b)
else:
x = primary_solve(A, b)
if not np.all(np.isfinite(x)):
raise FloatingPointError("Primary solve returned NaN/Inf.")
except Exception as e:
ctx.failure_msg = f"{type(e).__name__}: {e}"
_emit(
log,
cfg.log_level,
f"[NewtonDiag] Linear solve failed -> {ctx.failure_msg}. {_format_ctx(ctx)}",
)
if not cfg.enable_fallback:
raise
fb = fallback_solve
if fb is None:
raise # No fallback provided.
ctx.used_fallback = True
_emit(
log,
cfg.log_level,
f"[NewtonDiag] Attempting least-squares fallback. {_format_ctx(ctx)}",
)
x = fb(A, b)
if not np.all(np.isfinite(x)):
raise FloatingPointError("Fallback solve returned NaN/Inf.")
# ---- Step norm monitoring ----
n2 = float(np.linalg.norm(x))
ninf = float(np.linalg.norm(x, np.inf))
ctx.step_norm2 = n2
ctx.step_norm_inf = ninf
if n2 > cfg.step_norm_explode:
_emit(
log,
cfg.log_level,
f"[NewtonDiag] Exploding Newton step (||dx||2={n2:.3e}, ||dx||inf={ninf:.3e}). {_format_ctx(ctx)}",
)
# ---- Trace collection (optional)----
if collector is not None:
collector.record(
ctx=ctx,
res_norm=ctx.res_norm_inf,
dx=x,
cond=ctx.cond_est,
fallback=ctx.used_fallback,
)
return x
return wrapped
[docs]
def maybe_check_index1(
jacobian: Any,
n_state: int,
*,
ctx: Optional[NewtonSolveContext] = None,
config: Optional[NewtonDiagnosticsConfig] = None,
logger: Optional[logging.Logger] = None,
) -> Optional[float]:
"""
Optionally validate the algebraic Jacobian block associated with an index-1 DAE.
The check is intentionally conservative and size-limited so it does not penalize
normal runtime performance. It is only active when ``config.enable_index1_check``
is set to ``True``.
:param jacobian: Dense or sparse Jacobian matrix of the monolithic residual.
:param n_state: Number of differential states; rows/columns after this offset belong to ``g_y``.
:param ctx: Optional Newton context for diagnostics bookkeeping.
:param config: Newton diagnostics configuration.
:param logger: Optional logger.
:return: Estimated pivot-ratio indicator or ``None`` when the check is skipped.
"""
cfg = config or NewtonDiagnosticsConfig()
log = logger or logging.getLogger(__name__)
if not cfg.enable_index1_check:
return None
n_total: int = int(jacobian.shape[0])
n_alg: int = n_total - int(n_state)
if n_alg <= 0 or n_alg > cfg.index1_max_block_n:
return None
if isinstance(jacobian, np.ndarray):
gy = jacobian[n_state:, n_state:]
if gy.size == 0:
return None
if sla is None:
return None
try:
with warnings.catch_warnings():
warnings.simplefilter("error", LinAlgWarning)
lu_data, _ = sla.lu_factor(gy)
diag_u = np.abs(np.diag(lu_data))
scale = max(float(np.max(np.abs(gy))), 1.0)
except Exception as e:
msg = f"[NewtonDiag] index-1 dense check failed ({type(e).__name__}: {e}). {_format_ctx(ctx) if ctx else ''}"
_emit(log, cfg.log_level, msg)
raise RuntimeError(msg) from e
elif sp is not None and spla is not None and sp.issparse(jacobian):
gy = jacobian[n_state:, n_state:].tocsc()
if gy.nnz == 0:
msg = f"[NewtonDiag] Empty algebraic Jacobian block detected. {_format_ctx(ctx) if ctx else ''}"
_emit(log, cfg.log_level, msg)
raise RuntimeError(msg)
try:
with warnings.catch_warnings():
warnings.simplefilter("error", MatrixRankWarning)
lu_obj = spla.splu(gy)
diag_u = np.abs(lu_obj.U.diagonal())
scale = max(float(np.max(np.abs(gy.data))), 1.0)
except Exception as e:
msg = f"[NewtonDiag] index-1 sparse check failed ({type(e).__name__}: {e}). {_format_ctx(ctx) if ctx else ''}"
_emit(log, cfg.log_level, msg)
raise RuntimeError(msg) from e
else:
return None
if diag_u.size == 0:
return None
pivot_ratio: float = float(np.min(diag_u) / scale)
if ctx is not None:
ctx.index1_pivot_ratio = pivot_ratio
if pivot_ratio < cfg.index1_fail_pivot_ratio:
msg = (
f"[NewtonDiag] Likely index-1 violation or near-singular g_y block "
f"(pivot_ratio={pivot_ratio:.3e}). {_format_ctx(ctx) if ctx else ''}"
)
_emit(log, cfg.log_level, msg)
raise RuntimeError(msg)
if pivot_ratio < cfg.index1_warn_pivot_ratio:
_emit(
log,
cfg.log_level,
f"[NewtonDiag] Weak algebraic Jacobian block (pivot_ratio={pivot_ratio:.3e}). {_format_ctx(ctx) if ctx else ''}",
)
return pivot_ratio
[docs]
def maybe_apply_backtracking(
x_iter: Array1D,
delta: Array1D,
res_norm: float,
trial_x: Array1D,
trial_res: Array1D,
*,
evaluate_residual: Callable[[Array1D, Array1D], float],
config: Optional[NewtonDiagnosticsConfig] = None,
) -> None:
"""
Optionally apply backtracking to a Newton step.
The full Newton step is kept as the default behavior. When backtracking is enabled,
the routine tries geometrically reduced step sizes and accepts the first one that
decreases the residual norm. If none succeeds, the original full step is applied.
:param x_iter: Current Newton iterate. Updated in place.
:param delta: Full Newton correction.
:param res_norm: Residual norm at the current iterate.
:param trial_x: Reusable state buffer for trial iterates.
:param trial_res: Reusable residual buffer for trial iterates.
:param evaluate_residual: Callback ``evaluate_residual(trial_x, trial_res) -> res_norm``.
:param config: Newton diagnostics configuration.
:return: None
"""
cfg = config or NewtonDiagnosticsConfig()
if not cfg.enable_backtracking:
x_iter += delta
return
alpha: float = 1.0
bt_iter: int = 0
while bt_iter <= cfg.backtracking_max_iter:
trial_x[:] = x_iter
trial_x += alpha * delta
trial_res_norm: float = float(evaluate_residual(trial_x, trial_res))
if np.isfinite(trial_res_norm) and trial_res_norm < res_norm:
x_iter[:] = trial_x
return
alpha *= cfg.backtracking_beta
if alpha < cfg.backtracking_min_alpha:
break
bt_iter += 1
x_iter += delta
# --------------------------
# Convenience fallback solvers
# --------------------------
[docs]
def dense_lstsq_fallback(A: Any, b: Array1D) -> Array1D:
"""
Dense least-squares fallback using np.linalg.lstsq.
Args:
A: Dense matrix.
b: RHS vector.
Returns:
x: Least-squares solution.
"""
x, *_ = np.linalg.lstsq(A, b, rcond=None)
return x
[docs]
def sparse_lsqr_fallback(A: Any, b: Array1D) -> Array1D:
"""
Sparse least-squares fallback using scipy.sparse.linalg.lsqr.
Args:
A: Sparse matrix.
b: RHS vector.
Returns:
x: LSQR solution.
"""
if spla is None:
raise RuntimeError("SciPy is required for sparse LSQR fallback.")
x = spla.lsqr(A, b)[0]
return x
[docs]
class NewtonTraceCollector:
"""
Collects numerical diagnostics during a simulation run.
Designed for post-analysis and research purposes.
"""
__slots__ = ("records", "residual_records")
def __init__(self) -> None:
"""
Initialize an empty Newton trace collector.
:return: None
"""
self.records: list[dict[str, Any]] = list()
self.residual_records: list[dict[str, Any]] = list()
[docs]
def record(
self,
*,
ctx: NewtonSolveContext,
res_norm: Optional[float] = None,
dx: Optional[Array1D] = None,
cond: Optional[float] = None,
fallback: bool = False,
) -> None:
"""
Append one Newton diagnostics record.
:param ctx: Per-iteration Newton context.
:param res_norm: Residual infinity norm.
:param dx: Newton correction vector.
:param cond: Optional dense Jacobian condition estimate.
:param fallback: Whether the least-squares fallback was used.
:return: None
"""
self.records.append({
"t": ctx.t,
"step": ctx.step_idx,
"newton_iter": ctx.newton_iter,
"phase": ctx.phase,
"method": ctx.method,
"solver": ctx.solver,
"res_norm_inf": res_norm,
"dx_norm_2": np.linalg.norm(dx) if dx is not None else None,
"dx_norm_inf": np.linalg.norm(dx, np.inf) if dx is not None else None,
"cond_J": cond,
"index1_pivot_ratio": ctx.index1_pivot_ratio,
"used_fallback": fallback,
})
[docs]
def to_dataframe(self):
"""
Convert the collected records into a pandas DataFrame.
:return: DataFrame with Newton diagnostics records.
"""
import pandas as pd
return pd.DataFrame(self.records)
[docs]
def record_residual_vector(
self,
*,
ctx: NewtonSolveContext,
residual: Array1D,
top_k: int = 5,
debug_info: Optional[list[dict[str, Any]]] = None,
) -> None:
"""
Record one residual snapshot with top offending equation indices.
:param ctx: Per-iteration Newton context.
:param residual: Full nonlinear residual vector.
:param top_k: Number of largest-magnitude residual entries to keep.
:param debug_info: Optional residual metadata aligned with residual indices.
:return: None.
"""
if residual.size == 0:
return
k = int(max(1, min(int(top_k), int(residual.size))))
idx = np.argpartition(np.abs(residual), -k)[-k:]
idx = idx[np.argsort(np.abs(residual[idx]))[::-1]]
top_rows: list[dict[str, Any]] = list()
for ii in idx:
row: dict[str, Any] = {
"eq_idx": int(ii),
"res": float(residual[ii]),
"abs_res": float(abs(residual[ii])),
}
if debug_info is not None and 0 <= int(ii) < len(debug_info):
info = debug_info[int(ii)]
row["kind"] = str(info.get("kind", ""))
row["label"] = str(info.get("label", ""))
row["var_name"] = str(info.get("var_name", ""))
top_rows.append(row)
self.residual_records.append({
"t": float(ctx.t),
"step": int(ctx.step_idx),
"newton_iter": int(ctx.newton_iter),
"res_norm_inf": float(np.max(np.abs(residual))),
"top": top_rows,
})