Source code for VeraGridEngine.basic_structures

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations

from typing import List, Any, Dict, Union, Tuple
import pandas as pd
import numpy as np
import numpy.typing as npt
import datetime
from scipy.sparse import csc_matrix, csr_matrix
from VeraGridEngine.enumerations import TimeGrouping, LogSeverity
import copy

# Type aliases used in annotations across the package.
# The numpy based aliases only constrain the element dtype, not the array rank.
IntList = List[int]  # plain Python list of integers
Numeric = Union[int, float, bool, complex]  # any scalar numeric value
NumericVec = npt.NDArray[np.float64]  # float64 array (same definition as Vec)
DateVec = pd.DatetimeIndex  # pandas index of time stamps
IntVec = npt.NDArray[np.int_]  # integer array
BoolVec = npt.NDArray[np.bool_]  # boolean array
Vec = npt.NDArray[np.float64]  # float64 array
CxVec = npt.NDArray[np.complex128]  # complex128 array
StrVec = npt.NDArray[np.str_]  # array of strings
ObjVec = npt.NDArray[np.object_]  # array of arbitrary Python objects
Mat = Union[npt.NDArray[np.float64], np.ndarray[tuple[int, int], np.dtype[np.float64]]]  # no way yet to say it is 2D
CxMat = npt.NDArray[np.complex128]  # no way yet to say it is 2D
IntMat = npt.NDArray[np.int_]  # no way yet to say it is 2D
StrMat = npt.NDArray[np.str_]  # no way yet to say it is 2D
ObjMat = npt.NDArray[np.object_]  # no way yet to say it is 2D

# scipy sparse matrix aliases
CscMat = csc_matrix  # compressed sparse column matrix
CsrMat = csr_matrix  # compressed sparse row matrix


class CDF:
    """
    Inverse Cumulative density function of a given array of data
    """

    def __init__(self, data: Vec | pd.Series):
        """
        Constructor
        :param data: samples (pandas Series or numpy array)
        """
        # sort the data: the sorted samples are the values of the inverse CDF
        if isinstance(data, pd.Series):
            self.arr = np.sort(np.ndarray.flatten(data.values))
        else:
            self.arr = np.sort(data, axis=0)

        self.iscomplex = np.iscomplexobj(self.arr)

        # probability associated to every sorted sample, evenly spaced in [0, 1]
        n = len(data)
        if n > 1:
            self.prob = np.arange(n, dtype=float) / (n - 1)
        else:
            # with 0 or 1 samples (n - 1) would be <= 0, so no scaling is done
            self.prob = np.arange(n, dtype=float)

        # cursor used by __next__ when the object itself is used as an iterator
        self.idx = 0

        # array length
        self.len = len(self.arr)

    def __call__(self):
        """
        Call this as CDF()
        :return: the sorted samples array
        """
        return self.arr

    def __iter__(self):
        """
        Iterate over the sorted samples.
        A fresh, independent iterator is returned on every call so that nested or
        simultaneous iterations over the same CDF do not interfere with each other.
        :return: iterator over the sorted samples
        """
        self.idx = 0  # kept so that direct next(cdf) calls still restart from the beginning
        return iter(self.arr)

    def __next__(self):
        """
        Next element using the internal cursor (supports direct next(cdf) calls)
        :return: next sorted sample
        """
        if self.idx == self.len:
            raise StopIteration

        self.idx += 1
        return self.arr[self.idx - 1]

    def __add__(self, other):
        """
        Sum of two CDF
        :param other: CDF (or any iterable of values)
        :return: A CDF object built from every pairwise sum of the samples
        """
        return CDF(np.array([a + b for a in self.arr for b in other]))

    def __sub__(self, other):
        """
        Subtraction of two CDF
        :param other: CDF (or any iterable of values)
        :return: A CDF object built from every pairwise difference of the samples
        """
        return CDF(np.array([a - b for a in self.arr for b in other]))

    def get_sample(self, npoints=1):
        """
        Draw random values from the distribution: uniform probabilities are sampled
        in [0, 1] and mapped to values by interpolating the inverse CDF.
        :param npoints: Number of points to sample, 1 by default
        :return: array with the sampled values
        """
        pt = np.random.uniform(0, 1, npoints)
        if self.iscomplex:
            # np.interp only handles real data: interpolate both parts separately
            a = np.interp(pt, self.prob, self.arr.real)
            b = np.interp(pt, self.prob, self.arr.imag)
            return a + 1j * b
        else:
            return np.interp(pt, self.prob, self.arr)

    def get_at(self, prob: float):
        """
        Get the value of the inverse CDF at a given probability
        :param prob: probability from 0 to 1
        :return: Corresponding CDF value
        """
        if self.iscomplex:
            a = np.interp(prob, self.prob, self.arr.real)
            b = np.interp(prob, self.prob, self.arr.imag)
            return a + 1j * b
        else:
            return np.interp(prob, self.prob, self.arr)

    def expectation(self) -> float | complex:
        """
        Returns the CDF expected value (AKA the mean)
        :return: expectation, NaN if there are no samples
        """
        n = len(self.arr)
        if n == 0:
            # there is no mean of an empty sample set; avoid the division by zero
            return float("nan")
        return np.sum(self.arr) * (1 / n)

    def plot(self, plt, LINEWIDTH: int, ax=None):
        """
        Plots the CDF
        :param plt: MatPlotLib plt module
        :param LINEWIDTH: line width in pixels
        :param ax: MatPlotLib axis to plot into (a new figure is created if None)
        :return: Nothing
        """
        if ax is None:
            fig = plt.figure()
            ax = fig.add_subplot(111)
        ax.plot(self.prob, self.arr, linewidth=LINEWIDTH)
        ax.set_xlabel('$p(x)$')
        ax.set_ylabel('$x$')
