Source code for VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_operator

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

"""
EMT Floquet Linear Operators Module.

This module provides high-performance LinearOperator subclasses compatible with
SciPy's sparse linear algebra solvers (like ARPACK). These operators define the
action of the Monodromy Matrix (state-transition over one full limit cycle)
without explicitly constructing it.

Included Operators:
- EmtFloquetOperator: Standard (vector-by-vector) LU-cached operator.
- BlockEmtFloquetOperator: Block (matrix-by-matrix) LU-cached operator.
- AkStackBlockEmtFloquetOperator: Explicit pre-computed transition matrix stack operator.
"""

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as spla
from typing import Any, Optional
from VeraGridEngine.Simulations.SmallSignalStabilityEmt.emt_floquet_numba_kernels import (NUMBA_AVAILABLE,
                                                                                          apply_ak_stack_block_numba)
from VeraGridEngine.basic_structures import Mat, Vec
from VeraGridEngine.enumerations import DynamicIntegrationMethod


[docs] class EmtFloquetOperator(spla.LinearOperator): """ HPC Matrix-Free Monodromy Operator for abc-frame EMT systems. Optimized with LU Factorization Caching to eliminate redundant computations during Krylov subspace iterations (Arnoldi). This operator processes single perturbation vectors. """ def __init__(self, problem: Any, trajectory: Mat, h: float, n_states: int, method: DynamicIntegrationMethod = DynamicIntegrationMethod.DaeBackEuler, jac_evaluator: Optional[Any] = None, static_params:Optional[Vec] = None, n_event_params: int = 0, t_trajectory: Optional[Vec] = None): """ Initializes the standard EmtFloquetOperator. Args: problem: The underlying EMT DAE problem instance. trajectory: Pre-captured steady-state trajectory over one period. h: Integration time step. n_states: Number of differential state variables. method_str: Numerical integration method (e.g., 'back_euler'). jac_evaluator: JIT-compiled Jacobian evaluator function. static_params: Array of static system parameters. n_event_params: Number of event-driven parameters. t_trajectory: Time array corresponding to the limit cycle trajectory. """ op_shape = (n_states, n_states) op_dtype = np.float64 super().__init__(dtype=op_dtype, shape=op_shape) self.problem = problem self.trajectory = trajectory self.h = h self.n_states = n_states self.method = method self.n_total = self.problem.get_states_number() + self.problem.get_algebraic_var_number() self.lu_solvers = [] # Pre-computing Router if jac_evaluator is not None and t_trajectory is not None: self._precompute_from_jit(jac_evaluator, static_params, n_event_params,t_trajectory) else: self._precompute_lu_factorizations() def _precompute_from_jit(self, jac_evaluator: Any, static_params: Vec, n_ev_params: int, t_trajectory:Vec)->None: """ Evaluates and caches sparse LU factorizations using the JIT compiled evaluator :param jac_evaluator: :param static_params: :param n_ev_params: :param t_trajectory: :return: """ full_params = np.empty(n_ev_params + len(static_params), dtype=np.float64) if len(static_params) > 0: full_params[n_ev_params:] = static_params ev_params = np.zeros(n_ev_params, dtype=np.float64) dx_dummy = np.zeros(self.n_total, dtype=np.float64) for i in range(1, len(self.trajectory)): step_k = self.trajectory[i] step_prev = self.trajectory[i - 1] step_prev2 = self.trajectory[i - 2] if i > 1 else step_prev x_k = step_k[0] if isinstance(step_k, tuple) else step_k x_prev = step_prev[0] if isinstance(step_prev, tuple) else step_prev x_prev2 = step_prev2[0] if isinstance(step_prev2, tuple) else step_prev2 t_curr = t_trajectory[i] if t_trajectory is not None else (i * self.h) if n_ev_params > 0: ev_params = self.problem.def_event_params_fn(ev_params, float(t_curr)) full_params[:n_ev_params] = ev_params J_sparse = jac_evaluator(states=x_k, params=full_params, history=x_prev, d_history=dx_dummy, h=self.h, history2=x_prev2) self.lu_solvers.append(spla.splu(J_sparse)) def _precompute_lu_factorizations(self)-> None: """ Pre-calculates SuperLU factorizations for the entire periodic trajectory. This reduces the cost of Arnoldi iterations from O(iters * steps * n^3) to O(steps * n^3 + iters * steps * n^2), an extreme HPC acceleration. """ for x_k, y_k in self.trajectory: fx = self.problem.get_j11(x_k, y_k, self.h) fy = self.problem.get_j12(x_k, y_k, self.h) gx = self.problem.get_j21(x_k, y_k, self.h) gy = self.problem.get_j22(x_k, y_k, self.h) J_aug = sp.vstack([ sp.hstack([fx, fy]), sp.hstack([gx, gy]) ], format='csc') # splu requires CSC format and computes the exact LU factorization once self.lu_solvers.append(spla.splu(J_aug)) def _matvec(self, v0: Vec)-> Vec: """ Applies the Monodromy operator to a single vector v0 :param v0: :return: """ v_curr = np.zeros(self.n_total, dtype=np.float64) v_curr[:self.n_states] = v0 v_prev = v_curr.copy() v_prev2 = v_curr.copy() dv_prev = np.zeros_like(v_curr) rhs = np.zeros_like(v_curr) for lu in self.lu_solvers: if self.method == DynamicIntegrationMethod.DaeBackEuler: rhs[:self.n_states] = v_prev[:self.n_states] / self.h elif self.method == DynamicIntegrationMethod.DaeBDF2: rhs[:self.n_states] = (2.0 * v_prev[:self.n_states] - 0.5 * v_prev2[:self.n_states]) / self.h elif self.method == DynamicIntegrationMethod.DaeTrapezoidal: rhs[:self.n_states] = (2.0 / self.h) * v_prev[:self.n_states] + dv_prev[:self.n_states] else: rhs[:self.n_states] = v_prev[:self.n_states] / self.h v_curr = lu.solve(rhs) if self.method == DynamicIntegrationMethod.DaeTrapezoidal: dv_prev[:self.n_states] = (2.0 / self.h) * (v_curr[:self.n_states] - v_prev[:self.n_states]) - dv_prev[ :self.n_states] v_prev2 = v_prev.copy() v_prev = v_curr.copy() return v_curr[:self.n_states] def _rmatvec(self, w0: Vec) -> Vec: """ Adjoint (Backward) integration to extract Left Eigenvectors. Utilizes the trans='T' flag in SuperLU for zero-cost matrix transposition. :param w0: :return: """ w_curr = w0.copy() rhs = np.zeros(self.n_total) # Reverse-time integration using the transposed solvers for lu in reversed(self.lu_solvers): rhs[:self.n_states] = w_curr # Fast back-substitution solving J_aug^T * sol = rhs sol = lu.solve(rhs, trans='T') w_curr = sol[:self.n_states] return w_curr
#New Implementation System for Blocks calc.
[docs] class BlockEmtFloquetOperator(spla.LinearOperator): """ Advanced HPC Block Monodromy Operator for EMT Small-Signal Stability. This class is specifically designed to support Randomized Numerical Linear Algebra (RandNLA) and Block Krylov subspace methods. Unlike standard operators that process a single column vector at a time (BLAS Level 2), this operator accepts a dense block matrix (N x p) of vectors simultaneously. By processing multiple perturbation vectors at once, it forces the underlying SciPy/SuperLU backend to utilize BLAS Level 3 routines (Matrix-Matrix multiplication and Block Forward/Backward Substitution). This maximizes CPU L1/L2 cache utilization, drastically reducing the memory-bound bottlenecks inherent to large-scale sparse electrical networks. Attributes: problem: The underlying EMT DAE problem instance. trajectory: List of pre-captured steady-state tuples (x, y) over one period T. h (float): Integration time step. n_states (int): Number of differential state variables. lu_solvers (list): Cached SuperLU factorizations for the entire trajectory. """ def __init__(self, problem: Any, trajectory: Mat, h: float, n_states: int, method: DynamicIntegrationMethod = DynamicIntegrationMethod.DaeBackEuler, jac_evaluator: Optional[Any]=None, static_params:Optional[Vec]=None, n_event_params:int =0, t_trajectory:Optional[Vec]=None): """ Initializes the BlockEmtFloquetOperator. Args: problem: The underlying EMT DAE problem instance. trajectory: List of pre-captured steady-state tuples (x, y) over one period T. h: Integration time step. n_states: Number of differential state variables. method_str: Numerical integration method string. jac_evaluator: JIT-compiled Jacobian evaluator function. static_params: Array of static system parameters. n_event_params: Number of event-driven parameters. t_trajectory: Time array corresponding to the limit cycle trajectory. """ n_total_calc = problem.get_states_number() + problem.get_algebraic_var_number() op_shape = (n_states, n_states) op_dtype = np.float64 super().__init__(dtype=op_dtype, shape=op_shape) self.problem = problem self.trajectory = trajectory self.h = h self.n_states = n_states self.method = method self.n_total = n_total_calc self.lu_solvers = [] if jac_evaluator is not None: self._precompute_from_jit(jac_evaluator, static_params, n_event_params, t_trajectory) else: self._precompute_lu_factorizations() def _precompute_from_jit(self, jac_evaluator: Any, static_params: Vec, n_ev_params:int, t_trajectory:Optional[Vec]=None)->None: """ Evaluates and caches sparse LU factorizations using the JIT compiled evaluator. :param jac_evaluator: :param static_params: :param n_ev_params: :param t_trajectory: :return: """ full_params = np.empty(n_ev_params + len(static_params), dtype=np.float64) if len(static_params) > 0: full_params[n_ev_params:] = static_params ev_params = np.zeros(n_ev_params, dtype=np.float64) dx_dummy = np.zeros(self.n_total, dtype=np.float64) for i in range(1, len(self.trajectory)): step_k = self.trajectory[i] step_prev = self.trajectory[i - 1] step_prev2 = self.trajectory[i - 2] if i > 1 else step_prev x_k = step_k[0] if isinstance(step_k, tuple) else step_k x_prev = step_prev[0] if isinstance(step_prev, tuple) else step_prev x_prev2 = step_prev2[0] if isinstance(step_prev2, tuple) else step_prev2 t_curr = t_trajectory[i] if t_trajectory is not None else (i * self.h) if n_ev_params > 0: ev_params = self.problem.def_event_params_fn(ev_params, float(t_curr)) full_params[:n_ev_params] = ev_params J_sparse = jac_evaluator(states=x_k, params=full_params, history=x_prev, d_history=dx_dummy, h=self.h, history2=x_prev2) self.lu_solvers.append(spla.splu(J_sparse)) def _precompute_lu_factorizations(self)-> None: """ Pre-calculates and caches the Sparse LU factorizations for the Jacobian at every time step of the periodic limit cycle. """ for x_k, y_k in self.trajectory: # Extract discrete DAE Jacobians from the exact analytical/AD backend fx = self.problem.get_j11(x_k, y_k, self.h) fy = self.problem.get_j12(x_k, y_k, self.h) gx = self.problem.get_j21(x_k, y_k, self.h) gy = self.problem.get_j22(x_k, y_k, self.h) # Assemble the full augmented Jacobian block matrix J_aug = sp.vstack([ sp.hstack([fx, fy]), sp.hstack([gx, gy]) ], format='csc') # splu requires CSC format and caches the exact LU decomposition self.lu_solvers.append(spla.splu(J_aug)) def _matmat(self, X: Mat) -> Mat: """ Applies the Monodromy operator to a dense block of vectors. Args: X: Matrix block of shape (n_states, p). Returns: Propagated block matrix of shape (n_states, p). """ p_cols = X.shape[1] X_curr = np.zeros((self.n_total, p_cols), dtype=X.dtype) X_curr[:self.n_states, :] = X X_prev = X_curr.copy() X_prev2 = X_curr.copy() dX_prev = np.zeros_like(X_curr) rhs_block = np.zeros_like(X_curr) for lu in self.lu_solvers: if self.method == DynamicIntegrationMethod.DaeBackEuler: rhs_block[:self.n_states, :] = X_prev[:self.n_states, :] / self.h elif self.method == DynamicIntegrationMethod.DaeBDF2: rhs_block[:self.n_states, :] = (2.0 * X_prev[:self.n_states, :] - 0.5 * X_prev2[ :self.n_states, :]) / self.h elif self.method == DynamicIntegrationMethod.DaeTrapezoidal: rhs_block[:self.n_states, :] = (2.0 / self.h) * X_prev[:self.n_states, :] + dX_prev[:self.n_states, :] else: rhs_block[:self.n_states, :] = X_prev[:self.n_states, :] / self.h X_curr = lu.solve(rhs_block) if self.method == DynamicIntegrationMethod.DaeTrapezoidal: dX_prev[:self.n_states, :] = (2.0 / self.h) * (X_curr[:self.n_states, :] - X_prev[:self.n_states, :]) - \ dX_prev[:self.n_states, :] X_prev2 = X_prev.copy() X_prev = X_curr.copy() return X_curr[:self.n_states, :] def _matvec(self, v0:Vec)-> Vec: """ Fallback routine: If a standard solver (like scipy.sparse.linalg.eigs) calls this operator with a single 1D vector, it wraps it into a 2D matrix and routes it through the high-performance _matmat implementation. """ # Reshape to (N, 1), compute via block logic, and flatten back to 1D return self._matmat(v0.reshape(-1, 1)).flatten()
[docs] class AkStackBlockEmtFloquetOperator(spla.LinearOperator): """ Monodromy operator from an explicit stack of state-transition matrices A_k. This is the preferred path for EMT/Floquet benchmarking when the EMT engine can expose the per-step linearized state map: x_{k+1} = A_k x_k for one limit-cycle period. The operator applies: Phi = A_{M-1} * ... * A_1 * A_0 on vectors or blocks. When Numba is available and use_numba=True, the block path is JIT-compiled to reduce Python overhead in repeated Arnoldi calls. """ def __init__(self, Ak_stack: Mat, use_numba: bool = True, **kwargs): """ Initializes the AkStackBlockEmtFloquetOperator. Args: Ak_stack: 3D numpy array representing the sequence of transition matrices. use_numba: Flag to enable Numba acceleration for the block multiplication. """ Ak = np.asarray(Ak_stack, dtype=np.float64, order='C') if Ak.ndim != 3 or Ak.shape[1] != Ak.shape[2]: raise ValueError( f"Ak_stack must have shape (n_steps, n, n); got {Ak.shape}" ) self.n_states = int(Ak.shape[1]) self.Ak_stack = Ak self.n_steps = int(Ak.shape[0]) self.use_numba = bool(use_numba and NUMBA_AVAILABLE and apply_ak_stack_block_numba is not None) super().__init__(dtype=np.float64, shape=(self.n_states, self.n_states), **kwargs) def _matvec(self, x: Vec) -> Vec: """Applies the operator to a single 1D vector.""" X = np.asarray(x, dtype=np.float64).reshape(-1, 1) Y = self._matmat(X) return Y[:, 0] def _matmat(self, X: Mat) -> Mat: """Applies the transition matrix stack to a block of vectors.""" Xf = np.asarray(X, dtype=np.float64, order='C') if self.use_numba: return apply_ak_stack_block_numba(self.Ak_stack, Xf) Y = Xf for k in range(self.n_steps): Y = self.Ak_stack[k] @ Y return np.asarray(Y, dtype=np.float64, order='C')
[docs] def matmat(self, X: Mat) -> Mat: """Public alias for matrix-matrix multiplication.""" return self._matmat(X)