# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Tuple, Any
import numpy as np
import scipy.sparse.linalg as spla
import scipy.linalg as la
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import EmtProblemTemplate
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Simulations.EMT.emt_options import EmtOptions
from VeraGridEngine.Simulations.EMT.solvers.StructuralVectorizedSolver import StructuralVectorizedSolver
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_operator import (EmtFloquetOperator,
BlockEmtFloquetOperator,
AkStackBlockEmtFloquetOperator)
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.small_signal_stability_emt_options import \
EmtSmallSignalStabilityOptions
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.small_signal_stability_emt_results import \
SmallSignalStabilityEmtResults
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_numba_kernels import NUMBA_AVAILABLE, \
bmgs_twice_numba
from VeraGridEngine.basic_structures import Vec, CxVec, Mat, CxMat
from VeraGridEngine.enumerations import SmallSignalEmtBuildTypes
[docs]
def build_restart_seed(U_sel: CxMat, p_target: int, seed: int = 42) -> Mat:
"""
Builds a real orthonormal restart block from complex Ritz/refined vectors.
Extracts real and imaginary parts to form a strictly real subspace seed.
:param U_sel: Complex matrix of selected Ritz or Refined vectors.
:param p_target: Desired number of columns (block size) for the restart seed.
:param seed: Random seed for generating orthogonal padding if needed.
:return: A real orthonormal block matrix of shape (n_states, p_target).
"""
n = U_sel.shape[0]
cols = []
for j in range(U_sel.shape[1]):
ur = np.real(U_sel[:, j])
if np.linalg.norm(ur) > 1e-14:
cols.append(ur)
if len(cols) >= p_target:
break
ui = np.imag(U_sel[:, j])
if np.linalg.norm(ui) > 1e-14:
cols.append(ui)
if len(cols) >= p_target:
break
rng = np.random.default_rng(seed)
while len(cols) < p_target:
cols.append(rng.standard_normal(n))
S = np.column_stack(cols[:p_target])
Q, _ = np.linalg.qr(S)
return Q[:, :p_target]
[docs]
class RobustBlockArnoldiEngine:
"""
Implements:
1. Block Modified Gram-Schmidt (BMGS).
2. DGKS/Kahan Re-orthogonalization ("Twice is Enough").
3. Rank-Revealing SVD for Block Deflation.
"""
__slots__ = (
"monodromy_op",
"n",
"p_init",
"max_iters",
"tol_deflation",
"V",
"H",
"p_active",
"block_indices",
)
def __init__(self, monodromy_op: BlockEmtFloquetOperator, n_states:int,
p_init: int, max_iters:int, tol_deflation:float=1e-12):
"""
Initializes the Block Arnoldi Engine.
:param monodromy_op: The block-enabled monodromy operator.
:param n_states: Number of differential states in the system.
:param p_init: Initial block size (number of simultaneous vectors).
:param max_iters: Maximum number of Block Arnoldi iterations.
:param tol_deflation: Tolerance for dropping linearly dependent vectors (SVD rank reveal).
"""
self.monodromy_op = monodromy_op
self.n = n_states
self.p_init = p_init
self.max_iters = max_iters
self.tol_deflation = tol_deflation
# V will store the orthogonal basis blocks. Max possible size: n x (max_iters * p_init)
self.V = np.zeros((self.n, self.max_iters * self.p_init), dtype=np.float64)
# H is the Block Upper Hessenberg matrix
self.H = np.zeros((self.max_iters * self.p_init, self.max_iters * self.p_init), dtype=np.float64)
# Track active block size (it can shrink due to deflation)
self.p_active = p_init
# Track starting column indices for each block in V and H
self.block_indices = [0]
def _orthogonalize_block(self, W:Mat, current_iter:int)-> Tuple[Mat, Mat]:
"""
Orthogonalizes W against all previously computed blocks V_0 ... V_j.
Uses a Numba BMGS kernel when available; otherwise falls back to NumPy BLAS.
:param W: The un-orthogonalized block matrix generated by the operator.
:param current_iter: The current Arnoldi iteration index (j).
:return: A tuple (W_ortho, H_col) containing the orthogonalized block and the Hessenberg column.
"""
j = current_iter
start_j = self.block_indices[j]
end_j = start_j + self.p_active
if end_j == 0:
return W, np.zeros((0, self.p_active), dtype=np.float64)
# Numba path (real-valued, cache-friendly)
if NUMBA_AVAILABLE and bmgs_twice_numba is not None:
starts = np.asarray(self.block_indices[:j + 1], dtype=np.int64)
ends = np.asarray(self.block_indices[1:j + 2], dtype=np.int64)
V_prev = np.ascontiguousarray(self.V[:, :end_j], dtype=np.float64)
Wc = np.ascontiguousarray(W, dtype=np.float64)
W_ortho, H_col = bmgs_twice_numba(V_prev, starts, ends, Wc)
return W_ortho, H_col
# NumPy BMGS + Kahan second pass
H_col = np.zeros((end_j, self.p_active), dtype=np.float64)
norm_W_initial = la.norm(W, ord='fro')
for i in range(j + 1):
start_i = self.block_indices[i]
end_i = self.block_indices[i + 1]
V_i = self.V[:, start_i:end_i]
H_ij = V_i.T @ W
H_col[start_i:end_i, :] += H_ij
W = W - V_i @ H_ij
norm_W_after = la.norm(W, ord='fro')
if norm_W_after < 0.717 * norm_W_initial:
for i in range(j + 1):
start_i = self.block_indices[i]
end_i = self.block_indices[i + 1]
V_i = self.V[:, start_i:end_i]
H_ij_corr = V_i.T @ W
H_col[start_i:end_i, :] += H_ij_corr
W = W - V_i @ H_ij_corr
return W, H_col
def _rank_revealing_svd(self, W: Mat) -> Tuple[Mat, Mat, int]:
"""
Extracts the orthonormal basis of W and the sub-diagonal Hessenberg block.
Deflates (removes) linearly dependent vectors using SVD.
:param W: The orthogonalized matrix block.
:return: A tuple (V_next, H_next, rank) with the new basis, sub-diagonal block, and active rank.
"""
# SVD: W = U * Sigma * V^T
U, S, Vh = la.svd(W, full_matrices=False)
# Find how many singular values are above the deflation tolerance
rank = np.sum(S > self.tol_deflation)
if rank < self.p_active:
print(f" [BIRAM] Deflation triggered! Block size shrunk from {self.p_active} to {rank}.")
# The new orthonormal basis block (V_{j+1})
V_next = U[:, :rank]
# The sub-diagonal block for the Hessenberg matrix (H_{j+1, j})
# H_{next} = Sigma * V^T
H_next = np.diag(S[:rank]) @ Vh[:rank, :]
return V_next, H_next, rank
[docs]
def build_krylov_space(self, V_init: Mat)-> Tuple[Mat, Mat]:
"""
Executes the robust Block Arnoldi process to build the Krylov subspace.
:param V_init: The initial orthonormal block seed (n_states x p_init).
:return: A tuple (V_final, H_final) representing the Krylov basis and Hessenberg matrix.
"""
# Set initial block
self.V[:, :self.p_active] = V_init
self.block_indices.append(self.p_active)
for j in range(self.max_iters - 1):
# 1. Extract current block V_j
start_j = self.block_indices[j]
end_j = self.block_indices[j + 1]
V_j = self.V[:, start_j:end_j]
# 2. Apply Operator (BLAS Level 3 Muscle)
W = self.monodromy_op.matmat(V_j)
# 3. Robust Orthogonalization
W, H_col = self._orthogonalize_block(W, current_iter=j)
# Insert the computed upper part into the global H matrix
self.H[0:end_j, start_j:end_j] = H_col
# 4. Rank-Revealing SVD & Deflation
V_next, H_next, new_rank = self._rank_revealing_svd(W)
if new_rank == 0:
print(" [BIRAM] Invariant Subspace found! Exact convergence reached early.")
break
# 5. Store Sub-diagonal block and new basis
self.H[end_j: end_j + new_rank, start_j:end_j] = H_next
self.V[:, end_j: end_j + new_rank] = V_next
# Update trackers
self.p_active = new_rank
self.block_indices.append(end_j + new_rank)
# Trim matrices to the actual active subspace size
final_dim = self.block_indices[-1]
V_final = self.V[:, :final_dim]
H_final = self.H[:final_dim, :final_dim]
return V_final, H_final
[docs]
class BestPack:
"""Data structure to hold the best Ritz pairs found during restarts."""
__slots__ = (
"mu_sel",
"U_sel",
"rel",
"max_rr",
)
def __init__(self):
self.mu_sel: CxVec | None = None
self.U_sel: CxVec | None = None
self.rel: Vec | None = None
self.max_rr: float = np.inf
[docs]
class SmallSignalStabilityEmtDriver(DriverTemplate):
"""
Base class for EMT Floquet stability analysis.
Orchestrates the entire limit cycle capture and Krylov subspace generation workflow.
"""
__slots__ = (
"problem",
"emt_options",
"options",
)
def __init__(self,
grid: MultiCircuit,
problem: EmtProblemTemplate,
emt_options: EmtOptions,
options: EmtSmallSignalStabilityOptions):
"""
Initializes the EMT Small Signal Stability Driver.
:param grid: The VeraGrid MultiCircuit network representation.
:param problem: The underlying EMT DAE mathematical problem.
:param emt_options: Integration settings for the limit cycle capture.
:param options: Specific algorithm settings for the Floquet analysis.
"""
DriverTemplate.__init__(self, grid=grid)
self.problem: EmtProblemTemplate = problem
self.emt_options: EmtOptions = emt_options
self.options: EmtSmallSignalStabilityOptions = options
self.results: SmallSignalStabilityEmtResults | None = None
def _capture_limit_cycle_and_evaluator(self,
h: float,
verbose: int = 0)-> Tuple[Mat, Vec, Any, Vec, int]:
"""
Captures the limit cycle by simulating the system up to ss_assessment_time.
Extracts the JIT Jacobian evaluator directly from the StructuralVectorizedSolver.
:param h: Integration time step.
:param verbose: Verbosity level flag.
:return: Tuple containing (y_trajectory, t_trajectory, jacobian_evaluator, static_parameters, n_event_params).
"""
if verbose > 0:
print("-> Capturing limit cycle via StructuralVectorizedSolver...")
# Direct contract access. If 'problem' is invalid, it will fail natively here.
params_values = self.problem.get_parameters_values()
n_event_params = self.problem.get_variable_parameter_number()
solver = StructuralVectorizedSolver(
problem=self.problem,
t0=0.0,
t_end=self.options.ss_assessment_time,
h=h,
integration_method=self.emt_options.integration_method,
verbose=self.options.verbose > 0
)
t_full, y_full = solver.simulate()
steps_per_period = int(self.options.target_period / h)
y_limit = y_full[-steps_per_period:]
t_limit = t_full[-steps_per_period:]
static_params = np.array([c.value for c in params_values], dtype=np.float64)
return y_limit, t_limit, solver.vec_jacobian, static_params, n_event_params
[docs]
def run_arnoldi(self):
"""
Executes the Standard Arnoldi Floquet analysis using SciPy ARPACK.
Suited for small-to-medium systems.
"""
self.tic()
h = self.emt_options.time_step
n_states = self.problem.get_states_number()
# 1. Capture limit cycle trajectory
y_traj, t_traj, jac_eval, stat_params, n_ev_params = self._capture_limit_cycle_and_evaluator(h)
# 2. Operator initialized with HPC LU Caching
monodromy_op = EmtFloquetOperator( # O BlockEmtFloquetOperator
problem=self.problem,
trajectory=y_traj,
h=h,
n_states=n_states,
method=self.emt_options.integration_method,
jac_evaluator=jac_eval,
static_params=stat_params,
n_event_params=n_ev_params,
t_trajectory=t_traj
)
# 3. Eigenvalue Extraction
# Arnoldi struggles when all eigenvalues have identical magnitudes.
if n_states <= max(self.options.k + 1, 20):
if self.options.verbose:
print(f"System size (N={n_states}). Using dense fallback for full spectrum.")
I_dense = np.eye(n_states)
C_M = np.zeros((n_states, n_states))
for i in range(n_states):
C_M[:, i] = monodromy_op.matvec(I_dense[:, i])
mu, v = la.eig(C_M)
else:
# High-Performance Sparse Arnoldi
search_k = min(self.options.k * 2, n_states - 2)
mu, v = spla.eigs(monodromy_op, k=search_k, which='LM', tol=1e-8)
# 4. SPECTRAL SHIFT FILTERING (Resonance Hunt)
if self.options.target_frequency_hz is not None:
f_target = self.options.target_frequency_hz
T = self.options.target_period
# Target multiplier on the unit circle
sigma = np.exp(1j * 2 * np.pi * f_target * T)
# Distance to both positive and negative frequencies to keep conjugate pairs together
distances = np.minimum(np.abs(mu - sigma), np.abs(mu - np.conj(sigma)))
# Sort by distance and truncate to the requested k modes
sort_indices = np.argsort(distances)[:self.options.k]
mu = mu[sort_indices]
v = v[:, sort_indices]
else:
sort_indices = np.argsort(-np.abs(mu))[:self.options.k]
mu = mu[sort_indices]
v = v[:, sort_indices]
# 5. Krylov Left-Eigenvector Projection (Lifting)
w = np.linalg.pinv(v).conj().T
# 6. Results Mapping
self.results = SmallSignalStabilityEmtResults(
multipliers=mu,
right_vecs=v,
left_vecs=w,
period=self.options.target_period,
stat_vars=self.problem.get_state_vars()
)
self.toc()
def _make_monodromy_operator(self, h: float, n_states: int):
"""
Creates the Monodromy Operator. Uses the HPC Ak-Stack if available,
otherwise falls back to the LU-cached Jacobian operator.
:param h: Integration time step.
:param n_states: Number of differential states.
:return: An initialized LinearOperator instance (Block or Stack-based).
"""
y_traj, t_traj, jac_eval, stat_params, n_ev_params = self._capture_limit_cycle_and_evaluator(h)
if self.options.prefer_ak_operator:
Ak_stack = self.problem.get_floquet_ak_stack(
trajectory=y_traj,
h=h,
jac_evaluator=jac_eval,
static_params=stat_params
)
if Ak_stack is not None:
if self.options.verbose:
print('Using explicit EMT A_k stack operator (Numba path if available).')
return AkStackBlockEmtFloquetOperator(Ak_stack, use_numba=self.options.use_numba_kernels)
return BlockEmtFloquetOperator(
problem=self.problem,
trajectory=y_traj,
h=h,
n_states=n_states,
method=self.emt_options.integration_method,
jac_evaluator=jac_eval,
static_params=stat_params,
n_event_params=n_ev_params,
t_trajectory=t_traj
)
@staticmethod
def _apply_operator_complex(monodromy_op: BlockEmtFloquetOperator, Xc: CxMat) -> CxMat:
"""
Helper method to apply a real-valued block operator to a complex block matrix.
Separates real and imaginary components.
:param monodromy_op: The block linear operator.
:param Xc: Complex block matrix.
:return: Propagated complex block matrix.
"""
Xr = np.ascontiguousarray(np.real(Xc), dtype=np.float64)
Xi = np.ascontiguousarray(np.imag(Xc), dtype=np.float64)
Yr = monodromy_op.matmat(Xr)
Yi = monodromy_op.matmat(Xi)
return Yr + 1j * Yi
def _compute_selected_pairs(self,
monodromy_op: BlockEmtFloquetOperator,
V_sub: Mat,
H_sub: Mat,
k_target: int,
p_seed: int,
use_refined: bool) -> Tuple[CxVec, CxMat, Vec]:
"""
Solves the projected sub-problem and extracts the dominant Ritz pairs.
:param monodromy_op: The block linear operator.
:param V_sub: The generated Krylov basis.
:param H_sub: The projected Hessenberg matrix.
:param k_target: The desired number of eigenvalues to keep.
:param p_seed: The block dimension size.
:param use_refined: Flag to use Refined Ritz vector extraction.
:return: Tuple containing (selected_multipliers, selected_vectors, relative_residuals).
"""
mu_sub, Y_sub = la.eig(H_sub)
sort_idx = np.argsort(-np.abs(mu_sub))
keep = min(len(sort_idx), max(k_target + 2, p_seed))
sel = sort_idx[:keep]
mu_sel = mu_sub[sel]
Y_sel = Y_sub[:, sel]
if use_refined:
# Refined Ritz vectors: z = arg min ||(AV - ΞΌV) z|| in the current subspace
AV = self._apply_operator_complex(monodromy_op, V_sub.astype(np.complex128))
Vc = V_sub.astype(np.complex128)
U_sel = np.zeros((V_sub.shape[0], keep), dtype=np.complex128)
for j, mu in enumerate(mu_sel):
Mj = AV - mu * Vc
G = Mj.conj().T @ Mj
# Small Hermitian eigenproblem (dim = krylov dim)
ew, Ez = la.eigh(G)
z = Ez[:, np.argmin(np.real(ew))]
u = Vc @ z
nu = la.norm(u)
if nu > 0:
u /= nu
else:
u = Vc @ Y_sel[:, j]
u /= max(float(la.norm(u)), 1e-15)
U_sel[:, j] = u
else:
U_sel = (V_sub @ Y_sel).astype(np.complex128)
AU_sel = self._apply_operator_complex(monodromy_op, U_sel)
R = AU_sel - U_sel * mu_sel.reshape(1, -1)
rnorm = np.linalg.norm(R, axis=0)
unorm = np.linalg.norm(U_sel, axis=0)
rel = rnorm / np.maximum(np.abs(mu_sel) * unorm, 1e-15)
return mu_sel, U_sel, rel
[docs]
def run_arnoldi_hybrid(self):
"""
Executes the Hybrid Block-Arnoldi (BIRAM) Floquet analysis.
Suited for extremely large sparse DAE systems utilizing BLAS-3 acceleration
and robust block projection to handle invariant subspace degeneracy.
"""
self.tic()
h = self.emt_options.time_step
n_states = self.problem.get_states_number()
# Operator (A_k stack if the EMT backend exposes it, else LU-cached DAE operator)
monodromy_op = self._make_monodromy_operator(h, n_states)
# Hyperparameters (Clean Code: Native types passed directly from Options)
k_target = self.options.k
base_max_dim = self.options.max_krylov_dim
max_restarts = self.options.max_restarts
restart_tol = self.options.restart_tol
use_refined = self.options.use_refined_ritz
adaptive_restart = self.options.adaptive_restart
stagnation_ratio = self.options.stagnation_improve_ratio
stagnation_patience = self.options.stagnation_patience
deflation_tol = self.options.deflation_tol
# Handling optional boundaries cleanly
p_min = self.options.min_block_size if self.options.min_block_size is not None else max(2, k_target)
p_max = self.options.max_block_size if self.options.max_block_size is not None else min(n_states, k_target + 8)
max_dim_cap = self.options.max_krylov_dim_cap if self.options.max_krylov_dim_cap is not None else max(
base_max_dim, int(1.8 * base_max_dim))
p_seed = min(max(k_target + 2, p_min), p_max, n_states)
max_dim_local = max(base_max_dim, p_seed + 1)
m_iters = max(2, max_dim_local // p_seed)
if self.options.verbose:
# self.report_text()
# self.report_progress()
# self.report_progress2()
print(
f'Executing Hybrid Block-Arnoldi (Block Size: {p_seed}, Iters: {m_iters}, Restarts: {max_restarts})...')
if NUMBA_AVAILABLE and bmgs_twice_numba is not None:
print(' [BIRAM] Numba BMGS kernel: ON')
rng = np.random.default_rng(42)
V_seed, _ = np.linalg.qr(rng.standard_normal((n_states, p_seed)))
best_pack = BestPack()
residual_history = []
for ir in range(max_restarts + 1):
if self.options.verbose:
print(f' [BIRAM] Restart cycle {ir + 1}/{max_restarts + 1}')
engine = RobustBlockArnoldiEngine(
monodromy_op=monodromy_op,
n_states=n_states,
p_init=p_seed,
max_iters=m_iters,
tol_deflation=deflation_tol,
)
V_sub, H_sub = engine.build_krylov_space(V_seed)
mu_sel, U_sel, rel = self._compute_selected_pairs(
monodromy_op=monodromy_op,
V_sub=V_sub,
H_sub=H_sub,
k_target=k_target,
p_seed=p_seed,
use_refined=use_refined,
)
# Track best result seen so far
sort_now = np.argsort(-np.abs(mu_sel))
topk = sort_now[:min(k_target, len(sort_now))]
max_rr = float(np.max(rel[topk])) if len(topk) > 0 else np.inf
residual_history.append(max_rr)
if self.options.verbose:
print(f' [BIRAM] top-{len(topk)} max rel-res = {max_rr:.3e}')
if max_rr < best_pack.max_rr:
best_pack.mu_sel = mu_sel.copy()
best_pack.U_sel = U_sel.copy()
best_pack.rel = rel.copy()
best_pack.max_rr = max_rr
if max_rr <= restart_tol:
if self.options.verbose:
print(' [BIRAM] Converged by residual tolerance.')
break
if ir >= max_restarts:
break
# Prepare restart seed from refined/Ritz vectors
V_seed = build_restart_seed(U_sel, p_target=p_seed, seed=42 + ir)
# Elastic adaptation under residual stagnation
if adaptive_restart and len(residual_history) >= stagnation_patience + 1:
recent = residual_history[-1]
prev_best = min(residual_history[-(stagnation_patience + 1):-1])
if recent > stagnation_ratio * prev_best:
changed = False
if p_seed > p_min:
p_new = max(p_min, p_seed - 2)
if p_new != p_seed:
p_seed = p_new
V_seed = build_restart_seed(U_sel, p_target=p_seed, seed=99 + ir)
changed = True
if self.options.verbose:
print(f' [BIRAM] Residual stagnation detected -> shrink block to p={p_seed}.')
else:
# Increase subspace depth budget if block is already minimal
max_dim_new = min(max_dim_cap, max_dim_local + p_seed)
if max_dim_new > max_dim_local:
max_dim_local = max_dim_new
changed = True
if self.options.verbose:
print(
f' [BIRAM] Residual stagnation detected -> expand max Krylov dim to {max_dim_local}.')
if changed:
m_iters = max(2, max_dim_local // p_seed)
# Final selection (best observed cycle)
if best_pack is None:
raise RuntimeError('HybridBlockArnoldiDriver failed to produce Ritz pairs.')
mu_sel = best_pack.mu_sel
U_sel = best_pack.U_sel
sort_idx = np.argsort(-np.abs(mu_sel))[:k_target]
mu: CxVec = mu_sel[sort_idx]
v: CxVec = U_sel[:, sort_idx]
# Pseudo-inverse dual space lifting (Cleaned)
w = np.linalg.pinv(v).conj().T
self.results = SmallSignalStabilityEmtResults(
multipliers=mu,
right_vecs=v,
left_vecs=w,
period=self.options.target_period,
stat_vars=self.problem.get_state_vars()
)
self.toc()
[docs]
def run(self):
"""
Executes the analysis based on the selected builder type in the options.
"""
if self.options.build_type == SmallSignalEmtBuildTypes.Arnoldi:
self.run_arnoldi()
elif self.options.build_type == SmallSignalEmtBuildTypes.HybridArnoldi:
self.run_arnoldi_hybrid()
else:
raise RuntimeError(f'Unknown build type: {self.options.build_type}')