# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from typing import List, Any, Dict, Union, Tuple
import pandas as pd
import numpy as np
import numpy.typing as npt
import datetime
from scipy.sparse import csc_matrix, csr_matrix
from VeraGridEngine.enumerations import TimeGrouping, LogSeverity
import copy
# ----------------------------------------------------------------------------------------------------------------------
# Type aliases used in annotations.
# NOTE: the "Vec" and "Mat" families share the same numpy alias per dtype; the name only
# documents the intended dimensionality, it is not enforced by the type.
# ----------------------------------------------------------------------------------------------------------------------
IntList = List[int]  # plain Python list of integers
Numeric = Union[int, float, bool, complex]  # any scalar numeric value
NumericVec = npt.NDArray[np.float64]  # float64 array (same alias target as Vec)
DateVec = pd.DatetimeIndex  # pandas index of time stamps
IntVec = npt.NDArray[np.int_]  # integer array
BoolVec = npt.NDArray[np.bool_]  # boolean array
Vec = npt.NDArray[np.float64]  # float64 array
CxVec = npt.NDArray[np.complex128]  # complex128 array
StrVec = npt.NDArray[np.str_]  # string array
ObjVec = npt.NDArray[np.object_]  # array of arbitrary Python objects
Mat = Union[npt.NDArray[np.float64], np.ndarray[tuple[int, int], np.dtype[np.float64]]] # float64 matrix; no way yet to say it is 2D with NDArray alone
CxMat = npt.NDArray[np.complex128] # complex128 matrix; no way yet to say it is 2D
IntMat = npt.NDArray[np.int_] # integer matrix; no way yet to say it is 2D
StrMat = npt.NDArray[np.str_] # string matrix; no way yet to say it is 2D
ObjMat = npt.NDArray[np.object_] # object matrix; no way yet to say it is 2D
CscMat = csc_matrix  # scipy sparse matrix, compressed sparse column format
CsrMat = csr_matrix  # scipy sparse matrix, compressed sparse row format
[docs]
class CDF:
    """
    Inverse Cumulative density function of a given array of data

    The samples are stored sorted in ``arr`` and paired with evenly spaced
    probabilities in ``prob`` (0 for the smallest sample, 1 for the largest).
    """

    def __init__(self, data: Vec | pd.Series):
        """
        Constructor
        :param data: samples to build the CDF from (pandas Series or numpy array)
        """
        # the sorted samples are the ordinates of the inverse CDF
        if isinstance(data, pd.Series):
            self.arr = np.sort(np.ndarray.flatten(data.values))
        else:
            self.arr = np.sort(data, axis=0)

        self.iscomplex = np.iscomplexobj(self.arr)

        # one probability per sample, evenly spaced over [0, 1]
        n = len(data)
        if n > 1:
            self.prob = np.arange(n, dtype=float) / (n - 1)
        else:
            # 0 or 1 samples: (n - 1) would be <= 0, so skip the normalization
            self.prob = np.arange(n, dtype=float)

        # cursor used by the legacy ``next(cdf)`` protocol
        self.idx = 0

        # array length
        self.len = len(self.arr)

    def __call__(self):
        """
        Call this as CDF()
        :return: the sorted samples array
        """
        return self.arr

    def __iter__(self):
        """
        Iterate over the sorted samples.

        A fresh, independent iterator is returned on every call so that nested
        loops over the same CDF work. The internal cursor is still reset so
        that the legacy ``iter(cdf); next(cdf)`` usage keeps working.
        :return: iterator over the sorted samples
        """
        self.idx = 0
        return iter(self.arr)

    def __next__(self):
        """
        Legacy cursor-based iteration (``next(cdf)``)
        :return: next sorted sample
        """
        if self.idx == self.len:
            raise StopIteration

        self.idx += 1
        return self.arr[self.idx - 1]

    @staticmethod
    def _values_of(other) -> np.ndarray:
        """
        Get the sample values of the right-hand operand of + / -
        :param other: CDF or any iterable of numbers
        :return: numpy array of values
        """
        if isinstance(other, CDF):
            return other.arr
        # materialize once so that one-shot iterables (generators) are supported
        return np.asarray(list(other))

    def __add__(self, other):
        """
        Sum of two CDF
        :param other: CDF (or iterable of numbers)
        :return: A CDF object built from every pairwise sum a + b
        """
        return CDF(np.add.outer(self.arr, self._values_of(other)).ravel())

    def __sub__(self, other):
        """
        Subtraction of two CDF
        :param other: CDF (or iterable of numbers)
        :return: A CDF object built from every pairwise difference a - b
        """
        return CDF(np.subtract.outer(self.arr, self._values_of(other)).ravel())

    def get_sample(self, npoints=1):
        """
        Draws a number of uniformly distributed probabilities and
        returns the corresponding interpolated sample values.
        :param npoints: Number of points to sample, 1 by default
        :return: array of npoints values interpolated from the CDF
        """
        pt = np.random.uniform(0, 1, npoints)

        if self.iscomplex:
            # np.interp only handles real ordinates: interpolate each part separately
            a = np.interp(pt, self.prob, self.arr.real)
            b = np.interp(pt, self.prob, self.arr.imag)
            return a + 1j * b
        else:
            return np.interp(pt, self.prob, self.arr)

    def get_at(self, prob: float):
        """
        Returns the interpolated sample value at a given probability.
        :param prob: probability from 0 to 1
        :return: Corresponding CDF value
        """
        if self.iscomplex:
            # np.interp only handles real ordinates: interpolate each part separately
            a = np.interp(prob, self.prob, self.arr.real)
            b = np.interp(prob, self.prob, self.arr.imag)
            return a + 1j * b
        else:
            return np.interp(prob, self.prob, self.arr)

    def expectation(self) -> float | complex:
        """
        Returns the CDF expected value (AKA the mean)
        :return: expectation
        :raises ZeroDivisionError: if the CDF holds no samples
        """
        n = len(self.arr)
        return np.sum(self.arr) * (1 / n)

    def plot(self, plt, LINEWIDTH: int, ax=None):
        """
        Plots the CDF
        :param plt: MatPlotLib plt module
        :param LINEWIDTH: line width in pixels
        :param ax: MatPlotLib axis to plot into (a new figure is created if None)
        :return: Nothing
        """
        if ax is None:
            fig = plt.figure()
            ax = fig.add_subplot(111)

        ax.plot(self.prob, self.arr, linewidth=LINEWIDTH)
        ax.set_xlabel('$p(x)$')
        ax.set_ylabel('$x$')