def classify_by_hour(t: pd.DatetimeIndex) -> List[List[int]]:
    """
    Passes an array of TimeStamps to an array of arrays of indices
    classified by hour of the year.
    Position k of the result holds the indices of the time stamps whose hour of the
    year is (earliest hour of the year found in t) + k.
    :param t: Pandas time Index array
    :return: list of lists of integer indices
    """
    if len(t) == 0:
        return list()

    # zero-based hour of the year: full days elapsed times 24, plus the hour of the day
    # (multiplying hour by day of year would map every midnight to 0)
    hours = [(stamp.dayofyear - 1) * 24 + stamp.hour for stamp in t]

    # use min / max instead of first / last so that unsorted input cannot index out of range
    offset = min(hours)
    arr: List[List[int]] = [list() for _ in range(max(hours) - offset + 1)]

    for i, hour_of_year in enumerate(hours):
        arr[hour_of_year - offset].append(i)

    return arr
def classify_by_day(t: pd.DatetimeIndex) -> list[list[int]]:
    """
    Passes an array of TimeStamps to an array of arrays of indices
    classified by day of the year.
    Position k of the result holds the indices of the time stamps whose day of the
    year is (earliest day of the year found in t) + k.
    :param t: Pandas time Index array
    :return: list of lists of integer indices
    """
    if len(t) == 0:
        return list()

    days = [stamp.dayofyear for stamp in t]

    # use min / max instead of first / last so that unsorted input
    # (or input wrapping over new year) cannot index out of range
    offset = min(days)
    arr: list[list[int]] = [list() for _ in range(max(days) - offset + 1)]

    for i, day_of_year in enumerate(days):
        arr[day_of_year - offset].append(i)

    return arr
def get_time_groups(t_array: pd.DatetimeIndex, grouping: TimeGrouping) -> List[int]:
    """
    Get the indices delimiting a number of groups.
    A new group starts at every position where the month / week / day / hour
    (depending on the grouping) differs from the one of the previous time stamp.
    The last position of the array is always the final delimiter.
    :param t_array: DatetimeIndex object containing dates
    :param grouping: TimeGrouping value
    :return: list of indices that determine the partitions
    """
    # choose once the time stamp attribute whose change opens a new group
    if grouping == TimeGrouping.Monthly:
        attr = "month"
    elif grouping == TimeGrouping.Weekly:
        attr = "week"
    elif grouping == TimeGrouping.Daily:
        attr = "day"
    elif grouping == TimeGrouping.Hourly:
        attr = "hour"
    else:
        attr = None  # grouping not handled here: no group start is registered

    nt = len(t_array)
    groups: List[int] = list()

    if attr is not None:
        previous = -1  # no month / week / day / hour is ever -1, so position 0 always opens a group
        for pos, stamp in enumerate(t_array):
            current = getattr(stamp, attr)
            if current != previous:
                previous = current
                groups.append(pos)

    # add the last index if it is not already there
    # NOTE(review): for a grouping other than the four handled above `groups` is empty
    # here and the lookup below raises IndexError - confirm the intended behaviour
    if nt > 0:
        final = nt - 1
        if groups[-1] != final:
            groups.append(final)

    return groups
