Source code for VeraGridEngine.Utils.NumericalMethods.emt_sparse_superlu_backend

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

from typing import Dict

import numpy as np
from scipy.sparse import csc_matrix
from scipy.sparse.linalg import SuperLU, splu

from VeraGridEngine.Utils.NumericalMethods.external_sparse_solver_interface import (
    SparseLinearFactorizationHandle,
    SparseLinearSolverBackend,
    SparseLinearSolverBackendProvider,
)
from VeraGridEngine.basic_structures import Vec
from VeraGridEngine.enumerations import SparseSolver


def _permute_csc_data_by_columns(
        source_data: Vec,
        source_indptr: np.ndarray,
        column_perm: np.ndarray,
        target_indptr: np.ndarray,
        target_data: Vec,
) -> None:
    """
    Reorder CSC numeric values according to a persistent column permutation.

    :param source_data: Numeric values in the original column order.
    :type source_data: Vec
    :param source_indptr: Original CSC column pointer.
    :type source_indptr: np.ndarray
    :param column_perm: Persistent destination-to-source column permutation.
    :type column_perm: np.ndarray
    :param target_indptr: Target CSC column pointer.
    :type target_indptr: np.ndarray
    :param target_data: Destination numeric buffer.
    :type target_data: Vec
    :return: None.
    :rtype: None
    """
    dst_col: int = 0

    while dst_col < column_perm.shape[0]:
        src_col: int = int(column_perm[dst_col])
        src_ptr_0: int = int(source_indptr[src_col])
        src_ptr_1: int = int(source_indptr[src_col + 1])
        dst_ptr_0: int = int(target_indptr[dst_col])
        local_index: int = 0

        while src_ptr_0 + local_index < src_ptr_1:
            target_data[dst_ptr_0 + local_index] = source_data[src_ptr_0 + local_index]
            local_index += 1

        dst_col += 1


def _scatter_permuted_solution_to_original_order(
        permuted_solution: Vec,
        inverse_column_perm: np.ndarray,
        out_solution: Vec,
) -> None:
    """
    Scatter a solution computed on a column-permuted system back to original order.

    :param permuted_solution: Solution in permuted order.
    :type permuted_solution: Vec
    :param inverse_column_perm: Mapping from original index to permuted index.
    :type inverse_column_perm: np.ndarray
    :param out_solution: Destination vector in original order.
    :type out_solution: Vec
    :return: None.
    :rtype: None
    """
    var_index: int = 0

    while var_index < out_solution.shape[0]:
        out_solution[var_index] = permuted_solution[int(inverse_column_perm[var_index])]
        var_index += 1