[docs]
def classify_by_hour(t: pd.DatetimeIndex) -> List[List[int]]:
    """
    Passes an array of TimeStamps to an array of arrays of indices
    classified by hour of the year.

    The hour of the year is (dayofyear - 1) * 24 + hour. Bucket 0 corresponds to
    the earliest hour present in t and the last bucket to the latest one; hours
    in between that have no samples get an empty list.
    :param t: Pandas time Index array
    :return: list of lists of integer indices (positions in t)
    """
    # NOTE: hour * dayofyear is NOT the hour of the year: it maps every hour 0 to 0
    # and makes different (day, hour) pairs collide
    hours_of_year = [(stamp.dayofyear - 1) * 24 + stamp.hour for stamp in t]

    if len(hours_of_year) == 0:
        return list()

    # min / max rather than first / last, so that unsorted input is also handled
    offset = min(hours_of_year)
    n_buckets = max(hours_of_year) - offset + 1

    arr: List[List[int]] = [list() for _ in range(n_buckets)]

    for i, hour_of_year in enumerate(hours_of_year):
        arr[hour_of_year - offset].append(i)

    return arr
[docs]
def classify_by_day(t: pd.DatetimeIndex) -> list[list[int]]:
    """
    Passes an array of TimeStamps to an array of arrays of indices
    classified by day of the year.

    Bucket 0 corresponds to the earliest day of the year present in t and the
    last bucket to the latest one; days in between without samples get an
    empty list.
    :param t: Pandas time Index array
    :return: list of lists of integer indices (positions in t)
    """
    days_of_year = [stamp.dayofyear for stamp in t]

    if len(days_of_year) == 0:
        return list()

    # min / max rather than first / last, so that unsorted input is also handled
    offset = min(days_of_year)
    n_buckets = max(days_of_year) - offset + 1

    arr: list[list[int]] = [list() for _ in range(n_buckets)]

    for i, day_of_year in enumerate(days_of_year):
        arr[day_of_year - offset].append(i)

    return arr