class LogEntry:
    """
    Single record of the Logger
    """

    def __init__(self,
                 time: Union[str, None] = None,
                 msg="",
                 severity: LogSeverity = LogSeverity.Information,
                 device="",
                 value="",
                 expected_value="",
                 device_class="",
                 device_property="",
                 object_value=None,
                 expected_object_value=None):
        """
        Constructor
        :param time: time stamp text; the current time (HH:MM:SS) is used if None
        :param msg: message (stored as text)
        :param severity: LogSeverity of the entry
        :param device: device the entry refers to
        :param value: value found (stored as passed)
        :param expected_value: value expected (stored as text)
        :param device_class: class of the device
        :param device_property: property of the device
        :param object_value: object found (stored as passed)
        :param expected_object_value: object expected (stored as passed)
        """
        # might use %Y/%m/%d %H:%M:%S
        self.time = datetime.datetime.now().strftime("%H:%M:%S") if time is None else time

        self.msg = str(msg)
        self.severity = severity

        # what the entry refers to
        self.device = device
        self.device_class = device_class
        self.device_property = device_property

        # found vs. expected
        self.value = value
        self.expected_value = str(expected_value)
        self.object_value = object_value
        self.expected_object_value = expected_object_value

    def to_list(self) -> List[Any]:
        """
        Get the full list representation of this entry
        :return: [time, severity value, message, class, property, device, value, expected value]
        """
        head = [self.time, self.severity.value, self.msg]
        tail = [self.device_class, self.device_property, self.device, self.value, self.expected_value]
        return head + tail

    def to_list_reduced(self) -> List[Any]:
        """
        Get the list representation of this entry without severity and message
        :return: [time, class, property, device, value, expected value]
        """
        return [self.time,
                self.device_class,
                self.device_property,
                self.device,
                self.value,
                self.expected_value]

    def __str__(self):
        return f"{self.time} {self.severity.value}: {self.msg} {self.device} {self.value} {self.expected_value}"

    def __repr__(self):
        return self.msg
