Source code for VeraGridEngine.Utils.Sparse.csc

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
import numba as nb
import scipy.sparse.sparsetools as sptools
from scipy.sparse import csc_matrix

import VeraGridEngine.Utils.Sparse.csc_numba as csc_numba
from VeraGridEngine.basic_structures import Mat


[docs] def Csc0(m: int, n: int): return csc_matrix((m, n), dtype=float)
[docs] class CscMat(csc_matrix): """ Matrix in compressed-column or triplet form. """ __slots__ = ( "m", "n", ) def __init__(self, arg1, shape=None, dtype=None, copy=False): """ CSC sparse matrix Format explanation example 0 1 2 _________ 0 | 4 | 1 | 3 9 | 2 | 7 8 | 3 | 3 8 | 4 | 8 9 | 5 | 4 | --------- cols = 3 rows = 6 0 1 2 3 4 5 6 7 8 9 <-- These are the positions indicated by indptr (just to illustrate) data = [4, 3, 3, 9, 7, 8, 4, 8, 8, 9] # stores the values indices = [0, 1, 3, 1, 2, 4, 5, 2, 3, 4] # indicates the row index indptr = [0, 3, 7, 10] # The length is cols + 1, stores the from and to indices that delimit a column. i.e. the first column takes the indices and data from the positions 0 to 3-1, this is column_idx = 0 # (j) indices = [0 , 1, 3] # row indices (i) of the column (j) data = [10, 3, 3] Typical loop: for j in range(n): # for every column, same as range(cols) for k in range(indptr[j], indptr[j+1]): # for every entry in the column i = indices[k] value = data[k] print(i, j, value) For completeness, the CSR equivalent is 0 1 2 3 4 5 6 7 8 9 data = [4, 3, 9, 7, 8, 3, 8, 8, 9, 4] indices = [0, 0, 1, 1, 2, 0, 2, 1, 2, 1] indptr = [0, 1, 3, 5, 7, 9, 10] @param shape: (number of rows, number of cols) @param dtype: data type (float, int, etc..) @param copy: copy the data (bool) """ csc_matrix.__init__(self, arg1, shape, dtype, copy) # number of rows self.m = self.shape[0] # number of columns self.n = self.shape[1] def __add__(self, other) -> "CscMat": """ Matrix addition :param other: CscMat instance :return: CscMat instance """ if isinstance(other, CscMat): # matrix-matrix addition assert (other.m == self.m) assert (other.n == self.n) nz_max = self.nnz + other.nnz indptr = np.zeros(self.n + 1, dtype=np.int32) # row indices, size nzmax indices = np.zeros(nz_max, dtype=np.int32) # numerical values, size nzmax data = np.zeros(nz_max, dtype=np.float64) sptools.csc_plus_csc(self.m, self.n, self.indptr, self.indices, self.data, other.indptr, other.indices, other.data, indptr, indices, data) return CscMat((data, indices, indptr), shape=self.shape) elif isinstance(other, float) or isinstance(other, int): raise NotImplementedError('Adding a nonzero scalar to a sparse matrix would make it a dense matrix.') else: raise NotImplementedError('Type not supported') def __sub__(self, other) -> "CscMat": """ Matrix subtraction :param other: CscMat instance :return: CscMat instance """ if isinstance(other, CscMat): # subtract CSC matrix assert (other.m == self.m) assert (other.n == self.n) nz_max = self.nnz + other.nnz indptr = np.zeros(self.n + 1, dtype=np.int32) # row indices, size nzmax indices = np.zeros(nz_max, dtype=np.int32) # numerical values, size nzmax data = np.zeros(nz_max, dtype=np.float64) sptools.csc_minus_csc(self.m, self.n, self.indptr, self.indices, self.data, other.indptr, other.indices, other.data, indptr, indices, data) return CscMat((data, indices, indptr), shape=self.shape) elif isinstance(other, float) or isinstance(other, int): # Add scalar value raise NotImplementedError('Adding a non-zero scalar to a sparse matrix would make it a dense matrix.') else: raise NotImplementedError('Type not supported') def __mul__(self, other): """ Matrix multiplication :param other: CscMat instance :return: CscMat instance """ if isinstance(other, CscMat): # mat-mat multiplication # 2-pass matrix multiplication Cp = np.empty(self.n + 1, dtype=np.int32) sptools.csc_matmat_pass1(self.n, other.m, self.indptr, self.indices, other.indptr, other.indices, Cp) nnz = Cp[-1] Ci = np.empty(nnz, dtype=np.int32) Cx = np.empty(nnz, dtype=np.float64) sptools.csc_matmat_pass2(self.n, other.m, self.indptr, self.indices, self.data, other.indptr, other.indices, other.data, Cp, Ci, Cx) return CscMat((Cx, Ci, Cp), shape=self.shape) elif isinstance(other, np.ndarray): # multiply by a vector or array of vectors if len(other.shape) == 1: y = np.zeros(self.m, dtype=np.float64) sptools.csc_matvec(self.m, self.n, self.indptr, self.indices, self.data, other, y) return y elif len(other.shape) == 2: """ * Input Arguments: * I n_row - number of rows in A * I n_col - number of columns in A * I n_vecs - number of column vectors in X and Y * I Ap[n_row+1] - row pointer * I Aj[nnz(A)] - column indices * T Ax[nnz(A)] - nonzeros * T Xx[n_col,n_vecs] - input vector * * Output Arguments: * T Yx[n_row,n_vecs] - output vector * * Note: * Output array Yx must be preallocated * void csc_matvecs(const I n_row, const I n_col, const I n_vecs, const I Ap[], const I Ai[], const T Ax[], const T Xx[], T Yx[]) """ n_col, n_vecs = other.shape y = np.zeros((self.m, n_vecs), dtype=np.float64) sptools.csc_matvecs(self.m, self.n, n_vecs, self.indptr, self.indices, self.data, other, y) return y elif isinstance(other, float) or isinstance(other, int): # multiply by a scalar value C = self.copy() C.data *= other return C else: raise Exception('Type not supported')
[docs] def dot(self, o) -> "CscMat": """ Dot product :param o: CscMat instance :return: CscMat instance """ # 2-pass matrix multiplication Cp = np.empty(self.n + 1, dtype=np.int32) sptools.csc_matmat_pass1(self.n, o.tap_module, self.indptr, self.indices, o.indptr, o.indices, Cp) nnz = Cp[-1] Ci = np.empty(nnz, dtype=np.int32) Cx = np.empty(nnz, dtype=np.float64) sptools.csc_matmat_pass2(self.n, o.tap_module, self.indptr, self.indices, self.data, o.indptr, o.indices, o.data, Cp, Ci, Cx) return CscMat((Cx, Ci, Cp), shape=self.shape)
# @property # def T(self): # m, n, Cp, Ci, Cx = csc_transpose(self.m, self.n, self.indptr, self.indices, self.data) # return CscMat((Cx, Ci, Cp), shape=(m, n))
[docs] def islands(self): """ Find islands in the matrix :return: list of islands """ islands = csc_numba.find_islands(self.n, self.indptr, self.indices) return [np.sort(island) for island in islands]
[docs] def scipy_to_mat(scipy_mat: csc_matrix): """ Build CsCMat from csc_matrix :param scipy_mat: :return: CscMat """ return CscMat((scipy_mat.data, scipy_mat.indices, scipy_mat.indptr), shape=scipy_mat.shape)
[docs] def pack_4_by_4(A11: CscMat, A12: CscMat, A21: CscMat, A22: CscMat): """ Stack 4 CSC matrices in a 2 by 2 structure stack csc sparse float matrices like this: | A11 | A12 | | A21 | A22 | :param A11: Upper left matrix :param A12: Upper right matrix :param A21: Lower left matrix :param A22: Lower right matrix :return: Stitched matrix """ m, n, Pi, Pp, Px = csc_numba.csc_stack_4_by_4_ff(A11.shape[0], A11.shape[1], A11.indices, A11.indptr, A11.data, A12.shape[0], A12.shape[1], A12.indices, A12.indptr, A12.data, A21.shape[0], A21.shape[1], A21.indices, A21.indptr, A21.data, A22.shape[0], A22.shape[1], A22.indices, A22.indptr, A22.data) return CscMat((Px, Pi, Pp), shape=(m, n))
[docs] def pack_4_by_4_scipy(A11: csc_matrix, A12: csc_matrix, A21: csc_matrix, A22: csc_matrix) -> csc_matrix: """ Stack 4 CSC matrices in a 2 by 2 structure stack csc sparse float matrices like this: | A11 | A12 | | A21 | A22 | :param A11: Upper left matrix :param A12: Upper right matrix :param A21: Lower left matrix :param A22: Lower right matrix :return: Stitched matrix """ assert A11.format == "csc" assert A12.format == "csc" assert A21.format == "csc" assert A22.format == "csc" m, n, Pi, Pp, Px = csc_numba.csc_stack_4_by_4_ff(A11.shape[0], A11.shape[1], A11.indices, A11.indptr, A11.data, A12.shape[0], A12.shape[1], A12.indices, A12.indptr, A12.data, A21.shape[0], A21.shape[1], A21.indices, A21.indptr, A21.data, A22.shape[0], A22.shape[1], A22.indices, A22.indptr, A22.data) return csc_matrix((Px, Pi, Pp), shape=(m, n))
[docs] def pack_3_by_4(A11: CscMat, A12: CscMat, A21: CscMat): """ Stack 3 CSC matrices in a 2 by 2 structure stack csc sparse float matrices like this: | A11 | A12 | | A21 | 0 | :param A11: Upper left matrix :param A12: Upper right matrix :param A21: Lower left matrix :return: Stitched matrix """ m, n, Pi, Pp, Px = csc_numba.csc_stack_3_by_4_ff(A11.shape[0], A11.shape[1], A11.indices, A11.indptr, A11.data, A12.shape[0], A12.shape[1], A12.indices, A12.indptr, A12.data, A21.shape[0], A21.shape[1], A21.indices, A21.indptr, A21.data) return CscMat((Px, Pi, Pp), shape=(m, n))
[docs] def sp_transpose(mat: csc_matrix): """ Actual CSC transpose unlike scipy's :param mat: CSC matrix :return: CSC transposed matrix """ Cm, Cn, Cp, Ci, Cx = csc_numba.csc_transpose(m=mat.shape[0], n=mat.shape[1], Ap=mat.indptr, Ai=mat.indices, Ax=mat.data) return csc_matrix((Cx, Ci, Cp), shape=(Cm, Cn))
[docs] def sp_slice_cols(mat: csc_matrix, cols: np.ndarray): """ Slice columns :param mat: Matrix to slice :param cols: vector of columns :return: New sliced matrix """ new_indices, new_col_ptr, new_val, nrows, ncols = csc_numba.sp_submat_c_numba(nrows=mat.shape[0], ptrs=mat.indptr, indices=mat.indices, values=mat.data, cols=cols) return csc_matrix((new_val, new_indices, new_col_ptr), shape=(nrows, ncols))
[docs] def sp_slice_rows(mat: csc_matrix, rows: np.ndarray): """ Slice rows :param mat: :param rows: :return: CSC matrix """ # mat2 = sp_transpose(mat) # return sp_transpose(sp_slice_cols(mat2, rows)) return sp_transpose(sp_slice_cols(sp_transpose(mat), np.array(rows)))
[docs] def sp_slice(mat: csc_matrix, rows, cols): """ /* * This function performs the trivial slicing of the CSC sparse matrix A * * Steps: * - Slice the columns with "sp_submat_c(A, cols)" * - convert to CSR with .t() {transpose} * - Slice the rows with sp_submat_c(B, rows), because it is a CSR now * - Convert the result back to CSC with the final .t() * */ :param mat: :param rows: :param cols: :return: """ # mat2 = sp_transpose(sp_slice_cols(mat, cols)) # return sp_transpose(sp_slice_cols(mat2, rows)) new_val, new_row_ind, new_col_ptr, n_rows, n_cols, nnz = csc_numba.csc_sub_matrix(Am=mat.shape[0], Annz=mat.nnz, Ap=mat.indptr, Ai=mat.indices, Ax=mat.data, rows=rows, cols=cols) new_val = np.resize(new_val, nnz) new_row_ind = np.resize(new_row_ind, nnz) return csc_matrix((new_val, new_row_ind, new_col_ptr), shape=(n_rows, n_cols))
[docs] def csc_stack_2d_ff(mats, m_rows=1, m_cols=1, row_major=True): """ Assemble matrix from a list of matrices representing a "super matrix" |mat11 | mat12 | mat13 | |mat21 | mat22 | mat23 | if row-major turns into: mats = [mat11, mat12, mat13, mat21, mat22, mat23] else: (it is column major) mats = [mat11, mat21, mat12, mat22, mat31, mat32] m_rows = 2 m_cols = 3 :param mats: array of CSC matrices arranged in row-major or column-major order into a list :param m_rows: number of rows of the mats structure :param m_cols: number of cols of the mats structure :param row_major: mats is sorted in row major, else it is sorted in column major :return: Final assembled matrix in CSC format """ mats_data = list() mats_indptr = list() mats_indices = list() mats_cols = list() mats_rows = list() for x in mats: mats_data.append(x.data) mats_indptr.append(x.indptr) mats_indices.append(x.indices) mats_cols.append(x.shape[1]) mats_rows.append(x.shape[0]) if row_major: data, indices, indptr, nrows, ncols = csc_numba.csc_stack_2d_ff_row_major(nb.typed.List(mats_data), nb.typed.List(mats_indptr), nb.typed.List(mats_indices), nb.typed.List(mats_cols), nb.typed.List(mats_rows), m_rows, m_cols) else: data, indices, indptr, nrows, ncols = csc_numba.csc_stack_2d_ff_col_major(nb.typed.List(mats_data), nb.typed.List(mats_indptr), nb.typed.List(mats_indices), nb.typed.List(mats_cols), nb.typed.List(mats_rows), m_rows, m_cols) return csc_matrix((data, indices, indptr), shape=(nrows, ncols))
[docs] def csc_stack_2d_ff_old(mats, m_rows=1, m_cols=1): """ Assemble matrix from a list of matrices representing a "super matrix" |mat11 | mat12 | mat13 | |mat21 | mat22 | mat23 | turns into: mats = [mat11, mat12, mat13, mat21, mat22, mat23] m_rows = 2 m_cols = 3 :param mats: array of CSC matrices arranged in row-major order into a list :param m_rows: number of rows of the mats structure :param m_cols: number of cols of the mats structure :return: Final assembled matrix """ # pass 1: compute the number of non zero nnz = 0 nrows = 0 ncols = 0 for r in range(m_rows): nrows += mats[r * m_cols].shape[0] # equivalent to mats[r, 0] for c in range(m_cols): mat = mats[r * m_cols + c] # equivalent to mats[r, c] nnz += mat.indptr[mat.shape[1]] if r == 0: ncols += mat.shape[1] # pass 2: fill in the data indptr = np.empty(ncols + 1, dtype=np.int32) indices = np.empty(nnz, dtype=np.int32) data = np.empty(nnz, dtype=np.float64) cnt = 0 indptr[0] = 0 offset_col = 0 for c in range(m_cols): # for each column of the array of matrices n = mats[c].shape[1] # equivalent to mats[0, c] for j in range(n): # for every column of the column of matrices offset_row = 0 for r in range(m_rows): # for each row of the array of rows mat = mats[r * m_cols + c] # equivalent to mats[r, c] m = mat.shape[0] Ap = mat.indptr Ai = mat.indices Ax = mat.data for k in range(Ap[j], Ap[j + 1]): # for every entry in the column from A indices[cnt] = Ai[k] + offset_row # row index data[cnt] = Ax[k] cnt += 1 offset_row += m indptr[offset_col + j + 1] = cnt offset_col += n # return nrows, ncols, indices, indptr, data return csc_matrix((data, indices, indptr), shape=(nrows, ncols))
[docs] def dense_to_csc(mat: Mat, threshold: float) -> csc_matrix: """ Extract the sparse matrix from a dense matrix where abs values are below a threshold :param mat: dense matrix :param threshold: threshold :return: CSC sparse matrix """ data, indices, indptr = csc_numba.dense_to_csc_numba(mat, threshold) return csc_matrix((data, indices, indptr), shape=mat.shape)
[docs] def diags(array: np.ndarray) -> csc_matrix: """ Convert array to CSC diagonal matrix :param array: :return: """ m = len(array) if array.dtype == complex: return csc_matrix(csc_numba.csc_diagonal_from_complex_array(array), shape=(m, m)) else: return csc_matrix(csc_numba.csc_diagonal_from_array(array), shape=(m, m))
[docs] def diagc(n, value) -> csc_matrix: """ Create constant value diagonal matrix :param n: size :param value: value :return: """ return csc_matrix(csc_numba.csc_diagonal_from_number(n, value), shape=(n, n))