from __future__ import annotations
from collections import Counter
from typing import Any
import numpy as np
from VeraGridEngine.basic_structures import IntVec
from VeraGridEngine.Devices.Profiles.sparse_array_int import SparseArrayInt
class ProfileInt:
    """
    Profile specialized for ``int`` values.

    The profile stores a one-dimensional integer series in one of two
    backing stores: a dense ``numpy`` array or a ``SparseArrayInt``.
    :meth:`set` chooses the store from the share of the most frequent value
    in the incoming data compared against ``sparsity_threshold``.
    """

    __slots__ = (
        "_is_sparse",
        "_sparse_array",
        "_dense_array",
        "_sparsity_threshold",
        "_dtype",
        "_initialized",
        "_default_value",
    )

    def __init__(
        self,
        default_value: int,
        arr: np.ndarray | None = None,
        sparsity_threshold: float = 0.8,
        is_sparse: bool = False,
    ) -> None:
        """
        Build an integer profile.

        :param default_value: Declared default value (coerced with ``int``).
        :param arr: Optional dense source array; when given it is loaded
            through :meth:`set`, whose boolean result is not checked here.
        :param sparsity_threshold: Minimum share of the most frequent value
            for :meth:`set` to pick sparse storage.
        :param is_sparse: Requested initial sparse state; :meth:`set`
            overrides it when ``arr`` is loaded successfully.
        """
        self._is_sparse: bool = is_sparse
        self._sparse_array: SparseArrayInt | None = None
        self._dense_array: np.ndarray | None = None
        self._sparsity_threshold: float = sparsity_threshold
        self._dtype: type[int] = int
        self._initialized: bool = False
        self._default_value: int = int(default_value)

        if arr is not None:
            self.set(arr=arr)

    def clear(self) -> None:
        """
        Clear the profile contents.

        Drops both backing stores and marks the profile as dense and
        uninitialized.  The stored default value and the sparsity threshold
        are left untouched.

        :return: ``None``.
        """
        self._is_sparse = False
        self._sparse_array = None
        self._dense_array = None
        self._initialized = False

    def info(self) -> dict[str, Any]:
        """
        Get diagnostic information.

        :return: Dictionary with the object id, state flags, size, default
            value and a summary of each backing store (the string ``"None"``
            for a store that is absent).
        """
        if self._dense_array is not None:
            dense_info: Any = {
                "me": hex(id(self._dense_array)),
                "size": len(self._dense_array),
            }
        else:
            dense_info = "None"

        if self._sparse_array is not None:
            sparse_info: Any = self._sparse_array.info()
        else:
            sparse_info = "None"

        return {
            "me": hex(id(self)),
            "initialized": self._initialized,
            "size": self.size(),
            "is_sparse": self._is_sparse,
            "sparsity_threshold": self._sparsity_threshold,
            "default_value": self.default_value,
            "dense_array": dense_info,
            "sparse_array": sparse_info,
        }

    def get_sparse_map(self) -> dict[int, int]:
        """
        Get the sparse map when a sparse array exists.

        :return: Result of ``SparseArrayInt.get_map()`` or an empty dictionary.
        """
        if self._sparse_array is not None:
            return self._sparse_array.get_map()
        return dict()

    @property
    def dtype(self) -> type[int]:
        """
        Get the declared type.

        :return: ``int``.
        """
        return self._dtype

    @property
    def default_value(self) -> int:
        """
        Get the profile default value.

        :return: The sparse array's default when a sparse array exists,
            otherwise the value stored on the profile itself.
        """
        if self._sparse_array is not None:
            return self._sparse_array.default_value
        return self._default_value

    @default_value.setter
    def default_value(self, val: int) -> None:
        """
        Set the profile default value.

        :param val: New default value (coerced with ``int``); it is also
            pushed to the sparse array when one exists.
        :return: ``None``.
        """
        self._default_value = int(val)
        if self._sparse_array is not None:
            self._sparse_array.default_value = self._default_value

    @property
    def is_sparse(self) -> bool:
        """
        Get whether the profile uses sparse storage.

        :return: Sparse state.
        """
        return self._is_sparse

    @property
    def is_initialized(self) -> bool:
        """
        Get whether the profile has been initialized.

        :return: Initialization state.
        """
        return self._initialized

    def set_initialized(self) -> None:
        """
        Mark the profile as initialized.

        :return: ``None``.
        """
        self._initialized = True

    @property
    def sparse_array(self) -> SparseArrayInt | None:
        """
        Get the sparse backing array.

        :return: Sparse array or ``None``.
        """
        return self._sparse_array

    @property
    def dense_array(self) -> np.ndarray | None:
        """
        Get the dense backing array.

        :return: Dense array or ``None``.
        """
        return self._dense_array

    def create_sparse(self, size: int, default_value: int, map_data: dict[int, int] | None = None) -> None:
        """
        Create the profile in sparse form, replacing any previous contents.

        :param size: Logical size.
        :param default_value: Sparse default value (coerced with ``int``).
        :param map_data: Optional sparse map handed to
            ``SparseArrayInt.create_from_dict``.
        :return: ``None``.
        """
        self._is_sparse = True
        self._default_value = int(default_value)
        self._sparse_array = SparseArrayInt(default_value=self._default_value)
        if map_data is None:
            self._sparse_array.create(size=size, default_value=self._default_value)
        else:
            self._sparse_array.create_from_dict(
                default_value=self._default_value,
                size=size,
                map_data=map_data,
            )
        self._dense_array = None
        self._initialized = True

    def create_dense(self, size: int, default_value: int) -> None:
        """
        Create the profile in dense form, replacing any previous contents.

        :param size: Logical size.
        :param default_value: Dense fill value (coerced with ``int``).
        :return: ``None``.
        """
        self._is_sparse = False
        self._default_value = int(default_value)
        self._dense_array = np.full(size, self._default_value, dtype=int)
        self._sparse_array = None
        self._initialized = True

    @property
    def sparsity(self) -> float:
        """
        Get the sparsity ratio.

        :return: ``SparseArrayInt.get_sparsity()`` when the profile is
            sparse, otherwise ``0.0``.
        """
        if self._is_sparse and self._sparse_array is not None:
            return self._sparse_array.get_sparsity()
        return 0.0

    def set(self, arr: np.ndarray) -> bool:
        """
        Set the profile from a dense array.

        The most frequent value is found; when its share of the entries is at
        least ``sparsity_threshold`` the data is stored sparsely with that
        value as default, otherwise the converted array is stored densely.

        :param arr: One-dimensional, non-empty numpy array convertible to ``int``.
        :return: ``True`` when the assignment succeeds, ``False`` when the
            input is rejected (wrong type, wrong rank, empty, failed cast).
        :raises ValueError: If the profile already has a non-zero size and
            ``arr`` has a different length.
        """
        if not isinstance(arr, np.ndarray):
            print("You can only set numpy arrays")
            return False

        if arr.ndim != 1:
            print("You can only set 1D numpy arrays")
            return False

        if len(arr) == 0:
            return False

        try:
            # np.asarray may return ``arr`` itself when it already has the
            # target dtype, so the dense store can share memory with the caller.
            values: np.ndarray = np.asarray(arr, dtype=int)
        except (TypeError, ValueError):
            print("Cannot set dense array because the type cast failed")
            return False

        current_size: int = self.size()
        if current_size > 0 and len(values) != current_size:
            raise ValueError("The array must have the same size as the profile")

        most_common_element, most_common_count = Counter(values.tolist()).most_common(1)[0]
        sparsity_factor: float = most_common_count / len(values)

        if sparsity_factor >= self._sparsity_threshold:
            base_value: int = int(most_common_element)
            # Only the positions that differ from the base value are stored.
            positions: np.ndarray = np.flatnonzero(values != base_value)
            data_map: dict[int, int] = dict(zip(positions.tolist(), values[positions].tolist()))

            self._is_sparse = True
            self._default_value = base_value
            self._sparse_array = SparseArrayInt(default_value=base_value)
            self._sparse_array.create(size=len(values), default_value=base_value, data=data_map)
            self._dense_array = None
        else:
            self._is_sparse = False
            self._dense_array = values
            self._sparse_array = None
            # NOTE(review): the declared default is replaced by the first
            # element of the data; kept as-is because callers may rely on it.
            self._default_value = int(values[0])

        self._initialized = True
        return True

    def __eq__(self, other: object) -> bool:
        """
        Compare two profiles.

        Profiles in different storage modes are never equal.  Within the same
        mode the active backing stores are compared; when both are missing
        the default values are compared instead.

        :param other: Object to compare against.
        :return: ``True`` when both profiles match.
        """
        if not isinstance(other, ProfileInt):
            return False

        if self._is_sparse != other._is_sparse:
            return False

        if self._is_sparse:
            if self._sparse_array is None and other._sparse_array is None:
                return self.default_value == other.default_value
            if self._sparse_array is None or other._sparse_array is None:
                return False
            return self._sparse_array == other._sparse_array

        if self._dense_array is None and other._dense_array is None:
            return self.default_value == other.default_value
        if self._dense_array is None or other._dense_array is None:
            return False
        return np.array_equal(self._dense_array, other._dense_array)

    def __ne__(self, other: object) -> bool:
        """
        Compare two profiles for inequality.

        :param other: Object to compare against.
        :return: ``True`` when both profiles differ.
        """
        return not self.__eq__(other)

    def __getitem__(self, key: int) -> int:
        """
        Get a value by index.

        A dense profile without a dense array is switched to sparse mode with
        an empty sparse array, a warning is printed, and the default value is
        returned.

        :param key: Position to read.
        :return: Value at ``key``.
        """
        if self._is_sparse:
            if self._sparse_array is not None:
                return self._sparse_array[key]
            return self.default_value

        if self._dense_array is None:
            self._is_sparse = True
            self._sparse_array = SparseArrayInt(default_value=self.default_value)
            print("Initializing sparse when querying, this signals a mis initialization")
            return self.default_value

        return int(self._dense_array[key])

    def __setitem__(self, key: int, value: int) -> None:
        """
        Set a value by index.

        :param key: Position to write; Python ``int`` and numpy integer
            scalars are accepted.
        :param value: Value to store (coerced with ``int``).
        :return: ``None``.
        :raises TypeError: If ``key`` is not an integer.
        :raises AssertionError: If the active backing store is missing or
            ``key`` is not below the current size.
        """
        # numpy integer scalars are not ``int`` instances, so they are
        # accepted explicitly and normalised to a plain ``int``.
        if not isinstance(key, (int, np.integer)):
            raise TypeError("Key must be an integer")

        index: int = int(key)
        coerced_value: int = int(value)

        if self._is_sparse:
            if self._sparse_array is None:
                raise AssertionError("Sparse profile not initialized")
            if index >= self._sparse_array.size():
                raise AssertionError("Key out of bounds")
            self._sparse_array[index] = coerced_value
        else:
            if self._dense_array is None:
                raise AssertionError("Dense profile not initialized")
            if index >= len(self._dense_array):
                raise AssertionError("Key out of bounds")
            self._dense_array[index] = coerced_value

    def __imul__(self, other: float | int) -> "ProfileInt":
        """
        Multiply the profile in place by a scalar.

        :param other: Scalar multiplier.
        :return: ``self``.
        :raises TypeError: If ``other`` is neither ``int`` nor ``float``.
        """
        if not isinstance(other, (int, float)):
            raise TypeError(f"Unsupported type {type(other)}")
        self.scale(value=other)
        return self

    def convert_sparse_to_dense(self) -> None:
        """
        Convert the sparse profile into dense storage.

        Does nothing unless the profile is sparse and has a sparse array.

        :return: ``None``.
        """
        if self._is_sparse and self._sparse_array is not None:
            self._dense_array = self._sparse_array.toarray()
            self._sparse_array = None
            self._is_sparse = False

    def resize(self, n: int) -> None:
        """
        Resize the profile.

        An uninitialized profile is created sparse with size ``n``.  A dense
        profile keeps its leading values and pads new positions with the
        default value.  A sparse profile delegates to ``SparseArrayInt.resize``.

        :param n: New logical size; Python ``int`` and numpy integer scalars
            are accepted.
        :return: ``None``.
        :raises TypeError: If ``n`` is not an integer.
        :raises AssertionError: If the profile is initialized but the active
            backing store is missing.
        """
        if not isinstance(n, (int, np.integer)):
            raise TypeError("The size must be an integer")

        new_size: int = int(n)

        if not self._initialized:
            # create_sparse marks the profile as initialized.
            self.create_sparse(size=new_size, default_value=self.default_value)
            return

        if self._is_sparse:
            if self._sparse_array is None:
                raise AssertionError("Sparse profile not initialized")
            self._sparse_array.resize(n=new_size)
        else:
            if self._dense_array is None:
                raise AssertionError("Dense profile not initialized")
            resized: np.ndarray = np.full(new_size, self.default_value, dtype=int)
            keep: int = min(len(self._dense_array), new_size)
            resized[:keep] = self._dense_array[:keep]
            self._dense_array = resized

    def resample(self, indices: IntVec) -> None:
        """
        Resample the profile in place.

        :param indices: New index selection; used for numpy indexing in the
            dense case and passed to ``SparseArrayInt.resample`` otherwise.
        :return: ``None``.
        :raises AssertionError: If the active backing store is missing.
        """
        if self._is_sparse:
            if self._sparse_array is None:
                raise AssertionError("Sparse profile not initialized")
            self._sparse_array.resample(indices=indices)
        else:
            if self._dense_array is None:
                raise AssertionError("Dense profile not initialized")
            self._dense_array = self._dense_array[indices]

    def fill(self, value: int) -> None:
        """
        Fill the profile with a single value.

        The profile becomes sparse with ``value`` as its default and the
        dense array is dropped.

        :param value: Fill value (coerced with ``int``).
        :return: ``None``.
        """
        coerced_value: int = int(value)
        # Capture the size before the storage mode changes; a newly built
        # sparse array would otherwise not know the size of the dense data
        # it replaces.
        previous_size: int = self.size()

        self.default_value = coerced_value
        self._is_sparse = True

        if self._sparse_array is None:
            self._sparse_array = SparseArrayInt(default_value=coerced_value)
            if previous_size > 0:
                self._sparse_array.create(size=previous_size, default_value=coerced_value)

        self._sparse_array.fill(coerced_value)
        self._dense_array = None
        self._initialized = True

    def scale(self, value: float | int) -> None:
        """
        Scale the profile by a scalar.

        Every product is converted back with ``int`` (dense data with
        ``astype(int)``); the stored default value is scaled the same way.

        :param value: Scalar multiplier.
        :return: ``None``.
        :raises AssertionError: If the active backing store is missing.
        """
        if self._is_sparse:
            if self._sparse_array is None:
                raise AssertionError("Sparse profile not initialized")

            current_size: int = self._sparse_array.size()
            new_default_value: int = int(self.default_value * value)

            # Scale only the explicit entries and drop those that collapse
            # onto the scaled default, then rebuild the sparse structure.
            new_map: dict[int, int] = dict()
            for key, sparse_value in self._sparse_array.get_map().items():
                scaled_value: int = int(sparse_value * value)
                if scaled_value != new_default_value:
                    new_map[key] = scaled_value

            self._sparse_array = SparseArrayInt(default_value=new_default_value)
            self._sparse_array.create(size=current_size, default_value=new_default_value, data=new_map)
            self._default_value = new_default_value
        else:
            if self._dense_array is None:
                raise AssertionError("Dense profile not initialized")
            self._dense_array = (self._dense_array * value).astype(int)
            self._default_value = int(self.default_value * value)

    def size(self) -> int:
        """
        Get the profile size.

        :return: Size of the active backing store, or ``0`` when the profile
            is uninitialized or the active store is missing.
        """
        if not self._initialized:
            return 0

        if self._is_sparse:
            if self._sparse_array is None:
                return 0
            return self._sparse_array.size()

        if self._dense_array is None:
            return 0
        return len(self._dense_array)

    def toarray(self) -> np.ndarray:
        """
        Get the dense numpy representation.

        For a dense profile the internal array itself is returned, not a copy.

        :return: Dense integer array; an empty array when the size is zero.
        """
        if self.size() > 0:
            if self._is_sparse:
                if self._sparse_array is not None:
                    return self._sparse_array.toarray()
            elif self._dense_array is not None:
                return self._dense_array

        return np.zeros(0, dtype=int)

    def tolist(self) -> list[int]:
        """
        Get the dense list representation.

        :return: Dense list.
        """
        return self.toarray().tolist()

    def astype(self, tpe: Any) -> np.ndarray:
        """
        Cast the dense representation to another dtype.

        :param tpe: Target dtype.
        :return: Cast dense array.
        """
        return self.toarray().astype(tpe)

    def get_sparse_representation(self) -> tuple[list[int], list[int]]:
        """
        Export the sparse contents as parallel index and value lists.

        :return: Tuple ``(indices, values)``; two empty lists when there is
            no sparse array.
        """
        if self._sparse_array is not None:
            return self._sparse_array.get_sparse_representation()
        return list(), list()

    def set_sparse_data_from_data(self, indptr: list[int], data: list[int]) -> None:
        """
        Load sparse data from parallel lists.

        :param indptr: Sparse indices.
        :param data: Sparse values.
        :return: ``None``.
        :raises AssertionError: If there is no sparse array.
        """
        if self._sparse_array is None:
            raise AssertionError("Sparse profile not initialized")
        self._sparse_array.set_sparse_data_from_data(indptr=indptr, data=data)

    def fix_nan(self, default_value: float = 0.0) -> None:
        """
        Keep the integer profile API aligned with the generic profile.

        This method intentionally does nothing.

        :param default_value: Unused placeholder.
        :return: ``None``.
        """
        return None

    def copy(self) -> "ProfileInt":
        """
        Build a copy of the profile with copied backing stores.

        :return: New profile copy.
        """
        copied_profile: ProfileInt = ProfileInt(
            default_value=self.default_value,
            arr=None,
            sparsity_threshold=self._sparsity_threshold,
            is_sparse=self.is_sparse,
        )

        if self._sparse_array is not None:
            copied_profile._sparse_array = self._sparse_array.copy()

        if self._dense_array is not None:
            copied_profile._dense_array = self._dense_array.copy()

        copied_profile._default_value = self.default_value
        copied_profile._initialized = self.is_initialized
        return copied_profile