class Logger:
    """
    Logger class: collection of LogEntry records plus free-text debug entries
    """

    def __init__(self) -> None:
        self.entries: List[LogEntry] = list()

        self.debug_entries: List[str] = list()

    def _add_entry(self, msg, severity: LogSeverity, device, value, expected_value,
                   device_class, device_property, object_value, expected_object_value) -> None:
        """
        Store a new entry converting every descriptive field to text
        (shared implementation of add, add_info, add_warning and add_error)
        """
        self.entries.append(LogEntry(msg=str(msg),
                                     severity=severity,
                                     device=str(device),
                                     value=str(value),
                                     expected_value=str(expected_value),
                                     device_class=str(device_class),
                                     device_property=str(device_property),
                                     object_value=str(object_value),
                                     expected_object_value=str(expected_object_value)))

    def add_debug(self, *args):
        """
        Add debug entry
        :param args: values to log, they are converted to text and joined with spaces
        :return: Nothing
        """
        self.debug_entries.append(" ".join([str(x) for x in args]))

    def append(self, txt: str):
        """
        simple text log
        :param txt: some message text
        """
        # pass the text by keyword: the first positional argument of LogEntry is the time
        self.entries.append(LogEntry(msg=txt))

    def has_logs(self) -> bool:
        """
        Are there any logs?
        :return: True / False
        """
        return len(self.entries) > 0

    def add_info(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                 device_property='', object_value=None, expected_object_value=None):
        """
        Add info entry
        :param msg: message
        :param device: device the entry refers to
        :param value: value found
        :param expected_value: value expected
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: object found
        :param expected_object_value: object expected
        :return: Nothing
        """
        self._add_entry(msg, LogSeverity.Information, device, value, expected_value,
                        device_class, device_property, object_value, expected_object_value)

    def add_warning(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                    device_property='', object_value=None, expected_object_value=None):
        """
        Add warning entry
        :param msg: message
        :param device: device the entry refers to
        :param value: value found
        :param expected_value: value expected
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: object found
        :param expected_object_value: object expected
        :return: Nothing
        """
        self._add_entry(msg, LogSeverity.Warning, device, value, expected_value,
                        device_class, device_property, object_value, expected_object_value)

    def add_error(self, msg: str, device="", value="", expected_value="", device_class='', comment='',
                  device_property='', object_value=None, expected_object_value=None):
        """
        Add error entry
        :param msg: message
        :param device: device the entry refers to
        :param value: value found
        :param expected_value: value expected
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: object found
        :param expected_object_value: object expected
        :return: Nothing
        """
        self._add_entry(msg, LogSeverity.Error, device, value, expected_value,
                        device_class, device_property, object_value, expected_object_value)

    def add_divergence(self, msg, device="", value=0, expected_value=0, tol=1e-6):
        """
        Add divergence entry, only if value and expected_value differ by more than tol
        :param msg: message
        :param device: device the entry refers to
        :param value: numeric value found
        :param expected_value: numeric value expected
        :param tol: absolute tolerance of the comparison
        :return: Nothing
        """
        if abs(value - expected_value) > tol:
            self.entries.append(LogEntry(msg=str(msg),
                                         severity=LogSeverity.Divergence,
                                         device=str(device),
                                         value=str(value),
                                         expected_value=str(expected_value),
                                         device_class="",
                                         device_property="",
                                         object_value=None,
                                         expected_object_value=None))

    def add(self, msg: str, severity: LogSeverity = LogSeverity.Error, device="", value="", expected_value="",
            device_class='', comment='', device_property='', object_value=None, expected_object_value=None):
        """
        Add general entry
        :param msg: message
        :param severity: LogSeverity of the entry
        :param device: device the entry refers to
        :param value: value found
        :param expected_value: value expected
        :param device_class: class of the device
        :param comment: not used, kept for interface compatibility
        :param device_property: property of the device
        :param object_value: object found
        :param expected_object_value: object expected
        :return: Nothing
        """
        self._add_entry(msg, severity, device, value, expected_value,
                        device_class, device_property, object_value, expected_object_value)

    def to_dict(self) -> Union[
        Dict[str, Dict[str, List[Tuple[str, str, str, str]]]], Dict[str, Dict[str, List[List[str]]]]]:
        """
        Get the logs sorted by severity and message
        :return: Dictionary[severity value, Dictionary[message,
                 List[time, class, property, device, value, expected value]]]
        """
        by_severity = dict()

        for e in self.entries:
            # create the severity and message levels the first time they are seen
            by_msg = by_severity.setdefault(e.severity.value, dict())
            by_msg.setdefault(e.msg, list()).append(e.to_list_reduced())

        return by_severity

    def to_df(self) -> pd.DataFrame:
        """
        Get DataFrame
        :return: DataFrame with one row per entry, indexed by 'Time'
        """
        data = [e.to_list() for e in self.entries]
        df = pd.DataFrame(data=data,
                          columns=['Time', 'Severity', 'Message', 'Class',
                                   'Property', 'Device', 'Value', 'Expected value'])
        df.set_index('Time', inplace=True)
        return df

    def parse_df(self, df: pd.DataFrame):
        """
        Parse DataFrame: one entry is appended per row.
        The time of the rows is not read, the new entries get the current time.
        :param df: DataFrame with the columns 'Message', 'Severity', 'Device', 'Value',
                   'Expected value', 'Class' and 'Property'
        """
        for _, row in df.iterrows():
            self.entries.append(LogEntry(msg=str(row["Message"]),
                                         severity=LogSeverity(row["Severity"]),
                                         device=str(row["Device"]),
                                         value=str(row["Value"]),
                                         expected_value=str(row["Expected value"]),
                                         device_class=str(row["Class"]),
                                         device_property=str(row["Property"]),
                                         object_value="",
                                         expected_object_value=""))

    def to_csv(self, fname):
        """
        Save to CSV
        :param fname: file name
        """
        self.to_df().to_csv(fname)

    def to_xlsx(self, fname):
        """
        To Excel
        :param fname: file name
        """
        self.to_df().to_excel(fname)

    def print(self, title: str = "") -> None:
        """
        Print the logs
        :param title: optional title printed before the logs
        """
        if title != "":
            print(title)
        print(self.to_df())

    def __str__(self) -> str:
        # one entry per line, every line (also the last one) ends with a line break
        return ''.join(str(e) + '\n' for e in self.entries)

    def __getitem__(self, key):
        """
        get [index] implementation
        :param key: integer
        :return: LogEntry at the position
        """
        return self.entries[key]

    def __setitem__(self, idx, value):
        """
        set [index] implementation
        :param idx: integer
        :param value: entry to store at the position
        :return: Nothing
        """
        self.entries[idx] = value

    def __iadd__(self, other: "Logger"):
        """
        += implementation: the entries of the other logger are appended to this one
        :param other: Logger (None is ignored)
        :return: this logger
        """
        if other is not None:
            self.entries += other.entries
        return self

    def __len__(self) -> int:
        return len(self.entries)

    def size(self) -> int:
        """
        Number of logs
        :return: size
        """
        return len(self.entries)

    def count_type(self, severity: LogSeverity) -> int:
        """
        Count the number of entries of a certain severity
        :param severity: LogSeverity
        :return: number of occurrences
        """
        return sum(1 for entry in self.entries if entry.severity == severity)

    def info_count(self) -> int:
        """
        Count the number of information occurrences
        :return: number of information entries
        """
        return self.count_type(LogSeverity.Information)

    def warning_count(self) -> int:
        """
        Count number of warnings
        :return: number of warning entries
        """
        return self.count_type(LogSeverity.Warning)

    def error_count(self) -> int:
        """
        Count number of errors
        :return: number of error entries
        """
        return self.count_type(LogSeverity.Error)

    def has_errors(self) -> bool:
        """
        Check if there are errors
        :return: True if there is at least one error entry
        """
        return self.error_count() > 0
