Source code for VeraGridEngine.Devices.Profiles.profile_enum

from __future__ import annotations

from collections import Counter
from enum import Enum
from typing import Any

import numpy as np

from VeraGridEngine.basic_structures import IntVec
from VeraGridEngine.Devices.Profiles.sparse_array_enum import SparseArrayEnum


[docs] class ProfileEnum: """ Profile specialized for enum values. """ __slots__ = ( "_is_sparse", "_sparse_array", "_dense_array", "_sparsity_threshold", "_dtype", "_initialized", "_default_value", ) def __init__( self, default_value: Enum | None, enum_type: type[Enum], arr: np.ndarray | None = None, sparsity_threshold: float = 0.8, is_sparse: bool = False, ) -> None: """ Build an enum profile. :param default_value: Declared default value. :param enum_type: Enum class stored by the profile. :param arr: Optional dense source array. :param sparsity_threshold: Sparse storage threshold. :param is_sparse: Requested initial sparse state. """ self._is_sparse: bool = is_sparse self._sparse_array: SparseArrayEnum | None = None self._dense_array: np.ndarray | None = None self._sparsity_threshold: float = sparsity_threshold self._dtype: type[Enum] = enum_type self._initialized: bool = False self._default_value: Enum | None = self._coerce_value(default_value) if arr is not None: self.set(arr=arr) else: self._initialized = False def _coerce_value(self, value: Any) -> Enum | None: """ Coerce a raw value into the declared enum type. :param value: Raw value. :return: Enum value or ``None``. """ if value is None: return None else: if isinstance(value, self._dtype): return value else: return self._dtype(value)
[docs] def clear(self) -> None: """ Clear the profile contents. :return: ``None``. """ self._is_sparse = False self._sparse_array = None self._dense_array = None self._initialized = False
[docs] def info(self) -> dict[str, Any]: """ Get diagnostic information. :return: Information dictionary. """ return { "me": hex(id(self)), "initialized": self._initialized, "size": self.size(), "is_sparse": self._is_sparse, "sparsity_threshold": self._sparsity_threshold, "default_value": self.default_value, "dense_array": { "me": hex(id(self._dense_array)), "size": len(self._dense_array), } if self._dense_array is not None else "None", "sparse_array": self._sparse_array.info() if self._sparse_array is not None else "None", }
[docs] def get_sparse_map(self) -> dict[int, Enum | None]: """ Get the sparse map when the profile is sparse. :return: Sparse map or an empty dictionary. """ if self._sparse_array is not None: return self._sparse_array.get_map() else: return dict()
@property def dtype(self) -> type[Enum]: """ Get the declared enum type. :return: Enum class. """ return self._dtype @property def default_value(self) -> Enum | None: """ Get the profile default value. :return: Default value. """ if self._sparse_array is not None: return self._sparse_array.default_value else: return self._default_value @default_value.setter def default_value(self, val: Enum | None) -> None: """ Set the profile default value. :param val: New default value. :return: ``None``. """ self._default_value = self._coerce_value(val) if self._sparse_array is not None: self._sparse_array.default_value = self._default_value else: self._sparse_array = self._sparse_array @property def is_sparse(self) -> bool: """ Get whether the profile uses sparse storage. :return: Sparse state. """ return self._is_sparse @property def is_initialized(self) -> bool: """ Get whether the profile has been initialized. :return: Initialization state. """ return self._initialized
[docs] def set_initialized(self) -> None: """ Mark the profile as initialized. :return: ``None``. """ self._initialized = True
@property def sparse_array(self) -> SparseArrayEnum | None: """ Get the sparse backing array. :return: Sparse array or ``None``. """ return self._sparse_array @property def dense_array(self) -> np.ndarray | None: """ Get the dense backing array. :return: Dense array or ``None``. """ return self._dense_array
[docs] def create_sparse(self, size: int, default_value: Enum | None, map_data: dict[int, Enum | None] | None = None) -> None: """ Create the profile in sparse form. :param size: Logical size. :param default_value: Sparse default value. :param map_data: Optional sparse map. :return: ``None``. """ self._is_sparse = True self._default_value = self._coerce_value(default_value) self._sparse_array = SparseArrayEnum(default_value=self._default_value, enum_type=self._dtype) if map_data is None: self._sparse_array.create(size=size, default_value=self._default_value) else: self._sparse_array.create_from_dict(default_value=self._default_value, size=size, map_data=map_data) self._dense_array = None self._initialized = True
[docs] def create_dense(self, size: int, default_value: Enum | None) -> None: """ Create the profile in dense form. :param size: Logical size. :param default_value: Dense fill value. :return: ``None``. """ self._is_sparse = False self._default_value = self._coerce_value(default_value) self._dense_array = np.full(size, self._default_value, dtype=object) self._sparse_array = None self._initialized = True
@property def sparsity(self) -> float: """ Get the sparse stored-entry ratio. :return: Sparsity ratio. """ if self._is_sparse and self._sparse_array is not None: return self._sparse_array.get_sparsity() else: return 0.0
[docs] def set(self, arr: np.ndarray) -> bool: """ Set the profile from a dense array. :param arr: Dense enum array. :return: ``True`` when the assignment succeeds. """ if not isinstance(arr, np.ndarray): print("You can only set numpy arrays") return False else: if arr.ndim != 1: print("You can only set 1D numpy arrays") return False else: if len(arr) == 0: return False else: arr_values: list[Enum | None] = list() # Coerce every element explicitly so the profile never keeps mixed enum payloads. for raw_value in arr: arr_values.append(self._coerce_value(raw_value)) arr_mod: np.ndarray = np.array(arr_values, dtype=object) if self.size() > 0: if len(arr_mod) != self.size(): raise ValueError("The array must have the same size as the profile") else: arr_mod = arr_mod else: arr_mod = arr_mod counts: Counter[Enum | None] = Counter(arr_mod.tolist()) most_common_element: Enum | None most_common_count: int most_common_element, most_common_count = counts.most_common(1)[0] sparsity_factor: float = most_common_count / len(arr_mod) if sparsity_factor >= self._sparsity_threshold: base_value: Enum | None = self._coerce_value(most_common_element) data_map: dict[int, Enum | None] = dict() for idx, raw_value in enumerate(arr_mod): coerced_value: Enum | None = self._coerce_value(raw_value) if coerced_value != base_value: data_map[idx] = coerced_value else: data_map = data_map self._is_sparse = True self._default_value = base_value self._sparse_array = SparseArrayEnum(default_value=base_value, enum_type=self._dtype) self._sparse_array.create(size=len(arr_mod), default_value=base_value, data=data_map) self._dense_array = None else: self._is_sparse = False self._dense_array = arr_mod self._sparse_array = None self._default_value = self._coerce_value(arr_mod[0]) self._initialized = True return True
def __eq__(self, other: object) -> bool: """ Compare two profiles. :param other: Object to compare against. :return: ``True`` when both profiles match. """ if isinstance(other, ProfileEnum): if self._is_sparse == other._is_sparse: if self._is_sparse: if self._sparse_array is None and other._sparse_array is None: return self.default_value == other.default_value else: if self._sparse_array is not None and other._sparse_array is not None: return self._sparse_array == other._sparse_array else: return False else: if self._dense_array is None and other._dense_array is None: return self.default_value == other.default_value else: if self._dense_array is not None and other._dense_array is not None: return np.array_equal(self._dense_array, other._dense_array) else: return False else: return False else: return False def __ne__(self, other: object) -> bool: """ Compare two profiles for inequality. :param other: Object to compare against. :return: ``True`` when both profiles differ. """ return not self.__eq__(other) def __getitem__(self, key: int) -> Enum | None: """ Get a value by index. :param key: Position to read. :return: Value at ``key``. """ if self._is_sparse: if self._sparse_array is not None: return self._sparse_array[key] else: return self.default_value else: if self._dense_array is None: self._is_sparse = True self._sparse_array = SparseArrayEnum(default_value=self.default_value, enum_type=self._dtype) print("Initializing sparse when querying, this signals a mis initialization") return self.default_value else: return self._coerce_value(self._dense_array[key]) def __setitem__(self, key: int, value: Enum | None) -> None: """ Set a value by index. :param key: Position to write. :param value: Value to store. :return: ``None``. """ if isinstance(key, int): coerced_value: Enum | None = self._coerce_value(value) if self._is_sparse: if self._sparse_array is not None: if key < self._sparse_array.size(): self._sparse_array[key] = coerced_value else: raise AssertionError("Key out of bounds") else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: if key < len(self._dense_array): self._dense_array[key] = coerced_value else: raise AssertionError("Key out of bounds") else: raise AssertionError("Dense profile not initialized") else: raise TypeError("Key must be an integer") def __imul__(self, other: float | int) -> "ProfileEnum": """ Reject in-place scaling because enum profiles are not numeric. :param other: Scalar multiplier. :return: ``self``. """ raise TypeError(f"Unsupported type {type(other)}")
[docs] def convert_sparse_to_dense(self) -> None: """ Convert the sparse profile into dense storage. :return: ``None``. """ if self._is_sparse and self._sparse_array is not None: self._dense_array = self._sparse_array.toarray() self._sparse_array = None self._is_sparse = False else: self._is_sparse = self._is_sparse
[docs] def resize(self, n: int) -> None: """ Resize the profile. :param n: New logical size. :return: ``None``. """ if isinstance(n, int): if self._initialized: if self._is_sparse: if self._sparse_array is not None: self._sparse_array.resize(n=n) else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: new_array: np.ndarray = np.full(n, self.default_value, dtype=object) copy_size: int = min(len(self._dense_array), n) new_array[:copy_size] = self._dense_array[:copy_size] self._dense_array = new_array else: raise AssertionError("Dense profile not initialized") else: self._initialized = True self.create_sparse(size=n, default_value=self.default_value) else: raise TypeError("The size must be an integer")
[docs] def resample(self, indices: IntVec) -> None: """ Resample the profile in place. :param indices: New index selection. :return: ``None``. """ if self._is_sparse: if self._sparse_array is not None: self._sparse_array.resample(indices=indices) else: raise AssertionError("Sparse profile not initialized") else: if self._dense_array is not None: self._dense_array = self._dense_array[indices] else: raise AssertionError("Dense profile not initialized")
[docs] def fill(self, value: Enum | None) -> None: """ Fill the profile with a single value. :param value: Fill value. :return: ``None``. """ coerced_value: Enum | None = self._coerce_value(value) self.default_value = coerced_value self._is_sparse = True if self._sparse_array is None: self._sparse_array = SparseArrayEnum(default_value=coerced_value, enum_type=self._dtype) else: self._sparse_array = self._sparse_array self._sparse_array.fill(coerced_value) self._dense_array = None self._initialized = True
[docs] def scale(self, value: float | int) -> None: """ Reject scaling because enum profiles are not numeric. :param value: Scalar multiplier. :return: ``None``. """ raise TypeError("This profile type does not support scaling")
[docs] def size(self) -> int: """ Get the profile size. :return: Logical size. """ if self._initialized: if self._is_sparse: if self._sparse_array is not None: return self._sparse_array.size() else: return 0 else: if self._dense_array is not None: return len(self._dense_array) else: return 0 else: return 0
[docs] def toarray(self) -> np.ndarray: """ Get the dense numpy representation. :return: Dense object array. """ if self.size() > 0: if self._is_sparse: if self._sparse_array is not None: return self._sparse_array.toarray() else: return np.zeros(0, dtype=object) else: if self._dense_array is not None: return self._dense_array else: return np.zeros(0, dtype=object) else: return np.zeros(0, dtype=object)
[docs] def tolist(self) -> list[Enum | None]: """ Get the dense list representation. :return: Dense list. """ return self.toarray().tolist()
[docs] def astype(self, tpe: Any) -> np.ndarray: """ Cast the dense representation to another dtype. :param tpe: Target dtype. :return: Cast dense array. """ return self.toarray().astype(tpe)
[docs] def get_sparse_representation(self) -> tuple[list[int], list[Enum | None]]: """ Export the sparse contents as parallel index and value lists. :return: Tuple ``(indices, values)``. """ if self._sparse_array is not None: return self._sparse_array.get_sparse_representation() else: return list(), list()
[docs] def set_sparse_data_from_data(self, indptr: list[int], data: list[Enum | None]) -> None: """ Load sparse data from parallel lists. :param indptr: Sparse indices. :param data: Sparse values. :return: ``None``. """ if self._sparse_array is not None: self._sparse_array.set_sparse_data_from_data(indptr=indptr, data=data) else: raise AssertionError("Sparse profile not initialized")
[docs] def fix_nan(self, default_value: float = 0.0) -> None: """ Keep the enum profile API aligned with the generic profile. :param default_value: Unused placeholder. :return: ``None``. """ self._default_value = self._default_value
[docs] def copy(self) -> "ProfileEnum": """ Build a deep copy of the profile. :return: New profile copy. """ copied_profile: ProfileEnum = ProfileEnum( default_value=self.default_value, enum_type=self._dtype, arr=None, sparsity_threshold=self._sparsity_threshold, is_sparse=self.is_sparse, ) if self._sparse_array is not None: copied_profile._sparse_array = self._sparse_array.copy() else: copied_profile._sparse_array = None if self._dense_array is not None: copied_profile._dense_array = self._dense_array.copy() else: copied_profile._dense_array = None copied_profile._default_value = self.default_value copied_profile._initialized = self.is_initialized return copied_profile