[docs]
def get_time_groups(t_array: pd.DatetimeIndex, grouping: TimeGrouping) -> List[int]:
    """
    Compute the positions at which a new time group starts
    :param t_array: DatetimeIndex object containing dates
    :param grouping: TimeGrouping value (Monthly, Weekly, Daily or Hourly)
    :return: list of indices that determine the partitions; the last
             position of t_array is always included as the closing index
    """
    boundaries: List[int] = list()
    n_steps = len(t_array)

    # nothing to partition
    if n_steps == 0:
        return boundaries

    # pick the time stamp attribute whose change marks a new group
    if grouping == TimeGrouping.Monthly:
        attr_name = "month"
    elif grouping == TimeGrouping.Weekly:
        attr_name = "week"
    elif grouping == TimeGrouping.Daily:
        attr_name = "day"
    elif grouping == TimeGrouping.Hourly:
        attr_name = "hour"
    else:
        attr_name = None

    if attr_name is not None:
        previous = -1
        for position, stamp in enumerate(t_array):
            current = getattr(stamp, attr_name)
            if current != previous:
                previous = current
                boundaries.append(position)

    # close the last group with the final position, unless it is already a boundary
    final_position = n_steps - 1
    if final_position != boundaries[len(boundaries) - 1]:
        boundaries.append(final_position)

    return boundaries
[docs]
class LogEntry:
    """
    Single record of the Logger
    """

    def __init__(self,
                 time: Union[str, None] = None,
                 msg="",
                 severity: LogSeverity = LogSeverity.Information,
                 device="",
                 value="",
                 expected_value="",
                 device_class="",
                 device_property="",
                 object_value=None,
                 expected_object_value=None):
        """
        Constructor
        :param time: time label; the current wall-clock time (HH:MM:SS) is used if None
        :param msg: message (stored as str)
        :param severity: LogSeverity of the entry
        :param device: device the entry refers to
        :param value: reported value (stored as given)
        :param expected_value: expected value (stored as str)
        :param device_class: class of the device
        :param device_property: property of the device
        :param object_value: reported value as an object
        :param expected_object_value: expected value as an object
        """
        if time is not None:
            self.time = time
        else:
            now = datetime.datetime.now()
            self.time = "{date:%H:%M:%S}".format(date=now)  # might use %Y/%m/%d %H:%M:%S

        # what happened and how bad it is
        self.msg = str(msg)
        self.severity = severity

        # where it happened
        self.device = device
        self.device_class = device_class
        self.device_property = device_property

        # reported vs. expected, as text ...
        self.value = value
        self.expected_value = str(expected_value)

        # ... and as objects
        self.object_value = object_value
        self.expected_object_value = expected_object_value

    def to_list(self) -> List[Any]:
        """
        Full row representation of this entry
        :return: [time, severity value, message, class, property, device, value, expected value]
        """
        row = [self.time, self.severity.value, self.msg]
        row += [self.device_class, self.device_property, self.device]
        row += [self.value, self.expected_value]
        return row

    def to_list_reduced(self) -> List[Any]:
        """
        Row representation of this entry without severity and message
        :return: [time, class, property, device, value, expected value]
        """
        return [self.time,
                self.device_class,
                self.device_property,
                self.device,
                self.value,
                self.expected_value]

    def __str__(self):
        pieces = (self.time, self.severity.value, self.msg,
                  self.device, self.value, self.expected_value)
        return "{0} {1}: {2} {3} {4} {5}".format(*pieces)

    def __repr__(self):
        return self.msg