class ConvergenceReport:
    """
    History of the solver attempts: one record per method tried
    """

    def __init__(self) -> None:
        """
        Constructor: every list grows by one item on each call to add()
        """
        self.methods_ = []
        self.converged_ = []
        self.error_ = []
        self.elapsed_ = []
        self.iterations_ = []

    def add(self, method, converged: bool, error: float, elapsed: float, iterations: int):
        """
        Register the outcome of one attempt
        :param method: method used
        :param converged: did it converge?
        :param error: error reached
        :param elapsed: time elapsed
        :param iterations: number of iterations
        :return: Nothing
        """
        self.methods_ += [method]
        self.converged_ += [converged]
        self.error_ += [error]
        self.elapsed_ += [elapsed]
        self.iterations_ += [iterations]

    def converged(self) -> bool:
        """
        Convergence flag of the last attempt
        :return: last flag, False if nothing was registered
        """
        return self.converged_[-1] if len(self.converged_) > 0 else False

    def error(self) -> float:
        """
        Error of the last attempt
        :return: last error, 0.0 if nothing was registered
        """
        return self.error_[-1] if len(self.error_) > 0 else 0.0

    def elapsed(self) -> float:
        """
        Elapsed time of the last attempt
        :return: last elapsed time, 0.0 if nothing was registered
        """
        return self.elapsed_[-1] if len(self.elapsed_) > 0 else 0.0

    def iterations(self) -> float:
        """
        Number of iterations of the last attempt
        :return: last number of iterations, 0.0 if nothing was registered
        """
        return self.iterations_[-1] if len(self.iterations_) > 0 else 0.0

    def to_dataframe(self) -> pd.DataFrame:
        """
        Get the complete history as a table
        :return: DataFrame with one row per registered attempt
        """
        return pd.DataFrame({'Method': self.methods_,
                             'Converged?': self.converged_,
                             'Error': self.error_,
                             'Elapsed (s)': self.elapsed_,
                             'Iterations': self.iterations_})
def get_list_dim(a: List[Any]) -> int:
    """
    Get the dimensions of a List, this is for the case where a matrix
    is represented by lists of lists.
    Only the first element is inspected to tell a list of lists.
    :param a: some List
    :return: 0 if a is not a list, 2 if its first element is a list, 1 otherwise
             (an empty list counts as 1)
    """
    if not isinstance(a, list):
        return 0

    if len(a) > 0 and isinstance(a[0], list):
        return 2

    return 1
