Source code for VeraGridEngine.Utils.Sparse.sparse_array

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

from typing import Dict, Any, Union
import numpy as np
from enum import Enum
from VeraGridEngine.enumerations import DeviceType
from VeraGridEngine.basic_structures import Numeric, NumericVec, IntVec, Vec

# Alias used to annotate the declared element type of a sparse profile/array.
# NOTE(review): type(bool), type(int) and type(float) all evaluate to the
# builtin metaclass `type`, so those three members collapse to "any class
# object" rather than naming bool/int/float individually. What type(Vec)
# contributes depends on how Vec is defined in basic_structures -- TODO confirm.
PROFILE_TYPES = Union[type(bool), type(int), type(float), DeviceType, type(Vec)]


def check_type(dtype: PROFILE_TYPES, value: Any) -> bool:
    """
    Check that the type of a value is compatible with the declared profile type.

    Integer values are accepted for both ``int`` and ``float`` profiles.
    Any NumPy integer / floating scalar width is recognised (not only the
    32 and 64 bit ones).

    :param dtype: expected (declared) type
    :param value: Any value
    :return: True if the value is compatible with ``dtype``, False otherwise
    :raises Exception: if the value type is not one of the recognised kinds
    """
    # bool must be tested before int: bool is a subclass of int
    if isinstance(value, (bool, np.bool_)):
        return dtype == bool

    # Enum members are tested before plain numbers so that IntEnum-like
    # members are matched against their enum class and not treated as ints
    if isinstance(value, Enum):
        return type(value) == dtype  # check that the enum type is the same

    if isinstance(value, (int, np.integer)):
        return dtype == int or dtype == float

    if isinstance(value, (float, np.floating)):
        return dtype == float

    # here dtype is a DeviceType member: any remaining value is accepted
    if isinstance(dtype, DeviceType):
        return True

    if dtype == Vec:
        return isinstance(value, np.ndarray) or value is None

    raise Exception("Sparse array type value Not recognized")