[docs]
class Logger:
    """
    Logger class: collects LogEntry records plus free-text debug lines
    """

    def __init__(self) -> None:
        self.entries: List[LogEntry] = list()
        self.debug_entries: List[str] = list()

    def add_debug(self, *args):
        """
        Add debug entry
        :param args: any objects; their str() forms are joined with spaces
        :return: Nothing
        """
        self.debug_entries.append(" ".join([str(x) for x in args]))

    def append(self, txt: str):
        """
        simple text log
        :param txt: some message text
        """
        # pass by keyword: the first positional parameter of LogEntry is `time`, not `msg`
        self.entries.append(LogEntry(msg=txt))

    def has_logs(self) -> bool:
        """
        Are there any logs?
        :return: True / False
        """
        return len(self.entries) > 0

    def add_info(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                 device_property='', object_value=None, expected_object_value=None):
        """
        Add info entry
        :param msg: message
        :param device: device the entry refers to
        :param value: reported value
        :param expected_value: expected value
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: reported value as an object (stored as str)
        :param expected_object_value: expected value as an object (stored as str)
        :return: Nothing
        """
        self.add(msg=msg,
                 severity=LogSeverity.Information,
                 device=device,
                 value=value,
                 expected_value=expected_value,
                 device_class=device_class,
                 comment=comment,
                 device_property=device_property,
                 object_value=object_value,
                 expected_object_value=expected_object_value)

    def add_warning(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                    device_property='', object_value=None, expected_object_value=None):
        """
        Add warning entry
        :param msg: message
        :param device: device the entry refers to
        :param value: reported value
        :param expected_value: expected value
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: reported value as an object (stored as str)
        :param expected_object_value: expected value as an object (stored as str)
        :return: Nothing
        """
        self.add(msg=msg,
                 severity=LogSeverity.Warning,
                 device=device,
                 value=value,
                 expected_value=expected_value,
                 device_class=device_class,
                 comment=comment,
                 device_property=device_property,
                 object_value=object_value,
                 expected_object_value=expected_object_value)

    def add_error(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                  device_property='', object_value=None, expected_object_value=None):
        """
        Add error entry
        :param msg: message
        :param device: device the entry refers to
        :param value: reported value
        :param expected_value: expected value
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: reported value as an object (stored as str)
        :param expected_object_value: expected value as an object (stored as str)
        :return: Nothing
        """
        self.add(msg=msg,
                 severity=LogSeverity.Error,
                 device=device,
                 value=value,
                 expected_value=expected_value,
                 device_class=device_class,
                 comment=comment,
                 device_property=device_property,
                 object_value=object_value,
                 expected_object_value=expected_object_value)

    def add_divergence(self, msg, device="", value=0, expected_value=0, tol=1e-6):
        """
        Add divergence entry, only if value and expected_value differ by more than tol
        :param msg: message
        :param device: device the entry refers to
        :param value: numeric value found
        :param expected_value: numeric value expected
        :param tol: absolute tolerance
        :return: Nothing
        """
        if abs(value - expected_value) > tol:
            self.entries.append(LogEntry(msg=str(msg),
                                         severity=LogSeverity.Divergence,
                                         device=str(device),
                                         value=str(value),
                                         expected_value=str(expected_value),
                                         device_class="",
                                         device_property="",
                                         object_value=None,
                                         expected_object_value=None))

    def add(self, msg: str, severity: LogSeverity = LogSeverity.Error, device="", value="", expected_value="",
            device_class='', comment='', device_property='', object_value=None, expected_object_value=None):
        """
        Add general entry
        :param msg: message
        :param severity: LogSeverity of the entry
        :param device: device the entry refers to
        :param value: reported value
        :param expected_value: expected value
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: reported value as an object (stored as str)
        :param expected_object_value: expected value as an object (stored as str)
        :return: Nothing
        """
        self.entries.append(LogEntry(msg=str(msg),
                                     severity=severity,
                                     device=str(device),
                                     value=str(value),
                                     expected_value=str(expected_value),
                                     device_class=str(device_class),
                                     device_property=str(device_property),
                                     object_value=str(object_value),
                                     expected_object_value=str(expected_object_value)))

    def to_dict(self) -> Union[
        Dict[str, Dict[str, List[Tuple[str, str, str, str]]]], Dict[str, Dict[str, List[List[str]]]]]:
        """
        Get the logs grouped by severity and message
        :return: Dictionary[severity value, Dictionary[message,
                 List[[time, class, property, device, value, expected value]]]]
        """
        by_severity = dict()

        for e in self.entries:
            by_msg = by_severity.setdefault(e.severity.value, dict())
            by_msg.setdefault(e.msg, list()).append(e.to_list_reduced())

        return by_severity

    def to_df(self) -> pd.DataFrame:
        """
        Get DataFrame
        :return: DataFrame indexed by 'Time' with one row per entry
        """
        data = [e.to_list() for e in self.entries]
        df = pd.DataFrame(data=data, columns=['Time', 'Severity', 'Message', 'Class',
                                              'Property', 'Device', 'Value', 'Expected value'])
        df.set_index('Time', inplace=True)
        return df

    def parse_df(self, df: pd.DataFrame):
        """
        Parse DataFrame: append one entry per row.
        The entries are time-stamped at parsing time; no time is read from df.
        :param df: DataFrame with the columns 'Message', 'Severity', 'Device', 'Value',
                   'Expected value', 'Class' and 'Property'
        """
        for _, row in df.iterrows():
            self.entries.append(LogEntry(msg=str(row["Message"]),
                                         severity=LogSeverity(row["Severity"]),
                                         device=str(row["Device"]),
                                         value=str(row["Value"]),
                                         expected_value=str(row["Expected value"]),
                                         device_class=str(row["Class"]),
                                         device_property=str(row["Property"]),
                                         object_value="",
                                         expected_object_value=""))

    def to_csv(self, fname):
        """
        Save to CSV
        :param fname: file name
        """
        self.to_df().to_csv(fname)

    def to_xlsx(self, fname):
        """
        To Excel
        :param fname: file name
        """
        self.to_df().to_excel(fname)

    def print(self, title: str = "") -> None:
        """
        Print the logs
        :param title: optional title printed before the table
        """
        if title != "":
            print(title)
        print(self.to_df())

    def __str__(self) -> str:
        # one entry per line, every line (including the last) ends with a line break
        return ''.join(str(e) + '\n' for e in self.entries)

    def __getitem__(self, key):
        """
        get [index] implementation
        :param key: integer
        :return: LogEntry at that position
        """
        return self.entries[key]

    def __setitem__(self, idx, value):
        """
        set [index] implementation
        :param idx: integer
        :param value: entry to store at that position
        :return: Nothing
        """
        self.entries[idx] = value

    def __iadd__(self, other: "Logger"):
        """
        += implementation: append the entries of another logger
        :param other: Logger (None is ignored)
        :return: self
        """
        if other is not None:
            self.entries += other.entries
        return self

    def __len__(self) -> int:
        return len(self.entries)

    def size(self) -> int:
        """
        Number of logs
        :return: size
        """
        return len(self.entries)

    def count_type(self, severity: LogSeverity) -> int:
        """
        Count the number of entries of a certain severity
        :param severity: LogSeverity
        :return: number of occurrences
        """
        return sum(1 for entry in self.entries if entry.severity == severity)

    def info_count(self) -> int:
        """
        Count the number of information occurrences
        :return: number of information entries
        """
        return self.count_type(LogSeverity.Information)

    def warning_count(self) -> int:
        """
        Count number of warnings
        :return: number of warning entries
        """
        return self.count_type(LogSeverity.Warning)

    def error_count(self) -> int:
        """
        Count number of errors
        :return: number of error entries
        """
        return self.count_type(LogSeverity.Error)

    def has_errors(self) -> bool:
        """
        Check if there are errors
        :return: True / False
        """
        return self.error_count() > 0