class CompressedJsonStruct:
    """
    Compressed json block: the field names are stored once and
    the data is stored as rows of values (one row per object)
    """

    def __init__(self, fields: List[str] = None, data: List[Any] = None):
        """
        Constructor
        :param fields: list of property names (empty if None)
        :param data: row of values or list of rows (empty if None)
        """
        self.__fields: List[str] = fields if fields is not None else list()
        self.__data: List[Any] = data if data is not None else list()

        if data is not None:
            # normalize to a list of rows and check it against the fields
            self.set_data(data)

        self.__fields_pos_dict: Dict[str, int] = self.get_position_dict()

    def get_position_dict(self):
        """
        Build the reverse index lookup
        :return: dictionary {field name: column position}
        """
        return {name: pos for pos, name in enumerate(self.__fields)}

    def set_fields(self, fields: List[str]):
        """
        Set the block fields and initialize the reverse index lookup
        :param fields: list of property names
        :return: Nothing
        """
        self.__fields = fields
        self.__fields_pos_dict = self.get_position_dict()

    def set_data(self, dta: List[Any]):
        """
        Set the data and check its consistency.
        Anything that is not a non-empty list is silently ignored.
        :param dta: list of lists, or a single list that is stored as the only row
        :return: Nothing
        """
        if not isinstance(dta, list) or len(dta) == 0:
            return

        dim = get_list_dim(dta)

        if dim == 2:
            rows = dta
        elif dim == 1:
            rows = [dta]  # a single row is stored as a one-row table
        else:
            raise Exception('The list has the wrong number of dimensions: ' + str(dim))

        self.__data = rows

        # only the first row is checked against the number of fields
        if len(self.__data[0]) != len(self.__fields):
            raise Exception("Data length does not match the fields length")

    def get_data(self):
        """
        Get the stored data
        :return: list of rows
        """
        return self.__data

    def get_row_number(self):
        """
        Get the number of rows
        :return: number of rows
        """
        return len(self.__data)

    def get_col_index(self, prop):
        """
        Get the column position of a property
        :param prop: property name
        :return: column index
        """
        return self.__fields_pos_dict[prop]

    def get_final_dict(self):
        """
        Get the json-ready dictionary
        :return: {'fields': names, 'data': rows}, a single row is returned flat
        """
        if len(self.__data) == 1:
            payload = self.__data[0]
        else:
            payload = self.__data

        return {'fields': self.__fields, 'data': payload}

    def get_dict_at(self, i):
        """
        Get a row as a dictionary
        :param i: row index
        :return: dictionary {field name: value}
        """
        return dict(zip(self.__fields, self.__data[i]))

    def declare_n_entries(self, n):
        """
        Replace the data by n rows filled with None
        :param n: number of rows
        :return: Nothing
        """
        width = len(self.__fields)
        self.__data = [[None for _ in range(width)] for _ in range(n)]

    def set_at(self, i, col_name, val):
        """
        Set value at a position, counts that the data has been declared
        :param i: row index (object index)
        :param col_name: name of the property
        :param val: value to set
        """
        self.__data[i][self.get_col_index(prop=col_name)] = val