class SparseArray:
    """
    Sparse one-dimensional array.

    Values are kept in an ``index -> value`` dictionary; every position that
    is not present in the dictionary implicitly holds the default value.
    """
    __slots__ = (
        '_dtype',
        '_default_value',
        '_size',
        '_map',
    )

    def __init__(self, data_type: PROFILE_TYPES, default_value: Any, size: int = 0) -> None:
        """
        :param data_type: declared type of the stored values
        :param default_value: value of the positions not stored explicitly
        :param size: number of positions of the array
        """
        self._dtype = data_type

        self._default_value: Any | None = self._coerce_value(default_value)

        self._size: int = size

        self._map: Dict[int, Any] = dict()

    def _coerce_value(self, value: Any) -> Any:
        """
        Convert a value to the declared type
        :param value: Any value
        :return: None if value is None, the untouched value if the declared
                 type is a DeviceType member, otherwise ``dtype(value)``
        """
        if value is None:
            return None

        if isinstance(self._dtype, DeviceType):
            return value

        # Enum classes and plain types are both converted by calling the type
        return self._dtype(value)

    def copy(self) -> "SparseArray":
        """
        Get a copy of this object.
        The index map is duplicated with dict.copy(); the stored values
        themselves are shared with the original, not cloned.
        :return: A new SparseArray copy of this object
        """
        cpy = SparseArray(data_type=self._dtype,
                          default_value=self._default_value,
                          size=self._size)
        cpy._map = self._map.copy()
        return cpy

    @property
    def dtype(self) -> Union[bool, int, float, DeviceType]:
        """
        Get the declared type
        :return: type
        """
        return self._dtype

    @property
    def default_value(self):
        """
        Default value getter
        :return: the value of the positions not stored explicitly
        """
        return self._default_value

    @default_value.setter
    def default_value(self, val):
        """
        Set the default value, updating the map to maintain consistency.

        When the default value changes, we need to:
        1. Store the old default for positions that were using it (not in map)
        2. Remove positions from the map that now equal the new default

        :param val: new default value
        :return: None
        """
        if isinstance(self.dtype, DeviceType):
            # the string "None" stands for "no device"
            if val == "None":
                val2 = None
            else:
                val2 = val

        elif issubclass(self._dtype, Enum):
            # if it is an Enum type, cast the value to the Enum
            val2 = self._dtype(val)

        else:
            val2 = val

        # NOTE: the boolean result is not used; this call only matters
        # because check_type raises for unrecognised value types
        check_type(dtype=self.dtype, value=val2)

        # If the value is the same, do nothing
        if val2 == self._default_value:
            return

        # If some positions rely on the implicit default and we are changing
        # it, we need to update the map to maintain consistency
        if self._size > 0 and len(self._map) < self._size:
            old_default = self._default_value

            # Add the old default to all positions not currently in the map
            # These positions implicitly had the old default value
            for i in range(self._size):
                if i not in self._map:
                    self._map[i] = old_default

            # Now remove any entries from the map that equal the new default
            keys_to_remove = [k for k, v in self._map.items() if v == val2]
            for k in keys_to_remove:
                del self._map[k]

        self._default_value = val2

    def info(self):
        """
        Return dictionary with information about the profile object and its content
        :return: dictionary with the object id, default value, size and map id
        """
        return {
            "me": hex(id(self)),
            "default_value": self._default_value,
            "size": self._size,
            "map": hex(id(self._map)),
        }

    def get_map(self) -> Dict[int, Numeric]:
        """
        Return the dictionary hosting the sparse data (not a copy)
        :return: Dict[int, Numeric]
        """
        return self._map

    def insert(self, i: int, x: Numeric):
        """
        Insert an element in the data dictionary (no bounds or default checks)
        :param i: index
        :param x: value
        :return: None
        """
        self._map[i] = x

    def get_sparsity(self) -> float:
        """
        Get the sparsity of this profile, computed as the number of
        explicitly stored entries divided by the size
        :return: Sparsity metric (0.0 for an array of size 0)
        """
        if self._size == 0:
            # avoid the division by zero of an empty (or cleared) array
            return 0.0
        return float(len(self._map)) / float(self._size)

    def create(self, size: int, default_value: PROFILE_TYPES,
               data: Union[Dict[int, Numeric], None] = None) -> "SparseArray":
        """
        Build sparse from definition
        :param size: size
        :param default_value: default value
        :param data: data map (used as is, not copied); empty map if None
        :return: self
        """
        self.default_value = self._coerce_value(default_value)
        self._size = size
        self._map = data if data is not None else dict()
        return self

    def create_from_array(self, array: NumericVec, default_value: PROFILE_TYPES) -> "SparseArray":
        """
        Build sparse from array
        :param array: NumericVec
        :param default_value: default value of the array
        :return: self
        """
        self.default_value = self._coerce_value(default_value)
        self._size = len(array)
        self._map: Dict[int, Numeric] = dict()

        # compare against the stored default (already coerced / cast by the
        # setter), not the raw argument, so that equal values are not stored
        stored_default = self._default_value
        for i, val in enumerate(array):
            if val != stored_default:
                self._map[i] = val

        return self

    def create_from_dict(self, default_value: PROFILE_TYPES, size: int,
                         map_data: Dict[int, Numeric]) -> "SparseArray":
        """
        Create this array from dict data
        :param default_value: default value
        :param size: size
        :param map_data: index -> value map (used as is, not copied)
        :return: self
        """
        self.default_value = self._coerce_value(default_value)
        self._size = size
        self._map = map_data
        return self

    def fill(self, value: Any):
        """
        Fill the sparse array with the same value
        :param value: any value
        """
        self.default_value = self._coerce_value(value)
        self._map = dict()

    def toarray(self) -> NumericVec:
        """
        Get numpy vector from this sparse structure
        :return: NumericVec
        """
        arr = np.full(self._size, self._default_value)

        for key, val in self._map.items():
            arr[key] = val

        return arr

    def at(self, idx: int) -> Any:
        """
        Get the array at a position
        :param idx: index
        :return: stored value, or the default value if nothing (or None) is stored
        """
        val = self._map.get(idx, None)
        if val is None:
            return self._default_value
        return val

    def __getitem__(self, key: int) -> Any:
        return self.at(idx=key)

    def __setitem__(self, key: int, value: Any) -> None:
        if isinstance(key, int):
            assert key < self._size

            if value != self._default_value:
                self._map[key] = value
            else:
                # the position goes back to the implicit default: any
                # previously stored value must be dropped
                self._map.pop(key, None)
        else:
            raise TypeError("Key must be an integer")

    def __eq__(self, other: "SparseArray") -> bool:
        """
        Equality operator: same default value, size and map
        :param other: SparseArray
        :return: bool
        """
        if self._default_value != other._default_value:
            return False

        if self._size != other._size:
            return False

        if self._map != other._map:
            return False

        return True

    def size(self) -> int:
        """
        Get the size
        :return: integer
        """
        return self._size

    def clear(self):
        """
        Clear the sparse array: empties the map and sets the size to 0
        :return: None
        """
        self._map.clear()
        self._size = 0

    def set_data(self, d: Dict[int, Any]):
        """
        Replace the index -> value map
        :param d: new map (used as is, not copied)
        """
        self._map = d

    def resize(self, n: int):
        """
        Resize the array
        :param n: number of elements. If n is smaller than the current container size,
                  the content is reduced to its first n elements,
                  removing those beyond (and destroying them)
        """
        if n < self._size:
            # Collect the out-of-range keys first: deleting from a dict
            # while iterating over it raises RuntimeError
            out_of_range = [key for key in self._map if key >= n]
            for key in out_of_range:
                del self._map[key]

        self._size = n

    def resample(self, indices: IntVec):
        """
        Resample this sparse array in-place
        :param indices: array of integer indices (not repeated)
        """
        self._size = len(indices)

        # We need to re-index the sparse entries
        #
        #                         0  1  2  3  4  5  6  7  8
        # original dense vector: [0, 0, 2, 0, 7, 0, 0, 0, 3]
        # map: {2: 2, 4: 7, 8: 3}
        #
        # Now we resample with indices [2, 5, 8]
        #
        # old idx  2  5  8  -> indices
        # new idx  0  1  2  -> indices' positions
        #
        # the resampled vector is [2, 0, 3]
        # the new map is {0: 2, 2: 3}
        new_map: Dict[int, Numeric] = dict()

        for i, idx in enumerate(indices):
            it = self._map.get(idx, None)
            if it is not None:
                # found, keep the value at the new index
                new_map[i] = it

        self._map = new_map

    def slice(self, indices: IntVec) -> "SparseArray":
        """
        Get a resampled copy of this sparse array
        :param indices: array of integer indices (not repeated)
        :return: new SparseArray
        """
        cpy = self.copy()
        cpy.resample(indices)
        return cpy

    def get_sparse_representation(self):
        """
        Get the sparse representation of the sparse data
        :return: (list of stored indices, list of stored values)
        """
        indptr = list()
        data = list()
        # .items() is required: iterating the dict itself only yields keys
        for i, x in self._map.items():
            indptr.append(i)
            data.append(x)
        return indptr, data

    def set_sparse_data_from_data(self, indptr, data):
        """
        Store the given (index, value) pairs in the map
        :param indptr: iterable of indices
        :param data: iterable of values, paired with indptr
        :return: None
        """
        for i, x in zip(indptr, data):
            self._map[i] = x