[docs]
class ConvergenceReport:
    """
    Record of the successive solver attempts and their outcome
    """

    def __init__(self) -> None:
        """
        Constructor: every attempt adds one item to each of these parallel lists
        """
        self.methods_ = []
        self.converged_ = []
        self.error_ = []
        self.elapsed_ = []
        self.iterations_ = []

    def add(self, method, converged: bool, error: float, elapsed: float, iterations: int):
        """
        Register one attempt
        :param method: method used
        :param converged: did it converge?
        :param error: error reached
        :param elapsed: elapsed time
        :param iterations: number of iterations
        :return: Nothing
        """
        records = ((self.methods_, method),
                   (self.converged_, converged),
                   (self.error_, error),
                   (self.elapsed_, elapsed),
                   (self.iterations_, iterations))

        for storage, item in records:
            storage.append(item)

    def converged(self) -> bool:
        """
        Convergence flag of the last attempt
        :return: last flag, False if nothing was registered
        """
        if len(self.converged_) == 0:
            return False
        return self.converged_[-1]

    def error(self) -> float:
        """
        Error of the last attempt
        :return: last error, 0.0 if nothing was registered
        """
        if len(self.error_) == 0:
            return 0.0
        return self.error_[-1]

    def elapsed(self) -> float:
        """
        Elapsed time of the last attempt
        :return: last elapsed time, 0.0 if nothing was registered
        """
        if len(self.elapsed_) == 0:
            return 0.0
        return self.elapsed_[-1]

    def iterations(self) -> float:
        """
        Iterations of the last attempt
        :return: last number of iterations, 0.0 if nothing was registered
        """
        if len(self.iterations_) == 0:
            return 0.0
        return self.iterations_[-1]

    def to_dataframe(self) -> pd.DataFrame:
        """
        Tabulate all the registered attempts
        :return: DataFrame with one row per attempt
        """
        columns = dict()
        columns['Method'] = self.methods_
        columns['Converged?'] = self.converged_
        columns['Error'] = self.error_
        columns['Elapsed (s)'] = self.elapsed_
        columns['Iterations'] = self.iterations_
        return pd.DataFrame(columns)
