Source code for VeraGridEngine.Devices.Profiles.profile_int

from __future__ import annotations

from collections import Counter
from typing import Any

import numpy as np

from VeraGridEngine.basic_structures import IntVec
from VeraGridEngine.Devices.Profiles.sparse_array_int import SparseArrayInt


[docs] class ProfileInt: """ Profile specialized for ``int`` values. """ __slots__ = ( "_is_sparse", "_sparse_array", "_dense_array", "_sparsity_threshold", "_dtype", "_initialized", "_default_value", ) def __init__( self, default_value: int, arr: np.ndarray | None = None, sparsity_threshold: float = 0.8, is_sparse: bool = False, ) -> None: """ Build an integer profile. :param default_value: Declared default value. :param arr: Optional dense source array. :param sparsity_threshold: Sparse storage threshold. :param is_sparse: Requested initial sparse state. """ self._is_sparse: bool = is_sparse self._sparse_array: SparseArrayInt | None = None self._dense_array: np.ndarray | None = None self._sparsity_threshold: float = sparsity_threshold self._dtype: type[int] = int self._initialized: bool = False self._default_value: int = int(default_value) if arr is not None: self.set(arr=arr) else: self._initialized = False
[docs] def clear(self) -> None: """ Clear the profile contents. :return: ``None``. """ self._is_sparse = False self._sparse_array = None self._dense_array = None self._initialized = False
[docs] def info(self) -> dict[str, Any]: """ Get diagnostic information. :return: Information dictionary. """ return { "me": hex(id(self)), "initialized": self._initialized, "size": self.size(), "is_sparse": self._is_sparse, "sparsity_threshold": self._sparsity_threshold, "default_value": self.default_value, "dense_array": { "me": hex(id(self._dense_array)), "size": len(self._dense_array), } if self._dense_array is not None else "None", "sparse_array": self._sparse_array.info() if self._sparse_array is not None else "None", }
[docs] def get_sparse_map(self) -> dict[int, int]: """ Get the sparse map when the profile is sparse. :return: Sparse map or an empty dictionary. """ if self._sparse_array is not None: return self._sparse_array.get_map() else: return dict()
@property def dtype(self) -> type[int]: """ Get the declared type. :return: ``int``. """ return self._dtype @property def default_value(self) -> int: """ Get the profile default value. :return: Default value. """ if self._sparse_array is not None: return self._sparse_array.default_value else: return self._default_value @default_value.setter def default_value(self, val: int) -> None: """ Set the profile default value. :param val: New default value. :return: ``None``. """ self._default_value = int(val) if self._sparse_array is not None: self._sparse_array.default_value = self._default_value else: self._sparse_array = self._sparse_array @property def is_sparse(self) -> bool: """ Get whether the profile uses sparse storage. :return: Sparse state. """ return self._is_sparse @property def is_initialized(self) -> bool: """ Get whether the profile has been initialized. :return: Initialization state. """ return self._initialized
[docs] def set_initialized(self) -> None: """ Mark the profile as initialized. :return: ``None``. """ self._initialized = True
@property def sparse_array(self) -> SparseArrayInt | None: """ Get the sparse backing array. :return: Sparse array or ``None``. """ return self._sparse_array @property def dense_array(self) -> np.ndarray | None: """ Get the dense backing array. :return: Dense array or ``None``. """ return self._dense_array
[docs] def create_sparse(self, size: int, default_value: int, map_data: dict[int, int] | None = None) -> None: """ Create the profile in sparse form. :param size: Logical size. :param default_value: Sparse default value. :param map_data: Optional sparse map. :return: ``None``. """ self._is_sparse = True self._default_value = int(default_value) self._sparse_array = SparseArrayInt(default_value=self._default_value) if map_data is None: self._sparse_array.create(size=size, default_value=self._default_value) else: self._sparse_array.create_from_dict(default_value=self._default_value, size=size, map_data=map_data) self._dense_array = None self._initialized = True
[docs] def create_dense(self, size: int, default_value: int) -> None: """ Create the profile in dense form. :param size: Logical size. :param default_value: Dense fill value. :return: ``None``. """ self._is_sparse = False self._default_value = int(default_value) self._dense_array = np.full(size, self._default_value, dtype=int) self._sparse_array = None self._initialized = True
@property def sparsity(self) -> float: """ Get the sparse stored-entry ratio. :return: Sparsity ratio. """ if self._is_sparse and self._sparse_array is not None: return self._sparse_array.get_sparsity() else: return 0.0
[docs] def set(self, arr: np.ndarray) -> bool: """ Set the profile from a dense array. :param arr: Dense integer array. :return: ``True`` when the assignment succeeds. """ if not isinstance(arr, np.ndarray): print("You can only set numpy arrays") return False else: if arr.ndim != 1: print("You can only set 1D numpy arrays") return False else: if len(arr) == 0: return False else: try: arr_mod: np.ndarray = np.asarray(arr, dtype=int) except (TypeError, ValueError): print("Cannot set dense array because the type cast failed") return False if self.size() > 0: if len(arr_mod) != self.size(): raise ValueError("The array must have the same size as the profile") else: arr_mod = arr_mod else: arr_mod = arr_mod counts: Counter[int] = Counter(arr_mod.tolist()) most_common_element: int most_common_count: int most_common_element, most_common_count = counts.most_common(1)[0] sparsity_factor: float = most_common_count / len(arr_mod) if sparsity_factor >= self._sparsity_threshold: base_value: int = int(most_common_element) data_map: dict[int, int] = dict() non_default_indices: np.ndarray = np.flatnonzero(arr_mod != base_value) # The numeric path keeps sparse compression vectorized on large arrays. for raw_idx in non_default_indices: idx: int = int(raw_idx) data_map[idx] = int(arr_mod[idx]) self._is_sparse = True self._default_value = base_value self._sparse_array = SparseArrayInt(default_value=base_value) self._sparse_array.create(size=len(arr_mod), default_value=base_value, data=data_map) self._dense_array = None else: self._is_sparse = False self._dense_array = arr_mod self._sparse_array = None self._default_value = int(arr_mod[0]) self._initialized = True return True
def __eq__(self, other: object) -> bool: """ Compare two profiles. :param other: Object to compare against. :return: ``True`` when both profiles match. """ if isinstance(other, ProfileInt): if self._is_sparse == other._is_sparse: if self._is_sparse: if self._sparse_array is None and other._sparse_array is None: return self.default_value == other.default_value else: if self._sparse_array is not None and other._sparse_array is not None: return self._sparse_array == other._sparse_array else: return False else: if self._dense_array is None and other._dense_array is None: return self.default_value == other.default_value else: if self._dense_array is not None and other._dense_array is not None: return np.array_equal(self._dense_array, other._dense_array) else: return False else: return False else: return False def __ne__(self, other: object) -> bool: """ Compare two profiles for inequality. :param other: Object to compare against. :return: ``True`` when both profiles differ. """ return not self.__eq__(other) def __getitem__(self, key: int) -> int: """ Get a value by index. :param key: Position to read. :return: Value at ``key``. """ if self._is_sparse: if self._sparse_array is not None: return self._sparse_array[key] else: return self.default_value else: if self._dense_array is None: self._is_sparse = True self._sparse_array = SparseArrayInt(default_value=self.default_value) print("Initializing sparse when querying, this signals a mis initialization") return self.default_value else: return int(self._dense_array[key]) def __setitem__(self, key: int, value: int) -> None: """ Set a value by index. :param key: Position to write. :param value: Value to store. :return: ``None``. """ if isinstance(key, int): coerced_value: int = int(value) if self._is_sparse: if self._sparse_array is not None: if key < self._sparse_array.size(): self._sparse_array[key] = coerced_value else: raise AssertionError("Key out of bounds") else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: if key < len(self._dense_array): self._dense_array[key] = coerced_value else: raise AssertionError("Key out of bounds") else: raise AssertionError("Dense profile not initialized") else: raise TypeError("Key must be an integer") def __imul__(self, other: float | int) -> "ProfileInt": """ Multiply the profile in place by a scalar. :param other: Scalar multiplier. :return: ``self``. """ if isinstance(other, (int, float)): self.scale(value=other) return self else: raise TypeError(f"Unsupported type {type(other)}")
[docs] def convert_sparse_to_dense(self) -> None: """ Convert the sparse profile into dense storage. :return: ``None``. """ if self._is_sparse and self._sparse_array is not None: self._dense_array = self._sparse_array.toarray() self._sparse_array = None self._is_sparse = False else: self._is_sparse = self._is_sparse
[docs] def resize(self, n: int) -> None: """ Resize the profile. :param n: New logical size. :return: ``None``. """ if isinstance(n, int): if self._initialized: if self._is_sparse: if self._sparse_array is not None: self._sparse_array.resize(n=n) else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: new_array: np.ndarray = np.full(n, self.default_value, dtype=int) copy_size: int = min(len(self._dense_array), n) new_array[:copy_size] = self._dense_array[:copy_size] self._dense_array = new_array else: raise AssertionError("Dense profile not initialized") else: self._initialized = True self.create_sparse(size=n, default_value=self.default_value) else: raise TypeError("The size must be an integer")
[docs] def resample(self, indices: IntVec) -> None: """ Resample the profile in place. :param indices: New index selection. :return: ``None``. """ if self._is_sparse: if self._sparse_array is not None: self._sparse_array.resample(indices=indices) else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: self._dense_array = self._dense_array[indices] else: raise AssertionError("Dense profile not initialized")
[docs] def fill(self, value: int) -> None: """ Fill the profile with a single value. :param value: Fill value. :return: ``None``. """ coerced_value: int = int(value) self.default_value = coerced_value self._is_sparse = True if self._sparse_array is None: self._sparse_array = SparseArrayInt(default_value=coerced_value) else: self._sparse_array = self._sparse_array self._sparse_array.fill(coerced_value) self._dense_array = None self._initialized = True
[docs] def scale(self, value: float | int) -> None: """ Scale the profile by a scalar. :param value: Scalar multiplier. :return: ``None``. """ if self._is_sparse: if self._sparse_array is not None: current_size: int = self._sparse_array.size() new_default_value: int = int(self.default_value * value) new_map: dict[int, int] = dict() # Scale only the explicit sparse entries and rebuild the sparse structure against the scaled default. for key, sparse_value in self._sparse_array.get_map().items(): scaled_value: int = int(sparse_value * value) if scaled_value != new_default_value: new_map[key] = scaled_value else: new_map = new_map self._sparse_array = SparseArrayInt(default_value=new_default_value) self._sparse_array.create(size=current_size, default_value=new_default_value, data=new_map) self._default_value = new_default_value else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: self._dense_array = (self._dense_array * value).astype(int) self._default_value = int(self.default_value * value) else: raise AssertionError("Dense profile not initialized")
[docs] def size(self) -> int: """ Get the profile size. :return: Logical size. """ if self._initialized: if self._is_sparse: if self._sparse_array is not None: return self._sparse_array.size() else: return 0 else: if self._dense_array is not None: return len(self._dense_array) else: return 0 else: return 0
[docs] def toarray(self) -> np.ndarray: """ Get the dense numpy representation. :return: Dense integer array. """ if self.size() > 0: if self._is_sparse: if self._sparse_array is not None: return self._sparse_array.toarray() else: return np.zeros(0, dtype=int) else: if self._dense_array is not None: return self._dense_array else: return np.zeros(0, dtype=int) else: return np.zeros(0, dtype=int)
[docs] def tolist(self) -> list[int]: """ Get the dense list representation. :return: Dense list. """ return self.toarray().tolist()
[docs] def astype(self, tpe: Any) -> np.ndarray: """ Cast the dense representation to another dtype. :param tpe: Target dtype. :return: Cast dense array. """ return self.toarray().astype(tpe)
[docs] def get_sparse_representation(self) -> tuple[list[int], list[int]]: """ Export the sparse contents as parallel index and value lists. :return: Tuple ``(indices, values)``. """ if self._sparse_array is not None: return self._sparse_array.get_sparse_representation() else: return list(), list()
[docs] def set_sparse_data_from_data(self, indptr: list[int], data: list[int]) -> None: """ Load sparse data from parallel lists. :param indptr: Sparse indices. :param data: Sparse values. :return: ``None``. """ if self._sparse_array is not None: self._sparse_array.set_sparse_data_from_data(indptr=indptr, data=data) else: raise AssertionError("Sparse profile not initialized")
[docs] def fix_nan(self, default_value: float = 0.0) -> None: """ Keep the integer profile API aligned with the generic profile. :param default_value: Unused placeholder. :return: ``None``. """ self._default_value = self._default_value
[docs] def copy(self) -> "ProfileInt": """ Build a deep copy of the profile. :return: New profile copy. """ copied_profile: ProfileInt = ProfileInt( default_value=self.default_value, arr=None, sparsity_threshold=self._sparsity_threshold, is_sparse=self.is_sparse, ) if self._sparse_array is not None: copied_profile._sparse_array = self._sparse_array.copy() else: copied_profile._sparse_array = None if self._dense_array is not None: copied_profile._dense_array = self._dense_array.copy() else: copied_profile._dense_array = None copied_profile._default_value = self.default_value copied_profile._initialized = self.is_initialized return copied_profile