class SparseObjectArray:
    """
    Sparse one-dimensional array of objects.

    Objects are kept in an ``index -> object`` dictionary; every position
    that is not present in the dictionary reads as None.
    """
    __slots__ = (
        "_size",
        "_map",
    )

    def __init__(self, n: int) -> None:
        """
        :param n: Number of elements
        """
        self._size: int = n

        self._map: Dict[int, object] = dict()

    def copy(self) -> "SparseObjectArray":
        """
        Get a copy of this object.
        The index map is duplicated with dict.copy(); the stored objects
        themselves are shared with the original, not cloned.
        :return: A new SparseObjectArray copy of this object
        """
        cpy = SparseObjectArray(n=self._size)
        cpy._map = self._map.copy()
        return cpy

    def info(self):
        """
        Return dictionary with information about the profile object and its content
        :return: dictionary with the object id, size and map id
        """
        return {
            "me": hex(id(self)),
            "size": self._size,
            "map": hex(id(self._map)),
        }

    def get_map(self) -> Dict[int, object]:
        """
        Return the dictionary hosting the sparse data (not a copy)
        :return: Dict[int, object]
        """
        return self._map

    def insert(self, i: int, x: object):
        """
        Insert an element in the data dictionary (no bounds checks)
        :param i: index
        :param x: object
        :return: None
        """
        self._map[i] = x

    def get_sparsity(self) -> float:
        """
        Get the sparsity of this profile, computed as the number of
        explicitly stored entries divided by the size
        :return: Sparsity metric (0.0 for an array of size 0)
        """
        if self._size == 0:
            # avoid the division by zero of an empty array
            return 0.0
        return float(len(self._map)) / float(self._size)

    def at(self, idx: int) -> Any:
        """
        Get the array at a position
        :param idx: index
        :return: stored object, or None if nothing is stored at idx
        """
        return self._map.get(idx, None)

    def __getitem__(self, key: int) -> Any:
        return self.at(idx=key)

    def __setitem__(self, key: int, value: Any) -> None:
        if isinstance(key, int):
            assert key < self._size

            if value is not None:
                self._map[key] = value
            else:
                # None means "nothing stored": drop any previous object so
                # that reading the position returns None afterwards
                self._map.pop(key, None)
        else:
            raise TypeError("Key must be an integer")

    def __eq__(self, other: "SparseObjectArray") -> bool:
        """
        Equality operator: same size and map
        :param other: SparseObjectArray
        :return: bool
        """
        if self._size != other._size:
            return False

        if self._map != other._map:
            return False

        return True

    def size(self) -> int:
        """
        Get the size
        :return: integer
        """
        return self._size

    def resize(self, n: int):
        """
        Resize the array
        :param n: number of elements. If n is smaller than the current container size,
                  the content is reduced to its first n elements,
                  removing those beyond (and destroying them)
        """
        if n < self._size:
            # Collect the out-of-range keys first: deleting from a dict
            # while iterating over it raises RuntimeError
            out_of_range = [key for key in self._map if key >= n]
            for key in out_of_range:
                del self._map[key]

        self._size = n

    def resample(self, indices: IntVec):
        """
        Resample this sparse array in-place
        :param indices: array of integer indices (not repeated)
        """
        self._size = len(indices)

        # We need to re-index the sparse entries
        #
        #                         0  1  2  3  4  5  6  7  8
        # original dense vector: [0, 0, 2, 0, 7, 0, 0, 0, 3]
        # map: {2: 2, 4: 7, 8: 3}
        #
        # Now we resample with indices [2, 5, 8]
        #
        # old idx  2  5  8  -> indices
        # new idx  0  1  2  -> indices' positions
        #
        # the resampled vector is [2, 0, 3]
        # the new map is {0: 2, 2: 3}
        new_map: Dict[int, object] = dict()

        for i, idx in enumerate(indices):
            it = self._map.get(idx, None)
            if it is not None:
                # found, keep the value at the new index
                new_map[i] = it

        self._map = new_map

    def slice(self, indices: IntVec) -> "SparseObjectArray":
        """
        Get a resampled copy of this sparse array
        :param indices: array of integer indices (not repeated)
        :return: new SparseObjectArray
        """
        cpy = self.copy()
        cpy.resample(indices)
        return cpy