[docs] class SuperLUFactorizationHandle(SparseLinearFactorizationHandle): """ Factorization handle backed by SciPy SuperLU. """ __slots__ = ["_lu_obj", "_active_matrix", "_ordered_bundle_active", "_has_persistent_ordering", "_inverse_column_perm"] def __init__( self, lu_obj: SuperLU, active_matrix: csc_matrix, ordered_bundle_active: bool, has_persistent_ordering: bool, inverse_column_perm: np.ndarray, ) -> None: """ Build the SuperLU factorization handle. :param lu_obj: SuperLU factorization object. :type lu_obj: SuperLU :param active_matrix: Active sparse matrix. :type active_matrix: csc_matrix :param ordered_bundle_active: Whether the ordered bundle is active. :type ordered_bundle_active: bool :param has_persistent_ordering: Whether a persistent ordering exists. :type has_persistent_ordering: bool :param inverse_column_perm: Mapping from original to permuted order. :type inverse_column_perm: np.ndarray :return: None. :rtype: None """ self._lu_obj: SuperLU = lu_obj self._active_matrix: csc_matrix = active_matrix self._ordered_bundle_active: bool = ordered_bundle_active self._has_persistent_ordering: bool = has_persistent_ordering self._inverse_column_perm: np.ndarray = inverse_column_perm
[docs] def solve_into(self, rhs: Vec, out_solution: Vec) -> None: """ Solve the sparse system into a caller-owned output buffer. :param rhs: Right-hand side vector. :type rhs: Vec :param out_solution: Caller-owned output buffer. :type out_solution: Vec :return: None. :rtype: None """ raw_solution: Vec = np.asarray(self._lu_obj.solve(rhs), dtype=np.float64) if self._ordered_bundle_active and self._has_persistent_ordering: _scatter_permuted_solution_to_original_order(raw_solution, self._inverse_column_perm, out_solution) else: out_solution[:] = raw_solution
[docs] def get_active_matrix(self) -> csc_matrix: """ Return the sparse matrix associated with the current factorization. :return: Active sparse matrix. :rtype: csc_matrix """ return self._active_matrix
[docs] class SuperLUSparseBackend(SparseLinearSolverBackend): """ EMT sparse backend backed by SciPy SuperLU. """ __slots__ = [ "_base_matrix", "_base_data", "_base_indptr_i32", "_ordered_matrix", "_ordered_data", "_ordered_indptr_i32", "_column_perm", "_inverse_column_perm", "_has_persistent_ordering", "_stats", ] def __init__(self, base_matrix: csc_matrix, base_data: Vec) -> None: """ Build the SuperLU EMT sparse backend. :param base_matrix: Reusable EMT Jacobian CSC shell. :type base_matrix: csc_matrix :param base_data: Reusable EMT Jacobian numeric buffer. :type base_data: Vec :return: None. :rtype: None """ n_cols: int = int(base_matrix.shape[1]) identity_perm: np.ndarray = np.arange(n_cols, dtype=np.int32) self._base_matrix: csc_matrix = base_matrix self._base_data: Vec = base_data self._base_indptr_i32: np.ndarray = np.asarray(base_matrix.indptr, dtype=np.int32) self._ordered_matrix: csc_matrix | None = None self._ordered_data: Vec | None = None self._ordered_indptr_i32: np.ndarray | None = None self._column_perm: np.ndarray = identity_perm.copy() self._inverse_column_perm: np.ndarray = identity_perm.copy() self._has_persistent_ordering: bool = False self._stats: Dict[str, float] = dict( numeric_factorizations=0.0, ordering_builds=0.0, ordering_reuses=0.0, fallback_solves=0.0, )
[docs] def get_name(self) -> str: """ Return the backend name. :return: Backend name. :rtype: str """ return "internal_superlu"
[docs] def get_solver_type(self) -> SparseSolver: """ Return the sparse solver type. :return: Sparse solver type. :rtype: SparseSolver """ return SparseSolver.SuperLU
[docs] def is_available(self) -> bool: """ Return whether the backend is available. :return: ``True``. :rtype: bool """ return True
[docs] def supports_symbolic_analysis_reuse(self) -> bool: """ Return whether this backend supports persistent symbolic reuse. :return: ``True``. :rtype: bool """ return True
def _try_build_persistent_ordering(self, lu_obj: SuperLU) -> None: """ Store one persistent column ordering discovered by SuperLU. :param lu_obj: SuperLU factorization object. :type lu_obj: SuperLU :return: None. :rtype: None """ if self._has_persistent_ordering: return else: pass raw_perm: np.ndarray = np.asarray(lu_obj.perm_c, dtype=np.int32) n_cols: int = int(self._base_matrix.shape[1]) if raw_perm.shape[0] != n_cols: return else: pass sorted_perm: np.ndarray = np.sort(raw_perm) if np.array_equal(sorted_perm, np.arange(n_cols, dtype=np.int32)): if np.array_equal(raw_perm, np.arange(n_cols, dtype=np.int32)): return else: self._column_perm = raw_perm.copy() inverse_perm: np.ndarray = np.zeros(n_cols, dtype=np.int32) perm_index: int = 0 while perm_index < n_cols: inverse_perm[int(self._column_perm[perm_index])] = perm_index perm_index += 1 ordered_matrix: csc_matrix = self._base_matrix[:, self._column_perm].copy() self._ordered_matrix = ordered_matrix self._ordered_data = ordered_matrix.data self._ordered_indptr_i32 = np.asarray(ordered_matrix.indptr, dtype=np.int32) self._inverse_column_perm = inverse_perm self._has_persistent_ordering = True self._stats["ordering_builds"] += 1.0 else: return
[docs] def factorize(self, matrix: csc_matrix, analysis_handle: object | None) -> SparseLinearFactorizationHandle: """ Factorize the current EMT Jacobian. :param matrix: Sparse matrix in EMT solver order. :type matrix: csc_matrix :param analysis_handle: Optional symbolic-analysis handle. :type analysis_handle: object | None :return: SuperLU factorization handle. :rtype: SparseLinearFactorizationHandle """ _unused_matrix: csc_matrix = matrix _unused_analysis_handle: object | None = analysis_handle lu_obj: SuperLU if self._has_persistent_ordering and self._ordered_matrix is not None and self._ordered_data is not None and self._ordered_indptr_i32 is not None: _permute_csc_data_by_columns( self._base_data, self._base_indptr_i32, self._column_perm, self._ordered_indptr_i32, self._ordered_data, ) lu_obj = splu(self._ordered_matrix, permc_spec="NATURAL") self._stats["ordering_reuses"] += 1.0 active_matrix: csc_matrix = self._ordered_matrix ordered_bundle_active: bool = True else: lu_obj = splu(self._base_matrix, permc_spec="COLAMD") self._try_build_persistent_ordering(lu_obj) active_matrix = self._base_matrix ordered_bundle_active = False self._stats["numeric_factorizations"] += 1.0 return SuperLUFactorizationHandle( lu_obj=lu_obj, active_matrix=active_matrix, ordered_bundle_active=ordered_bundle_active, has_persistent_ordering=self._has_persistent_ordering, inverse_column_perm=self._inverse_column_perm, )
[docs] def get_backend_stats(self) -> Dict[str, float]: """ Return backend-specific statistics. :return: Backend-specific statistics. :rtype: Dict[str, float] """ return dict(self._stats)
[docs] class SuperLUSparseBackendProvider(SparseLinearSolverBackendProvider): """ Provider for the internal SuperLU EMT sparse backend. """ __slots__ = []
[docs] def get_name(self) -> str: """ Return the provider name. :return: Provider name. :rtype: str """ return "internal_superlu"
[docs] def get_solver_type(self) -> SparseSolver: """ Return the sparse solver type. :return: Sparse solver type. :rtype: SparseSolver """ return SparseSolver.SuperLU
[docs] def is_available(self) -> bool: """ Return whether the provider is available. :return: ``True``. :rtype: bool """ return True
[docs] def create_backend(self, base_matrix: csc_matrix, base_data: Vec) -> SparseLinearSolverBackend: """ Create the internal SuperLU sparse backend. :param base_matrix: Reusable EMT Jacobian CSC shell. :type base_matrix: csc_matrix :param base_data: Reusable EMT Jacobian numeric buffer. :type base_data: Vec :return: Sparse solver backend. :rtype: SparseLinearSolverBackend """ return SuperLUSparseBackend(base_matrix=base_matrix, base_data=base_data)