Source code for VeraGridEngine.Simulations.SmallSignalStabilityEmt.small_signal_stability_emt_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Tuple, Any
import numpy as np
import scipy.sparse.linalg as spla
import scipy.linalg as la

from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Simulations.EMT.problems.emt_problem_template import EmtProblemTemplate
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Simulations.EMT.emt_options import EmtOptions
from VeraGridEngine.Simulations.EMT.solvers.StructuralVectorizedSolver import StructuralVectorizedSolver

from VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_operator import (EmtFloquetOperator,
                                                                                     BlockEmtFloquetOperator,
                                                                                     AkStackBlockEmtFloquetOperator)
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.small_signal_stability_emt_options import \
    EmtSmallSignalStabilityOptions
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.small_signal_stability_emt_results import \
    SmallSignalStabilityEmtResults
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_numba_kernels import NUMBA_AVAILABLE, \
    bmgs_twice_numba
from VeraGridEngine.basic_structures import Vec, CxVec, Mat, CxMat
from VeraGridEngine.enumerations import SmallSignalEmtBuildTypes


[docs] def build_restart_seed(U_sel: CxMat, p_target: int, seed: int = 42) -> Mat: """ Builds a real orthonormal restart block from complex Ritz/refined vectors. Extracts real and imaginary parts to form a strictly real subspace seed. :param U_sel: Complex matrix of selected Ritz or Refined vectors. :param p_target: Desired number of columns (block size) for the restart seed. :param seed: Random seed for generating orthogonal padding if needed. :return: A real orthonormal block matrix of shape (n_states, p_target). """ n = U_sel.shape[0] cols = [] for j in range(U_sel.shape[1]): ur = np.real(U_sel[:, j]) if np.linalg.norm(ur) > 1e-14: cols.append(ur) if len(cols) >= p_target: break ui = np.imag(U_sel[:, j]) if np.linalg.norm(ui) > 1e-14: cols.append(ui) if len(cols) >= p_target: break rng = np.random.default_rng(seed) while len(cols) < p_target: cols.append(rng.standard_normal(n)) S = np.column_stack(cols[:p_target]) Q, _ = np.linalg.qr(S) return Q[:, :p_target]
[docs] class RobustBlockArnoldiEngine: """ Implements: 1. Block Modified Gram-Schmidt (BMGS). 2. DGKS/Kahan Re-orthogonalization ("Twice is Enough"). 3. Rank-Revealing SVD for Block Deflation. """ __slots__ = ( "monodromy_op", "n", "p_init", "max_iters", "tol_deflation", "V", "H", "p_active", "block_indices", ) def __init__(self, monodromy_op: BlockEmtFloquetOperator, n_states:int, p_init: int, max_iters:int, tol_deflation:float=1e-12): """ Initializes the Block Arnoldi Engine. :param monodromy_op: The block-enabled monodromy operator. :param n_states: Number of differential states in the system. :param p_init: Initial block size (number of simultaneous vectors). :param max_iters: Maximum number of Block Arnoldi iterations. :param tol_deflation: Tolerance for dropping linearly dependent vectors (SVD rank reveal). """ self.monodromy_op = monodromy_op self.n = n_states self.p_init = p_init self.max_iters = max_iters self.tol_deflation = tol_deflation # V will store the orthogonal basis blocks. Max possible size: n x (max_iters * p_init) self.V = np.zeros((self.n, self.max_iters * self.p_init), dtype=np.float64) # H is the Block Upper Hessenberg matrix self.H = np.zeros((self.max_iters * self.p_init, self.max_iters * self.p_init), dtype=np.float64) # Track active block size (it can shrink due to deflation) self.p_active = p_init # Track starting column indices for each block in V and H self.block_indices = [0] def _orthogonalize_block(self, W:Mat, current_iter:int)-> Tuple[Mat, Mat]: """ Orthogonalizes W against all previously computed blocks V_0 ... V_j. Uses a Numba BMGS kernel when available; otherwise falls back to NumPy BLAS. :param W: The un-orthogonalized block matrix generated by the operator. :param current_iter: The current Arnoldi iteration index (j). :return: A tuple (W_ortho, H_col) containing the orthogonalized block and the Hessenberg column. """ j = current_iter start_j = self.block_indices[j] end_j = start_j + self.p_active if end_j == 0: return W, np.zeros((0, self.p_active), dtype=np.float64) # Numba path (real-valued, cache-friendly) if NUMBA_AVAILABLE and bmgs_twice_numba is not None: starts = np.asarray(self.block_indices[:j + 1], dtype=np.int64) ends = np.asarray(self.block_indices[1:j + 2], dtype=np.int64) V_prev = np.ascontiguousarray(self.V[:, :end_j], dtype=np.float64) Wc = np.ascontiguousarray(W, dtype=np.float64) W_ortho, H_col = bmgs_twice_numba(V_prev, starts, ends, Wc) return W_ortho, H_col # NumPy BMGS + Kahan second pass H_col = np.zeros((end_j, self.p_active), dtype=np.float64) norm_W_initial = la.norm(W, ord='fro') for i in range(j + 1): start_i = self.block_indices[i] end_i = self.block_indices[i + 1] V_i = self.V[:, start_i:end_i] H_ij = V_i.T @ W H_col[start_i:end_i, :] += H_ij W = W - V_i @ H_ij norm_W_after = la.norm(W, ord='fro') if norm_W_after < 0.717 * norm_W_initial: for i in range(j + 1): start_i = self.block_indices[i] end_i = self.block_indices[i + 1] V_i = self.V[:, start_i:end_i] H_ij_corr = V_i.T @ W H_col[start_i:end_i, :] += H_ij_corr W = W - V_i @ H_ij_corr return W, H_col def _rank_revealing_svd(self, W: Mat) -> Tuple[Mat, Mat, int]: """ Extracts the orthonormal basis of W and the sub-diagonal Hessenberg block. Deflates (removes) linearly dependent vectors using SVD. :param W: The orthogonalized matrix block. :return: A tuple (V_next, H_next, rank) with the new basis, sub-diagonal block, and active rank. """ # SVD: W = U * Sigma * V^T U, S, Vh = la.svd(W, full_matrices=False) # Find how many singular values are above the deflation tolerance rank = np.sum(S > self.tol_deflation) if rank < self.p_active: print(f" [BIRAM] Deflation triggered! Block size shrunk from {self.p_active} to {rank}.") # The new orthonormal basis block (V_{j+1}) V_next = U[:, :rank] # The sub-diagonal block for the Hessenberg matrix (H_{j+1, j}) # H_{next} = Sigma * V^T H_next = np.diag(S[:rank]) @ Vh[:rank, :] return V_next, H_next, rank
[docs] def build_krylov_space(self, V_init: Mat)-> Tuple[Mat, Mat]: """ Executes the robust Block Arnoldi process to build the Krylov subspace. :param V_init: The initial orthonormal block seed (n_states x p_init). :return: A tuple (V_final, H_final) representing the Krylov basis and Hessenberg matrix. """ # Set initial block self.V[:, :self.p_active] = V_init self.block_indices.append(self.p_active) for j in range(self.max_iters - 1): # 1. Extract current block V_j start_j = self.block_indices[j] end_j = self.block_indices[j + 1] V_j = self.V[:, start_j:end_j] # 2. Apply Operator (BLAS Level 3 Muscle) W = self.monodromy_op.matmat(V_j) # 3. Robust Orthogonalization W, H_col = self._orthogonalize_block(W, current_iter=j) # Insert the computed upper part into the global H matrix self.H[0:end_j, start_j:end_j] = H_col # 4. Rank-Revealing SVD & Deflation V_next, H_next, new_rank = self._rank_revealing_svd(W) if new_rank == 0: print(" [BIRAM] Invariant Subspace found! Exact convergence reached early.") break # 5. Store Sub-diagonal block and new basis self.H[end_j: end_j + new_rank, start_j:end_j] = H_next self.V[:, end_j: end_j + new_rank] = V_next # Update trackers self.p_active = new_rank self.block_indices.append(end_j + new_rank) # Trim matrices to the actual active subspace size final_dim = self.block_indices[-1] V_final = self.V[:, :final_dim] H_final = self.H[:final_dim, :final_dim] return V_final, H_final
[docs] class BestPack: """Data structure to hold the best Ritz pairs found during restarts.""" __slots__ = ( "mu_sel", "U_sel", "rel", "max_rr", ) def __init__(self): self.mu_sel: CxVec | None = None self.U_sel: CxVec | None = None self.rel: Vec | None = None self.max_rr: float = np.inf
[docs] class SmallSignalStabilityEmtDriver(DriverTemplate): """ Base class for EMT Floquet stability analysis. Orchestrates the entire limit cycle capture and Krylov subspace generation workflow. """ __slots__ = ( "problem", "emt_options", "options", ) def __init__(self, grid: MultiCircuit, problem: EmtProblemTemplate, emt_options: EmtOptions, options: EmtSmallSignalStabilityOptions): """ Initializes the EMT Small Signal Stability Driver. :param grid: The VeraGrid MultiCircuit network representation. :param problem: The underlying EMT DAE mathematical problem. :param emt_options: Integration settings for the limit cycle capture. :param options: Specific algorithm settings for the Floquet analysis. """ DriverTemplate.__init__(self, grid=grid) self.problem: EmtProblemTemplate = problem self.emt_options: EmtOptions = emt_options self.options: EmtSmallSignalStabilityOptions = options self.results: SmallSignalStabilityEmtResults | None = None def _capture_limit_cycle_and_evaluator(self, h: float, verbose: int = 0)-> Tuple[Mat, Vec, Any, Vec, int]: """ Captures the limit cycle by simulating the system up to ss_assessment_time. Extracts the JIT Jacobian evaluator directly from the StructuralVectorizedSolver. :param h: Integration time step. :param verbose: Verbosity level flag. :return: Tuple containing (y_trajectory, t_trajectory, jacobian_evaluator, static_parameters, n_event_params). """ if verbose > 0: print("-> Capturing limit cycle via StructuralVectorizedSolver...") # Direct contract access. If 'problem' is invalid, it will fail natively here. params_values = self.problem.get_parameters_values() n_event_params = self.problem.get_variable_parameter_number() solver = StructuralVectorizedSolver( problem=self.problem, t0=0.0, t_end=self.options.ss_assessment_time, h=h, integration_method=self.emt_options.integration_method, verbose=self.options.verbose > 0 ) t_full, y_full = solver.simulate() steps_per_period = int(self.options.target_period / h) y_limit = y_full[-steps_per_period:] t_limit = t_full[-steps_per_period:] static_params = np.array([c.value for c in params_values], dtype=np.float64) return y_limit, t_limit, solver.vec_jacobian, static_params, n_event_params
[docs] def run_arnoldi(self): """ Executes the Standard Arnoldi Floquet analysis using SciPy ARPACK. Suited for small-to-medium systems. """ self.tic() h = self.emt_options.time_step n_states = self.problem.get_states_number() # 1. Capture limit cycle trajectory y_traj, t_traj, jac_eval, stat_params, n_ev_params = self._capture_limit_cycle_and_evaluator(h) # 2. Operator initialized with HPC LU Caching monodromy_op = EmtFloquetOperator( # O BlockEmtFloquetOperator problem=self.problem, trajectory=y_traj, h=h, n_states=n_states, method=self.emt_options.integration_method, jac_evaluator=jac_eval, static_params=stat_params, n_event_params=n_ev_params, t_trajectory=t_traj ) # 3. Eigenvalue Extraction # Arnoldi struggles when all eigenvalues have identical magnitudes. if n_states <= max(self.options.k + 1, 20): if self.options.verbose: print(f"System size (N={n_states}). Using dense fallback for full spectrum.") I_dense = np.eye(n_states) C_M = np.zeros((n_states, n_states)) for i in range(n_states): C_M[:, i] = monodromy_op.matvec(I_dense[:, i]) mu, v = la.eig(C_M) else: # High-Performance Sparse Arnoldi search_k = min(self.options.k * 2, n_states - 2) mu, v = spla.eigs(monodromy_op, k=search_k, which='LM', tol=1e-8) # 4. SPECTRAL SHIFT FILTERING (Resonance Hunt) if self.options.target_frequency_hz is not None: f_target = self.options.target_frequency_hz T = self.options.target_period # Target multiplier on the unit circle sigma = np.exp(1j * 2 * np.pi * f_target * T) # Distance to both positive and negative frequencies to keep conjugate pairs together distances = np.minimum(np.abs(mu - sigma), np.abs(mu - np.conj(sigma))) # Sort by distance and truncate to the requested k modes sort_indices = np.argsort(distances)[:self.options.k] mu = mu[sort_indices] v = v[:, sort_indices] else: sort_indices = np.argsort(-np.abs(mu))[:self.options.k] mu = mu[sort_indices] v = v[:, sort_indices] # 5. Krylov Left-Eigenvector Projection (Lifting) w = np.linalg.pinv(v).conj().T # 6. Results Mapping self.results = SmallSignalStabilityEmtResults( multipliers=mu, right_vecs=v, left_vecs=w, period=self.options.target_period, stat_vars=self.problem.get_state_vars() ) self.toc()
def _make_monodromy_operator(self, h: float, n_states: int): """ Creates the Monodromy Operator. Uses the HPC Ak-Stack if available, otherwise falls back to the LU-cached Jacobian operator. :param h: Integration time step. :param n_states: Number of differential states. :return: An initialized LinearOperator instance (Block or Stack-based). """ y_traj, t_traj, jac_eval, stat_params, n_ev_params = self._capture_limit_cycle_and_evaluator(h) if self.options.prefer_ak_operator: Ak_stack = self.problem.get_floquet_ak_stack( trajectory=y_traj, h=h, jac_evaluator=jac_eval, static_params=stat_params ) if Ak_stack is not None: if self.options.verbose: print('Using explicit EMT A_k stack operator (Numba path if available).') return AkStackBlockEmtFloquetOperator(Ak_stack, use_numba=self.options.use_numba_kernels) return BlockEmtFloquetOperator( problem=self.problem, trajectory=y_traj, h=h, n_states=n_states, method=self.emt_options.integration_method, jac_evaluator=jac_eval, static_params=stat_params, n_event_params=n_ev_params, t_trajectory=t_traj ) @staticmethod def _apply_operator_complex(monodromy_op: BlockEmtFloquetOperator, Xc: CxMat) -> CxMat: """ Helper method to apply a real-valued block operator to a complex block matrix. Separates real and imaginary components. :param monodromy_op: The block linear operator. :param Xc: Complex block matrix. :return: Propagated complex block matrix. """ Xr = np.ascontiguousarray(np.real(Xc), dtype=np.float64) Xi = np.ascontiguousarray(np.imag(Xc), dtype=np.float64) Yr = monodromy_op.matmat(Xr) Yi = monodromy_op.matmat(Xi) return Yr + 1j * Yi def _compute_selected_pairs(self, monodromy_op: BlockEmtFloquetOperator, V_sub: Mat, H_sub: Mat, k_target: int, p_seed: int, use_refined: bool) -> Tuple[CxVec, CxMat, Vec]: """ Solves the projected sub-problem and extracts the dominant Ritz pairs. :param monodromy_op: The block linear operator. :param V_sub: The generated Krylov basis. :param H_sub: The projected Hessenberg matrix. :param k_target: The desired number of eigenvalues to keep. :param p_seed: The block dimension size. :param use_refined: Flag to use Refined Ritz vector extraction. :return: Tuple containing (selected_multipliers, selected_vectors, relative_residuals). """ mu_sub, Y_sub = la.eig(H_sub) sort_idx = np.argsort(-np.abs(mu_sub)) keep = min(len(sort_idx), max(k_target + 2, p_seed)) sel = sort_idx[:keep] mu_sel = mu_sub[sel] Y_sel = Y_sub[:, sel] if use_refined: # Refined Ritz vectors: z = arg min ||(AV - ΞΌV) z|| in the current subspace AV = self._apply_operator_complex(monodromy_op, V_sub.astype(np.complex128)) Vc = V_sub.astype(np.complex128) U_sel = np.zeros((V_sub.shape[0], keep), dtype=np.complex128) for j, mu in enumerate(mu_sel): Mj = AV - mu * Vc G = Mj.conj().T @ Mj # Small Hermitian eigenproblem (dim = krylov dim) ew, Ez = la.eigh(G) z = Ez[:, np.argmin(np.real(ew))] u = Vc @ z nu = la.norm(u) if nu > 0: u /= nu else: u = Vc @ Y_sel[:, j] u /= max(float(la.norm(u)), 1e-15) U_sel[:, j] = u else: U_sel = (V_sub @ Y_sel).astype(np.complex128) AU_sel = self._apply_operator_complex(monodromy_op, U_sel) R = AU_sel - U_sel * mu_sel.reshape(1, -1) rnorm = np.linalg.norm(R, axis=0) unorm = np.linalg.norm(U_sel, axis=0) rel = rnorm / np.maximum(np.abs(mu_sel) * unorm, 1e-15) return mu_sel, U_sel, rel
[docs] def run_arnoldi_hybrid(self): """ Executes the Hybrid Block-Arnoldi (BIRAM) Floquet analysis. Suited for extremely large sparse DAE systems utilizing BLAS-3 acceleration and robust block projection to handle invariant subspace degeneracy. """ self.tic() h = self.emt_options.time_step n_states = self.problem.get_states_number() # Operator (A_k stack if the EMT backend exposes it, else LU-cached DAE operator) monodromy_op = self._make_monodromy_operator(h, n_states) # Hyperparameters (Clean Code: Native types passed directly from Options) k_target = self.options.k base_max_dim = self.options.max_krylov_dim max_restarts = self.options.max_restarts restart_tol = self.options.restart_tol use_refined = self.options.use_refined_ritz adaptive_restart = self.options.adaptive_restart stagnation_ratio = self.options.stagnation_improve_ratio stagnation_patience = self.options.stagnation_patience deflation_tol = self.options.deflation_tol # Handling optional boundaries cleanly p_min = self.options.min_block_size if self.options.min_block_size is not None else max(2, k_target) p_max = self.options.max_block_size if self.options.max_block_size is not None else min(n_states, k_target + 8) max_dim_cap = self.options.max_krylov_dim_cap if self.options.max_krylov_dim_cap is not None else max( base_max_dim, int(1.8 * base_max_dim)) p_seed = min(max(k_target + 2, p_min), p_max, n_states) max_dim_local = max(base_max_dim, p_seed + 1) m_iters = max(2, max_dim_local // p_seed) if self.options.verbose: # self.report_text() # self.report_progress() # self.report_progress2() print( f'Executing Hybrid Block-Arnoldi (Block Size: {p_seed}, Iters: {m_iters}, Restarts: {max_restarts})...') if NUMBA_AVAILABLE and bmgs_twice_numba is not None: print(' [BIRAM] Numba BMGS kernel: ON') rng = np.random.default_rng(42) V_seed, _ = np.linalg.qr(rng.standard_normal((n_states, p_seed))) best_pack = BestPack() residual_history = [] for ir in range(max_restarts + 1): if self.options.verbose: print(f' [BIRAM] Restart cycle {ir + 1}/{max_restarts + 1}') engine = RobustBlockArnoldiEngine( monodromy_op=monodromy_op, n_states=n_states, p_init=p_seed, max_iters=m_iters, tol_deflation=deflation_tol, ) V_sub, H_sub = engine.build_krylov_space(V_seed) mu_sel, U_sel, rel = self._compute_selected_pairs( monodromy_op=monodromy_op, V_sub=V_sub, H_sub=H_sub, k_target=k_target, p_seed=p_seed, use_refined=use_refined, ) # Track best result seen so far sort_now = np.argsort(-np.abs(mu_sel)) topk = sort_now[:min(k_target, len(sort_now))] max_rr = float(np.max(rel[topk])) if len(topk) > 0 else np.inf residual_history.append(max_rr) if self.options.verbose: print(f' [BIRAM] top-{len(topk)} max rel-res = {max_rr:.3e}') if max_rr < best_pack.max_rr: best_pack.mu_sel = mu_sel.copy() best_pack.U_sel = U_sel.copy() best_pack.rel = rel.copy() best_pack.max_rr = max_rr if max_rr <= restart_tol: if self.options.verbose: print(' [BIRAM] Converged by residual tolerance.') break if ir >= max_restarts: break # Prepare restart seed from refined/Ritz vectors V_seed = build_restart_seed(U_sel, p_target=p_seed, seed=42 + ir) # Elastic adaptation under residual stagnation if adaptive_restart and len(residual_history) >= stagnation_patience + 1: recent = residual_history[-1] prev_best = min(residual_history[-(stagnation_patience + 1):-1]) if recent > stagnation_ratio * prev_best: changed = False if p_seed > p_min: p_new = max(p_min, p_seed - 2) if p_new != p_seed: p_seed = p_new V_seed = build_restart_seed(U_sel, p_target=p_seed, seed=99 + ir) changed = True if self.options.verbose: print(f' [BIRAM] Residual stagnation detected -> shrink block to p={p_seed}.') else: # Increase subspace depth budget if block is already minimal max_dim_new = min(max_dim_cap, max_dim_local + p_seed) if max_dim_new > max_dim_local: max_dim_local = max_dim_new changed = True if self.options.verbose: print( f' [BIRAM] Residual stagnation detected -> expand max Krylov dim to {max_dim_local}.') if changed: m_iters = max(2, max_dim_local // p_seed) # Final selection (best observed cycle) if best_pack is None: raise RuntimeError('HybridBlockArnoldiDriver failed to produce Ritz pairs.') mu_sel = best_pack.mu_sel U_sel = best_pack.U_sel sort_idx = np.argsort(-np.abs(mu_sel))[:k_target] mu: CxVec = mu_sel[sort_idx] v: CxVec = U_sel[:, sort_idx] # Pseudo-inverse dual space lifting (Cleaned) w = np.linalg.pinv(v).conj().T self.results = SmallSignalStabilityEmtResults( multipliers=mu, right_vecs=v, left_vecs=w, period=self.options.target_period, stat_vars=self.problem.get_state_vars() ) self.toc()
[docs] def run(self): """ Executes the analysis based on the selected builder type in the options. """ if self.options.build_type == SmallSignalEmtBuildTypes.Arnoldi: self.run_arnoldi() elif self.options.build_type == SmallSignalEmtBuildTypes.HybridArnoldi: self.run_arnoldi_hybrid() else: raise RuntimeError(f'Unknown build type: {self.options.build_type}')