[docs]
def get_list_dim(a: List[Any]) -> int:
    """
    Number of dimensions of a (possibly nested) list, for the case where a matrix is stored as a list of lists.
    Only the first element is inspected.
    :param a: some List
    :return: 0 if a is not a list, 2 if its first element is a list, 1 otherwise (including the empty list)
    """
    if not isinstance(a, list):
        return 0

    if len(a) > 0 and isinstance(a[0], list):
        return 2

    return 1
[docs]
class CompressedJsonStruct:
    """
    Compressed json block: the field names are stored once and the data as rows of values
    """

    def __init__(self, fields: List[str] = None, data: List[Any] = None):
        """
        Constructor
        :param fields: list of property names (optional)
        :param data: row or list of rows (optional), checked against the fields
        """
        self.__fields: List[str] = list() if fields is None else fields
        self.__data: List[Any] = list() if data is None else data

        if data is not None:
            self.set_data(data)

        self.__fields_pos_dict: Dict[str, int] = self.get_position_dict()

    def get_position_dict(self):
        """
        Build the reverse lookup of the fields
        :return: dictionary {field name: column position}
        """
        positions = dict()
        for position, name in enumerate(self.__fields):
            positions[name] = position
        return positions

    def set_fields(self, fields: List[str]):
        """
        Set the block fields and rebuild the reverse index lookup
        :param fields: list of property names
        :return: Nothing
        """
        self.__fields = fields
        self.__fields_pos_dict = self.get_position_dict()

    def set_data(self, dta: List[Any]):
        """
        Set the data and check its consistency.
        Anything that is not a list, and the empty list, is ignored.
        :param dta: list of lists (or a single row as a flat list)
        :return: Nothing
        """
        if not isinstance(dta, list):
            return

        if len(dta) == 0:
            return

        dim = get_list_dim(dta)

        if dim == 2:
            self.__data = dta
        elif dim == 1:
            # a flat list is a single row
            self.__data = [dta]
        else:
            raise Exception('The list has the wrong number of dimensions: ' + str(dim))

        # only the first row is checked against the fields
        n_values = len(self.__data[0])
        n_fields = len(self.__fields)
        if n_values != n_fields:
            raise Exception("Data length does not match the fields length")

    def get_data(self):
        """
        Get the stored rows
        :return: data
        """
        return self.__data

    def get_row_number(self):
        """
        Get the number of rows
        :return: number of rows
        """
        return len(self.__data)

    def get_col_index(self, prop):
        """
        Get the column position of a property
        :param prop: property name
        :return: column index
        """
        return self.__fields_pos_dict[prop]

    def get_final_dict(self):
        """
        Get the serializable form of the block
        :return: {'fields': field names, 'data': the row itself if there is just one, else the list of rows}
        """
        if len(self.__data) == 1:
            payload = self.__data[0]
        else:
            payload = self.__data

        return {'fields': self.__fields,
                'data': payload}

    def get_dict_at(self, i):
        """
        Get one row as a dictionary
        :param i: row index
        :return: {field name: value}
        """
        return dict(zip(self.__fields, self.__data[i]))

    def declare_n_entries(self, n):
        """
        Replace the data by n rows filled with None
        :param n: number of rows
        :return: Nothing
        """
        nf = len(self.__fields)
        rows = list()
        for _ in range(n):
            rows.append([None] * nf)
        self.__data = rows

    def set_at(self, i, col_name, val):
        """
        Set value at a position, counts that the data has been declared
        :param i: row index (object index)
        :param col_name: name of the property
        :param val: value to set
        """
        row = self.__data[i]
        row[self.get_col_index(prop=col_name)] = val