class ListSet(list):
    """
    This is a class that behaves like a list except for the query "in" where it behaves like a set O(1)
    The values are unique and must be hashable.
    """

    def __init__(self, iterable=None):
        """
        Initialize the ListSet with an optional iterable.
        :param iterable: values to add, duplicates are skipped
        """
        super().__init__()
        self._set = set()  # mirror of the list content used for the O(1) queries
        if iterable is not None:  # explicit None test: some iterables have no truth value
            self.extend(iterable)

    def append(self, value: "Bus") -> None:
        """Append an item to the list if it's not already present."""
        if value not in self._set:
            super().append(value)
            self._set.add(value)

    def extend(self, iterable):
        """Extend the list by appending elements from the iterable, ensuring uniqueness."""
        for value in iterable:
            self.append(value)

    def insert(self, index, value):
        """Insert an item at a given position if it's not already present."""
        if value not in self._set:
            super().insert(index, value)
            self._set.add(value)

    def remove(self, value):
        """Remove the first occurrence of a value. Raises ValueError if not found."""
        super().remove(value)
        self._set.remove(value)

    def pop(self, index=-1):
        """Remove and return the item at the given position."""
        value = super().pop(index)
        self._set.remove(value)
        return value

    def clear(self):
        """Remove all items from the list."""
        super().clear()
        self._set.clear()

    def __contains__(self, value):
        """Check if the value is in the ListSet using the internal set for O(1) queries."""
        return value in self._set

    def __add__(self, other):
        """Return a new ListSet containing elements from self and other, ensuring uniqueness."""
        # build from a copy: evaluating `self + ...` here would call __add__ again and never end
        result = ListSet(self)
        result.extend(other)
        return result

    def __iadd__(self, other):
        """Extend the ListSet in place with elements from other, ensuring uniqueness."""
        self.extend(other)
        return self

    def __setitem__(self, index, value):
        """
        Set the item at the given index, ensuring uniqueness.
        Raises ValueError if the value is already stored at another position and
        TypeError if a slice is used.
        """
        if isinstance(index, slice):
            raise TypeError("ListSet does not support slice assignment")

        if value in self._set and self[index] != value:
            raise ValueError(f"Duplicate value '{value}' is not allowed in ListSet.")

        self._set.discard(self[index])  # Remove the old value from the set
        super().__setitem__(index, value)
        self._set.add(value)

    def __delitem__(self, index):
        """Delete the item (or the slice of items) at the given index."""
        removed = self[index]
        super().__delitem__(index)

        if isinstance(index, slice):
            # `removed` is a list of values: forget all of them
            self._set.difference_update(removed)
        else:
            self._set.remove(removed)

    def copy(self):
        """Return a shallow copy of the ListSet."""
        return ListSet(self)

    def __deepcopy__(self, memo):
        """Support the copy.deepcopy protocol: the values are deep copied."""
        copied_list = ListSet(copy.deepcopy(list(self), memo))
        return copied_list
class Vector:
    """
    Python implementation of a C++ like std::vector
    """

    def __init__(self, size=0, value=None):
        """
        Initialize vector with a given size and optional fill value.
        :param size: Size of the vector
        :param value: Value to initialize with:
                      For immutable types (int, float, str, bool, None), store directly.
                      For objects (lists, sets, dicts included), store independent deep copies.
        """
        self._data = self._make_fill(value, size)

    @staticmethod
    def _make_fill(value, count):
        """
        Build the list of `count` fill values
        :param value: fill value
        :param count: number of values
        :return: list where the immutable values are shared and
                 any other object is deep copied once per position
        """
        if isinstance(value, (int, float, str, type(None), bool)):
            return [value] * count

        # one copy per slot: `[obj] * count` would make every slot alias the same object
        return [copy.deepcopy(value) for _ in range(count)]

    def __getitem__(self, index):
        return self._data[index]

    def __setitem__(self, index, value):
        self._data[index] = value

    def __len__(self):
        return len(self._data)

    def push_back(self, value):
        """Append a value at the end."""
        self._data.append(value)

    def pop_back(self):
        """Remove and return the last element. Raises IndexError if the vector is empty."""
        if not self._data:
            raise IndexError("pop_back from empty vector")
        return self._data.pop()

    def clear(self):
        """Remove all elements."""
        self._data.clear()

    def resize(self, new_size, value=None):
        """
        Resize the vector like in C++ std::vector.
        Shrinking drops the elements at the end; growing keeps the existing
        elements and appends fill values.
        :param new_size: new number of elements
        :param value: fill value of the new positions (same rules as the constructor)
        """
        current_size = len(self._data)
        if new_size < current_size:
            self._data = self._data[:new_size]
        else:
            # always extend: the existing elements must survive a grow operation
            self._data.extend(self._make_fill(value, new_size - current_size))

    def copy(self):
        """Return a deep copy of the vector."""
        return copy.deepcopy(self)

    def __deepcopy__(self, memo):
        """Support the copy.deepcopy protocol."""
        new_vec = Vector()
        new_vec._data = copy.deepcopy(self._data, memo)
        return new_vec

    def __iter__(self):
        return iter(self._data)

    def __repr__(self):
        return f"Vector({self._data})"