# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Union, TYPE_CHECKING, Tuple
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.basic_structures import (IntVec, IntMat, Vec, CxVec, StrVec, StrMat, Mat, DateVec, CxMat, BoolVec,
Logger)
from VeraGridEngine.enumerations import StudyResultsType, ResultTypes, SimulationTypes
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
if TYPE_CHECKING: # Only imports the below statements during type checking
from VeraGridEngine.Simulations.Clustering.clustering_results import ClusteringResults
[docs]
class ResultsProperty:
"""
ResultsProperty
"""
__slots__ = (
"name",
"tpe",
"old_names",
"expandable"
)
def __init__(self, name: str,
tpe: Any,
old_names: Union[List[str], Tuple[str, ...]],
expandable: bool) -> None:
"""
ResultsProperty
:param name: name of the property
:param tpe: type of the property (Vec, Mat, double, ...)
:param old_names: list of previous names. Use in case of renaming a registered property
"""
self.name: str = name
self.tpe: Any = tpe
self.old_names: Tuple[str, ...] = tuple(old_names)
self.expandable = expandable
[docs]
class ResultsTemplate(metaclass=ResultsTemplateMeta):
"""
ResultsTemplate
"""
__slots__ = (
"name",
"report_text",
"study_results_type",
"available_results",
"_data_variables",
"_time_array",
"_is_3ph",
"clustering_results",
"using_clusters",
"time_indices",
"sampled_probabilities",
"original_sample_idx",
"F",
"T",
"hvdc_F",
"hvdc_T",
"bus_area_indices",
"area_names",
"_ResultsTemplate__show_plot",
)
def __init__(
self,
name: str,
available_results: Union[Dict[ResultTypes, List[ResultTypes]], List[ResultTypes]],
# available_results: Union[Dict[ResultTypes, List[ResultTypes]], List[ResultTypes], Dict[ResultTypes, Dict[ResultTypes, List[ResultTypes]]], List[ResultTypes, Dict[ResultTypes, Dict[ResultTypes, List[ResultTypes]]]]],
time_array: Union[DateVec, None],
clustering_results: Union[ClusteringResults, None],
study_results_type: StudyResultsType,
is_3ph: bool = False):
"""
Results template class
:param name: Name of the class
:param available_results: list of stuff to represent the results
:param clustering_results: ClusteringResults object (optional)
:param study_results_type: StudyResultsType Instance
:param is_3ph: is this a 3-phase result?
"""
self.name: str = name
self.report_text: str = ""
self.study_results_type: StudyResultsType = study_results_type
self.available_results: Dict[ResultTypes, List[ResultTypes]] = available_results
self._data_variables: Dict[str, ResultsProperty] = type(self).CLASS_DATA_VARIABLES
self._time_array: Union[DateVec, None] = time_array
self._is_3ph = is_3ph
if clustering_results:
self.clustering_results = clustering_results
self.using_clusters = True
self.time_indices: IntVec | None = clustering_results.time_indices
self.sampled_probabilities: Vec | None = clustering_results.sampled_probabilities
self.original_sample_idx: IntVec | None = clustering_results.original_sample_idx
else:
self.clustering_results = None
self.using_clusters = False
self.time_indices: IntVec | None = None
self.sampled_probabilities: Vec | None = None
self.original_sample_idx: IntVec | None = None
# vars for the inter-area computation
self.F: IntVec | None = None
self.T: IntVec | None = None
self.hvdc_F: IntVec | None = None
self.hvdc_T: IntVec | None = None
self.bus_area_indices: IntVec | None = None
self.area_names: StrVec | None = None
self.__show_plot = True
@property
def data_variables(self) -> Dict[str, ResultsProperty]:
"""
:return:
"""
return type(self).CLASS_DATA_VARIABLES
@property
def time_array(self) -> DateVec:
"""
Array of time steps
:return:
"""
return self._time_array
@time_array.setter
def time_array(self, time_array: DateVec):
self._time_array = time_array
@property
def is_3ph(self):
return self._is_3ph
[docs]
def plotting_allowed(self) -> bool:
"""
:return:
"""
return self.__show_plot
[docs]
def activate_plotting(self):
"""
:return:
"""
self.__show_plot = True
[docs]
def deactivate_plotting(self):
"""
:return:
"""
self.__show_plot = False
[docs]
def register(self, name: str, tpe: Union[Vec, Mat, CxVec, CxMat], old_names: Union[None, List[str]] = None) -> None:
"""
Runtime registration is intentionally disabled.
:param name: name of the variable to register (is checked)
:param tpe: type of the variable
:param old_names: list of old names for retro compatibility (optional)
:return: None
"""
raise RuntimeError(
"Runtime results registration is disabled. "
"Declare results in LOCAL_RESULTS_DECLARATIONS."
)
[docs]
def consolidate_after_loading(self):
"""
Consolidate
"""
pass
[docs]
def prepare_for_saving(self) -> None:
"""
Refresh any derived payload that must be serialized with the results.
:return: None.
"""
pass
[docs]
def get_dict(self) -> Dict[str, Any]:
"""
Get data to pass via json
:return:
"""
data = dict()
# traverse the registered results
for arr_name, arr_prop in self._data_variables.items():
# get the array
arr: np.ndarray = getattr(self, arr_name)
if arr_prop.tpe in (CxVec, CxMat):
r = arr.real
i = arr.imag
data[arr_name] = {
"real": r.tolist(),
"imag": i.tolist(),
}
elif arr_prop.tpe in (Vec, Mat, IntVec, IntMat, StrVec, StrMat, BoolVec):
if isinstance(arr, np.ndarray):
data[arr_name] = arr.tolist()
elif isinstance(arr, list):
data[arr_name] = arr
elif arr_prop.tpe == DateVec:
data[arr_name] = arr.values.astype(float).tolist() # pass the unix nano-seconds
else:
if isinstance(arr, list):
data[arr_name] = arr
elif isinstance(arr, dict):
data[arr_name] = arr
else:
data[arr_name] = arr
return data
[docs]
def parse_data(self, data: Dict[str, Any | Dict[str, Any]]):
"""
The function to parse the data created with get_dict
:param data:
:return:
"""
for arr_name, arr_prop in self._data_variables.items():
arr_data = data.get(arr_name, None)
if arr_prop.tpe in (CxVec, CxMat):
r = np.array(arr_data["real"])
i = np.array(arr_data["imag"])
arr = r + 1j * i
setattr(self, arr_name, arr)
elif arr_prop.tpe in (Vec, Mat, IntVec, IntMat, StrVec, StrMat, BoolVec):
arr = np.array(arr_data)
setattr(self, arr_name, arr)
elif arr_prop.tpe == DateVec:
arr = pd.to_datetime(np.array(arr_data), unit='ns') # pass the unix nano-seconds
setattr(self, arr_name, arr)
else:
data[arr_name] = arr_data
[docs]
def get_name_to_results_type_dict(self):
"""
:return:
"""
d = dict()
if isinstance(self.available_results, dict):
for key, values in self.available_results.items():
for item in values:
d[item.value] = item
if isinstance(self.available_results, list):
for item in self.available_results:
d[item.value] = item
return d
[docs]
def get_name_tree(self):
"""
:return:
"""
d = dict()
if isinstance(self.available_results, dict):
for key, values in self.available_results.items():
d[key.value] = [x.value for x in values]
if isinstance(self.available_results, list):
d = [x.value for x in self.available_results]
return d
[docs]
def to_json(self, file_name):
"""
Export as json
:param file_name: File name
"""
with open(file_name, "w") as output_file:
json_str = json.dumps(self.get_dict())
output_file.write(json_str)
[docs]
def apply_new_rates(self, rates: Vec):
"""
:param rates:
"""
pass
[docs]
def apply_new_time_series_rates(self, rates: Vec):
"""
:param rates:
"""
pass
[docs]
def get_inter_area_flows(self,
area_names: StrVec,
F: IntVec,
T: IntVec,
Sf: CxVec,
hvdc_F: IntVec,
hvdc_T: IntVec,
hvdc_Pf: Vec,
bus_area_indices: IntVec) -> Mat:
"""
:param area_names:
:param F:
:param T:
:param Sf:
:param hvdc_F:
:param hvdc_T:
:param hvdc_Pf:
:param bus_area_indices:
:return:
"""
na = len(area_names)
x = np.zeros((na, na), dtype=complex)
if na > 0:
for f, t, flow in zip(F, T, Sf):
a1 = bus_area_indices[f]
a2 = bus_area_indices[t]
if a1 != a2:
x[a1, a2] += flow
x[a2, a1] -= flow
for f, t, flow in zip(hvdc_F, hvdc_T, hvdc_Pf):
a1 = bus_area_indices[f]
a2 = bus_area_indices[t]
if a1 != a2:
x[a1, a2] += flow
x[a2, a1] -= flow
return x
[docs]
@staticmethod
def get_bus_values_per_area(bus_values: Vec, area_names: StrVec, bus_area_indices: IntVec) -> Vec:
"""
Split array of bus-related values per area
:param bus_values:
:param area_names:
:param bus_area_indices:
:return:
"""
na = len(area_names)
x = np.zeros(na, dtype=bus_values.dtype)
if na > 0:
for a, val in zip(bus_area_indices, bus_values):
x[a] += val
return x
[docs]
def get_branch_values_per_area(self, branch_values: Vec,
area_names: StrVec,
bus_area_indices: IntVec,
F: IntVec, T: IntVec):
"""
Split array of branch-related values per area
:param branch_values:
:param area_names:
:param bus_area_indices:
:param F:
:param T:
:return:
"""
na = len(area_names)
x = np.zeros((na, na), dtype=branch_values.dtype)
if na > 0:
for f, t, val in zip(F, T, branch_values):
a1 = bus_area_indices[f]
a2 = bus_area_indices[t]
x[a1, a2] += val
return x
[docs]
def get_hvdc_values_per_area(self, hvdc_values: Vec, area_names: StrVec, bus_area_indices: IntVec,
hvdc_F: IntVec, hvdc_T: IntVec):
"""
Split array of hvdc-related values per area
:param hvdc_values:
:param area_names:
:param bus_area_indices:
:param hvdc_F:
:param hvdc_T:
:return:
"""
na = len(area_names)
x = np.zeros((na, na), dtype=hvdc_values.dtype)
if na > 0:
for f, t, val in zip(hvdc_F, hvdc_T, hvdc_values):
a1 = bus_area_indices[f]
a2 = bus_area_indices[t]
x[a1, a2] += val
return x
[docs]
def fill_circuit_info(self, grid: MultiCircuit):
"""
:param grid:
:return:
"""
self.area_names, self.bus_area_indices, self.F, self.T, self.hvdc_F, self.hvdc_T = grid.get_branch_areas_info()
[docs]
def fill_simulation_info(self, grid: MultiCircuit):
"""
:param grid:
:return:
"""
self.area_names, self.bus_area_indices, self.F, self.T, self.hvdc_F, self.hvdc_T = grid.get_branch_areas_info()
[docs]
def mdl(self, result_type: ResultTypes) -> ResultsTable:
"""
Get results model (overloaded in the respective implementations)
:param result_type: ResultTypes
"""
pass
def _iter_instance_items(self):
"""
Iterate over instance attributes stored either in slots or in __dict__.
"""
seen = set()
for cls in type(self).mro():
cls_slots = cls.__dict__.get("__slots__", ())
if isinstance(cls_slots, str):
cls_slots = (cls_slots,)
for slot in cls_slots:
if slot in {"__dict__", "__weakref__"} or slot in seen:
continue
seen.add(slot)
if hasattr(self, slot):
yield slot, getattr(self, slot)
for prop, value in getattr(self, "__dict__", {}).items():
if prop not in seen:
yield prop, value
[docs]
def expand_clustered_results(self):
"""
Expand all arrays using the clustering info
"""
if self.using_clusters:
if self.clustering_results is not None:
# Replace the reduced time array by the expanded time array
self.time_array = self.clustering_results.time_array
# NOTE: You might be tempted to change this loop to use the registered properties.
# Don't do it, there may be unregistered properties that will fail if this doesn't
# traverse all the actual properties of the class... this may be a future topic
for prop in self.CLASS_RESULTS_DECLARATIONS:
if prop.expandable:
value: np.ndarray = getattr(self, prop.name)
if value.ndim == 1:
if len(value) > 0:
try:
arr = value[self.original_sample_idx] # expand
setattr(self, prop.name, arr) # overwrite the array
except IndexError as e:
print(f"Index error in expand_clustered_results (1D) for {prop}")
elif value.ndim == 2:
if value.shape[0] > 0:
try:
arr = value[self.original_sample_idx, :] # expand
setattr(self, prop.name, arr) # overwrite the array
except IndexError as e:
print(f"Index error in expand_clustered_results (2D) for {prop}")
else:
pass
else:
print("No clusters!")
[docs]
def parse_saved_data(self, grid: MultiCircuit,
data_dict: Dict[str, pd.DataFrame | np.ndarray],
logger: Logger = Logger()) -> None:
"""
:param grid: MultiCircuit
:param data_dict: Dictionary with the info loaded from disk
:param logger: Logger
:return:
"""
self.time_array = grid.get_time_array()
for arr_name, stored_data in data_dict.items():
is_complex = '__complex__' in arr_name
arr_name = arr_name.replace('__complex__', '')
# try to get the property of the saved file
res_prop: ResultsProperty = self._data_variables.get(arr_name, None)
if stored_data is not None and res_prop is not None:
# it may be complex...
if is_complex:
values_2d: np.ndarray
if isinstance(stored_data, pd.DataFrame):
values_2d = stored_data.values
else:
values_2d = np.asarray(stored_data)
split_pt = int(values_2d.shape[1] / 2)
r = values_2d[:, :split_pt]
i = values_2d[:, split_pt:]
array = r + 1j * i
else:
if isinstance(stored_data, pd.DataFrame):
# Keep the 2-D shape for the historical parquet path.
array = stored_data.values
else:
# NumPy payloads are already restored with their native shape.
array = np.asarray(stored_data)
vector_like_types = (Vec, IntVec, StrVec, BoolVec, CxVec)
if (
isinstance(array, np.ndarray)
and array.ndim == 2
and array.shape[1] == 1
and res_prop.tpe in vector_like_types
):
# if there is only one column, convert to array directly
array = array[:, 0]
# it may be a single number...
if res_prop.tpe in [int, float, complex]:
if array.size == 1:
array = array[0]
if res_prop.tpe == DateVec:
array = pd.to_datetime(np.asarray(array).reshape(-1))
else:
pass
curr_value = getattr(self, res_prop.name)
if isinstance(curr_value, (np.ndarray, pd.Index)):
# if results arrays are exactly 0, don't check
curr_shape = np.shape(curr_value)
dont_check = len(curr_shape) > 0 and sum(curr_shape) == 0
if np.shape(curr_value) == np.shape(array) or dont_check:
setattr(self, res_prop.name, array)
else:
logger.add_error(msg="Wrong array shape",
device_class=self.name,
device_property=res_prop.name,
value=str(np.shape(array)),
expected_value=str(np.shape(curr_value)))
else:
setattr(self, res_prop.name, array)