[docs]
class ListSet(list):
    """
    This is a class that behaves like a list except for the query "in" where it behaves like a set O(1)

    The elements are kept unique, hence they must be hashable.
    A set mirroring the list content is maintained by every mutating method.
    """

    def __init__(self, iterable=None):
        """
        Initialize the ListSet with an optional iterable.
        :param iterable: initial elements (duplicates are dropped, first occurrence wins)
        """
        super().__init__()
        self._set = set()
        # explicit None check: the truth value of some iterables (i.e. numpy arrays) is ambiguous
        if iterable is not None:
            self.extend(iterable)

    def append(self, value: Any) -> None:
        """Append an item to the list if it's not already present."""
        if value not in self._set:
            super().append(value)
            self._set.add(value)

    def extend(self, iterable):
        """Extend the list by appending elements from the iterable, ensuring uniqueness."""
        for value in iterable:
            self.append(value)

    def insert(self, index, value):
        """Insert an item at a given position if it's not already present."""
        if value not in self._set:
            super().insert(index, value)
            self._set.add(value)

    def remove(self, value):
        """Remove the first occurrence of a value. Raises ValueError if not found."""
        super().remove(value)
        self._set.remove(value)

    def pop(self, index=-1):
        """Remove and return the item at the given position."""
        value = super().pop(index)
        self._set.remove(value)
        return value

    def clear(self):
        """Remove all items from the list."""
        super().clear()
        self._set.clear()

    def __contains__(self, value):
        """Check if the value is in the ListSet using the internal set for O(1) queries."""
        return value in self._set

    def __add__(self, other):
        """Return a new ListSet containing elements from self and other, ensuring uniqueness."""
        # do not write `self + [...]` here: it would call this very method again, recursing forever
        result = ListSet(self)
        result.extend(other)
        return result

    def __iadd__(self, other):
        """Extend the ListSet in place with elements from other, ensuring uniqueness."""
        self.extend(other)
        return self

    def __setitem__(self, index, value):
        """
        Set the item at the given (integer) index, ensuring uniqueness.
        Raises ValueError if the value is already stored at another position.
        """
        if value in self._set and self[index] != value:
            raise ValueError(f"Duplicate value '{value}' is not allowed in ListSet.")

        self._set.discard(self[index])  # Remove the old value from the set
        super().__setitem__(index, value)
        self._set.add(value)

    def __delitem__(self, index):
        """Delete the item at the given index, or the items of the given slice."""
        if isinstance(index, slice):
            # a slice selects several values: all of them must leave the set
            removed = list.__getitem__(self, index)
            super().__delitem__(index)
            self._set.difference_update(removed)
        else:
            value = self[index]
            super().__delitem__(index)
            self._set.remove(value)

    def copy(self):
        """Return a shallow copy of the ListSet."""
        return ListSet(self)

    def __deepcopy__(self, memo):
        """Support the copy.deepcopy protocol."""
        copied_list = ListSet()
        # register before copying the items so that self-references resolve to the copy
        memo[id(self)] = copied_list
        for item in self:
            copied_list.append(copy.deepcopy(item, memo))
        return copied_list
[docs]
class Vector:
    """
    Python implementation of a C++ like std::vector
    """

    # values of these types are immutable: the same object can be shared by every slot
    _IMMUTABLE_TYPES = (int, float, str, type(None), bool)

    def __init__(self, size=0, value=None):
        """
        Initialize vector with a given size and optional fill value.
        :param size: Size of the vector
        :param value: Value to initialize with:
                      For immutable types (int, float, str, None), store directly.
                      For objects (lists, sets, dicts included), store independent deep copies.
        """
        self._data = self._make_fill(value, size)

    def _make_fill(self, value, count):
        """
        Build the list of fill values for `count` new slots
        :param value: fill value
        :param count: number of slots
        :return: list of length `count` (empty if count <= 0)
        """
        if isinstance(value, self._IMMUTABLE_TYPES):
            return [value] * count

        # one deep copy per slot: `[value] * count` would alias a single mutable object everywhere
        return [copy.deepcopy(value) for _ in range(count)]

    def __getitem__(self, index):
        return self._data[index]

    def __setitem__(self, index, value):
        self._data[index] = value

    def __len__(self):
        return len(self._data)

    def push_back(self, value):
        """Append a value at the end."""
        self._data.append(value)

    def pop_back(self):
        """Remove and return the last element. Raises IndexError if the vector is empty."""
        if not self._data:
            raise IndexError("pop_back from empty vector")
        return self._data.pop()

    def clear(self):
        """Remove all elements."""
        self._data.clear()

    def resize(self, new_size, value=None):
        """
        Resize the vector like in C++ std::vector.
        :param new_size: new size; shrinking drops the trailing elements,
                         growing keeps the existing ones and appends fill values
        :param value: fill value for the new slots (same rules as in the constructor)
        """
        current_size = len(self._data)

        if new_size < current_size:
            self._data = self._data[:new_size]
        else:
            # always extend: the existing elements must survive whatever the fill value type is
            self._data.extend(self._make_fill(value, new_size - current_size))

    def copy(self):
        """Return a deep copy of the vector."""
        return copy.deepcopy(self)

    def __deepcopy__(self, memo):
        """Support the copy.deepcopy protocol."""
        new_vec = Vector()
        new_vec._data = copy.deepcopy(self._data, memo)
        return new_vec

    def __iter__(self):
        return iter(self._data)

    def __repr__(self):
        return f"Vector({self._data})"