# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import json
import math
import os
from pathlib import Path
from typing import Dict, Union, List, Tuple, Any, Callable
import pandas as pd
import numpy as np
from enum import EnumMeta as EnumType
from VeraGridEngine.basic_structures import Logger
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
import VeraGridEngine.Devices as dev
from VeraGridEngine.Devices.Parents.editable_device import GCProp, EditableDevice
from VeraGridEngine.Devices.Profiles import AnyProfile
from VeraGridEngine.Utils.Symbolic.symbolic_io import BlockSaver, BlockParser, Block
from VeraGridEngine.Devices.types import ALL_DEV_TYPES, VERAGRID_FILE_TYPE
from VeraGridEngine.Devices.Diagrams.base_diagram import copy_diagrams
from VeraGridEngine.enumerations import (DiagramType, DeviceType, SubObjectType, TapPhaseControl, TapModuleControl,
ContingencyOperationTypes)
[docs]
def get_objects_dictionary() -> Dict[str, ALL_DEV_TYPES]:
"""
creates a dictionary with the types and the circuit objects
:return: Dictionary instance
"""
# this list must be sorted in dependency order so that the
# loading algorithm is able to find the object substitutions
return {
'modelling_authority': dev.ModellingAuthority(),
'area': dev.Area(),
'zone': dev.Zone(),
'country': dev.Country(),
'community': dev.Community(),
'region': dev.Region(),
'municipality': dev.Municipality(),
'owner': dev.Owner(),
'substation': dev.Substation(),
'voltage_level': dev.VoltageLevel(),
'technology': dev.Technology(),
'fuel': dev.Fuel(),
'emission': dev.EmissionGas(),
'facility': dev.Facility(),
'rms_model_template': dev.RmsModelTemplate(),
'emt_model_template': dev.EmtModelTemplate(),
'fmu_template': dev.FmuTemplate(),
'bus': dev.Bus(),
'bus_bar': dev.BusBar(),
'load': dev.Load(),
'static_generator': dev.StaticGenerator(),
'battery': dev.Battery(),
'generator': dev.Generator(),
'shunt': dev.Shunt(),
'linear_shunt': dev.ControllableShunt(),
'external_grid': dev.ExternalGrid(),
'current_injection': dev.CurrentInjection(),
'wires': dev.Wire(),
'overhead_line_types': dev.OverheadLineType(),
'underground_cable_types': dev.UndergroundLineType(),
'sequence_line_types': dev.SequenceLineType(),
'transformer_types': dev.TransformerType(),
'branch_group': dev.BranchGroup(),
'branch': dev.Branch(),
'transformer2w': dev.Transformer2W(),
'windings': dev.Winding(),
'transformer3w': dev.Transformer3W(),
'transformernw': dev.TransformerNW(),
'line': dev.Line(),
'dc_line': dev.DcLine(),
'hvdc': dev.HvdcLine(),
'vsc': dev.VSC(),
'upfc': dev.UPFC(),
'series_reactance': dev.SeriesReactance(),
'switch': dev.Switch(),
'contingency_group': dev.ContingencyGroup(),
'contingency': dev.Contingency(),
'short_circuit_definition': dev.ShortCircuitEvent(),
'remedial_action_group': dev.RemedialActionGroup(),
'remedial_action': dev.RemedialAction(),
'investments_group': dev.InvestmentsGroup(),
'investment': dev.Investment(),
'rms_event_group': dev.RmsEventsGroup(),
'rms_event': dev.RmsEvent(),
'emt_event_group': dev.EmtEventsGroup(),
'emt_event': dev.EmtEvent(),
'dynamic_plot': dev.DynamicPlot(),
'dynamic_plot_entry': dev.DynamicPlotEntry(),
'fluid_node': dev.FluidNode(),
'fluid_path': dev.FluidPath(),
'fluid_turbine': dev.FluidTurbine(),
'fluid_pump': dev.FluidPump(),
'fluid_p2x': dev.FluidP2x(),
'pi_measurement': dev.PiMeasurement(),
'qi_measurement': dev.QiMeasurement(),
'pf_measurement': dev.PfMeasurement(),
'qf_measurement': dev.QfMeasurement(),
'if_measurement': dev.IfMeasurement(),
'pt_measurement': dev.PtMeasurement(),
'qt_measurement': dev.QtMeasurement(),
'it_measurement': dev.ItMeasurement(),
'vm_measurement': dev.VmMeasurement(),
'va_measurement': dev.VaMeasurement(),
'pg_measurement': dev.PgMeasurement(),
'qg_measurement': dev.QgMeasurement(),
}
_FMU_CONFIG_PROPERTY_NAMES: Tuple[str, ...] = (
"rms_fmu_import_config",
"emt_fmu_import_config",
"rms_fmu_me_import_config",
"emt_fmu_me_import_config",
"serialized_config",
)
def _to_project_relative_fmu_config(config_text: str, project_directory: Path | None) -> str:
"""
Rewrite the stored FMU path in one serialized configuration as a project-relative path.
:param config_text: Serialized FMU configuration payload.
:param project_directory: Directory containing the VeraGrid project file.
:return: Serialized configuration with a relative FMU path when possible.
"""
if project_directory is None:
return config_text
else:
stripped_text: str = config_text.strip()
if len(stripped_text) == 0:
return config_text
else:
payload: dict[str, Any] = json.loads(stripped_text)
fmu_path_text = payload.get("fmu_path", None)
if isinstance(fmu_path_text, str):
fmu_path = Path(fmu_path_text)
if fmu_path.is_absolute():
try:
payload["fmu_path"] = os.path.relpath(str(fmu_path), start=str(project_directory))
except ValueError:
payload["fmu_path"] = fmu_path_text
else:
pass
else:
pass
return json.dumps(payload, sort_keys=True)
def _resolve_project_fmu_config(config_text: str, project_directory: Path | None) -> str:
"""
Rewrite the stored FMU path in one serialized configuration as an absolute runtime path.
:param config_text: Serialized FMU configuration payload.
:param project_directory: Directory containing the VeraGrid project file.
:return: Serialized configuration with a resolved absolute FMU path when possible.
"""
if project_directory is None:
return config_text
else:
stripped_text: str = config_text.strip()
if len(stripped_text) == 0:
return config_text
else:
payload: dict[str, Any] = json.loads(stripped_text)
fmu_path_text = payload.get("fmu_path", None)
if isinstance(fmu_path_text, str):
fmu_path = Path(fmu_path_text)
if fmu_path.is_absolute():
pass
else:
payload["fmu_path"] = str((project_directory / fmu_path).resolve())
else:
pass
return json.dumps(payload, sort_keys=True)
def _resolve_circuit_fmu_paths(circuit: MultiCircuit, project_directory: Path | None) -> None:
"""
Resolve stored FMU configuration paths for every element in the circuit.
:param circuit: Parsed circuit instance.
:param project_directory: Directory containing the VeraGrid project file.
:return: None.
"""
if project_directory is None:
return
else:
elm: ALL_DEV_TYPES
for elm in circuit.get_all_elements_iter():
property_name: str
for property_name in _FMU_CONFIG_PROPERTY_NAMES:
if property_name in elm.registered_properties:
current_value = elm.get_snapshot_value_by_name(property_name)
if isinstance(current_value, str):
elm.set_snapshot_value(property_name, _resolve_project_fmu_config(current_value, project_directory))
else:
pass
else:
pass
def _to_project_relative_path(path_text: str, project_directory: Path | None) -> str:
"""
Convert one absolute filesystem path into a project-relative path when possible.
:param path_text: Original path text.
:param project_directory: Directory containing the VeraGrid project file.
:return: Relative path when conversion is possible.
"""
if project_directory is None:
return path_text
else:
normalized_text: str = str(path_text).strip()
if len(normalized_text) == 0:
return path_text
else:
candidate = Path(normalized_text)
if candidate.is_absolute():
try:
return os.path.relpath(str(candidate), start=str(project_directory))
except ValueError:
return path_text
else:
return path_text
[docs]
def order_multiverse_records(metadata: Dict[str, Dict[str | int | float]] | Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Return multiverse metadata records in a parent-before-child order.
The multiverse payloads must be parsed in dependency order because child delta payloads
may reference objects inherited from their parent composed scenario.
"""
node_metadata = get_multiverse_node_metadata(metadata)
pending_records: Dict[int, Dict[str, Any]] = {
int(node_id): dict(record) for node_id, record in node_metadata.items()
}
ordered_records: List[Dict[str, Any]] = list()
while pending_records:
progressed = False
for pending_node_id in list(pending_records.keys()):
record = pending_records[pending_node_id]
parent_id_raw = record["parent_id"]
parent_id = None if parent_id_raw is None else int(parent_id_raw)
if parent_id is not None and parent_id in pending_records:
continue
ordered_records.append(record)
del pending_records[pending_node_id]
progressed = True
if not progressed:
unresolved = ", ".join(str(node_id) for node_id in pending_records.keys())
raise ValueError(f"Could not resolve multiverse metadata dependency order for nodes: {unresolved}")
return ordered_records
[docs]
def gather_model_as_data_frames(circuit: MultiCircuit,
logger: Logger = Logger(),
legacy: bool = False) -> Dict[str, pd.DataFrame]:
"""
Pack the circuit information into tables (DataFrames)
:param circuit: MultiCircuit instance
:param logger: Logger instance
:param legacy: Generate the legacy object DataFrames
:return: dictionary of DataFrames
"""
dfs = dict()
# get the master time profile
time_profile = circuit.time_profile
nt = circuit.get_time_number()
########################################################################################################
# declare objects to iterate name: [sample object, list of objects, headers]
########################################################################################################
object_types = get_objects_dictionary()
# forget abut the Branch object when saving, now we have lines and transformers separated in their own lists
del object_types['branch']
########################################################################################################
# generic object iteration
########################################################################################################
if legacy:
# configuration ################################################################################################
obj = list()
obj.append(['BaseMVA', circuit.Sbase])
obj.append(['Version', 5])
obj.append(['Name', str(circuit.name)])
obj.append(['Comments', str(circuit.comments)])
obj.append(['idtag', str(circuit.idtag)])
# increase the model version
circuit.model_version += 1
obj.append(['ModelVersion', str(circuit.model_version)])
obj.append(['UserName', str(circuit.user_name)])
obj.append(['program', 'VeraGrid'])
dfs['config'] = pd.DataFrame(data=obj, columns=['Property', 'Value'], dtype=str)
for object_type_name, object_sample in object_types.items():
headers = object_sample.registered_properties.keys()
lists_of_objects: List[ALL_DEV_TYPES] = circuit.get_elements_by_type(object_sample.device_type)
obj = list()
profiles = dict()
object_idtags = list()
if len(lists_of_objects) > 0:
for k, elm in enumerate(lists_of_objects):
# get the object normal information
obj.append(elm.get_save_data())
object_idtags.append(elm.idtag)
if time_profile is not None:
if nt > 0:
elm.ensure_profiles_exist(time_profile)
for property_name, profile_property in object_sample.properties_with_profile.items():
# get the array
profile = elm.get_profile(magnitude=property_name)
if profile_property not in profiles.keys():
# create the profile
try:
profiles[profile_property] = np.zeros(shape=(nt, len(lists_of_objects)),
dtype=profile.dtype)
except TypeError:
logger.add_warning("Profile packed as object", device_class=str(profile.dtype))
profiles[profile_property] = np.zeros(shape=(nt, len(lists_of_objects)),
dtype=object)
# copy the object profile to the array of profiles
profiles[profile_property][:, k] = profile.toarray()
# convert the objects' list to an array
dta = np.array(obj)
else:
# declare an empty array
dta = np.zeros((0, len(headers)))
# declare the DataFrames for the normal data
dfs[object_type_name] = pd.DataFrame(data=dta, columns=list(headers))
# create the profiles' DataFrames
for prop, data in profiles.items():
dfs[object_type_name + '_' + prop] = pd.DataFrame(data=data, columns=object_idtags, index=time_profile)
# towers and wires ---------------------------------------------------------------------------------------------
# because each tower contains a reference to a number of wires, these relations need to be stored as well
associations = list()
for tower in circuit.overhead_line_types:
for wire in tower.wires_in_tower.data:
associations.append([tower.name, wire.name, wire.xpos, wire.ypos, wire.phase])
dfs['tower_wires'] = pd.DataFrame(data=associations,
columns=['tower_name', 'wire_name', 'xpos', 'ypos', 'phase'])
# Time ---------------------------------------------------------------------------------------------------------
if circuit.time_profile is not None:
if isinstance(circuit.time_profile, pd.DatetimeIndex):
time_df = pd.DataFrame(data=circuit.time_profile.values, columns=['Time'])
else:
time_df = pd.DataFrame(data=circuit.time_profile, columns=['Time'])
dfs['time'] = time_df
return dfs
[docs]
def profile_todict(profile: AnyProfile) -> Dict[str, str | bool]:
"""
Get a dictionary representation of the profile
:return:
"""
s = profile.size()
if s > 0:
if profile.is_sparse:
return {
'is_sparse': True,
'size': s,
'default': profile.sparse_array.default_value,
'sparse_data': {
'map': profile.sparse_array.get_map()
}
}
else:
return {
'is_sparse': False,
'size': s,
'default': profile.default_value,
'dense_data': profile.dense_array.tolist(),
}
else:
return {
'is_sparse': True,
'size': s,
'default': profile.default_value if profile.sparse_array is None else profile.sparse_array.default_value,
'sparse_data': {
'map': dict()
}
}
[docs]
def profile_todict_idtag(profile: AnyProfile) -> Dict[str, str]:
"""
Get a dictionary representation of the profile
:return:
"""
default = profile.default_value.idtag if hasattr(profile.default_value, 'idtag') else "None"
if profile.is_sparse:
return {
'is_sparse': profile.is_sparse,
'size': profile.size(),
'default': default,
'sparse_data': {
'map': {key: val.idtag if hasattr(val, 'idtag') else None
for key, val in profile.sparse_array.get_map().items()}
if profile.sparse_array is not None else dict()
}
}
else:
return {
'is_sparse': profile.is_sparse,
'size': profile.size(),
'default': default,
'dense_data': [e.idtag if hasattr(e, 'idtag') else None for e in profile.dense_array]
if profile.dense_array is not None else list(),
}
[docs]
def profile_todict_str(profile: AnyProfile) -> Dict[str, str]:
"""
Get a dictionary representation of the profile
:return:
"""
s = profile.size()
if s > 0:
if profile.is_sparse:
return {
'is_sparse': True,
'size': s,
'default': str(profile.default_value),
'sparse_data': {
'map': {key: str(val) for key, val in profile.sparse_array.get_map().items()}
}
}
else:
return {
'is_sparse': False,
'size': s,
'default': str(profile.default_value),
'dense_data': [str(e) for e in profile.dense_array],
}
else:
# empty profile
return {
'is_sparse': True,
'size': s,
'default': str(profile.default_value),
'sparse_data': {
'map': dict()
}
}
[docs]
def cast_profile_value(
profile: AnyProfile,
value: Any,
collection: Union[None, Dict[str, Any]] = None,
) -> Any:
"""
Convert serialized profile payloads into the concrete type required by ``profile``.
:param profile: Typed profile that will receive the value.
:param value: Serialized raw value.
:param collection: Optional idtag-to-object dictionary for device references.
:return: Converted value.
"""
if value is None or value == "None":
return None
if collection is not None:
return collection.get(value, None)
if isinstance(profile.dtype, DeviceType):
return value
if isinstance(profile.dtype, EnumType):
return profile.dtype(value)
if profile.dtype == bool:
if isinstance(value, str):
low = value.strip().lower()
if low in {"true", "1"}:
return True
if low in {"false", "0"}:
return False
return bool(value)
if profile.dtype == int:
return int(value)
if profile.dtype == float:
return float(value)
if profile.dtype == str:
return str(value)
return value
[docs]
def get_profile_from_dict(profile: AnyProfile,
data: Dict[str, Any | Dict[str, Any]],
collection: Union[None, Dict[str, Any]] = None):
"""
Create a profile from json dict data
:param profile: Profile object to fill in
:param data: Json dict data
:param collection: if the collection is provided, it will be used to convert idtags into objects
:return: None
"""
raw_default_value = data['default']
is_sparse = bool(data['is_sparse'])
try:
default_value: Any = cast_profile_value(profile=profile, value=raw_default_value, collection=collection)
except (TypeError, ValueError):
default_value = profile.default_value
if isinstance(profile.dtype, DeviceType) and default_value == "None":
default_value = profile.default_value
if is_sparse:
sp_data = data['sparse_data']
if collection is None:
map_data = {
int(key): cast_profile_value(profile=profile, value=val, collection=collection)
for key, val in sp_data['map'].items()
}
else:
map_data = {
int(key): cast_profile_value(profile=profile, value=val, collection=collection)
for key, val in sp_data['map'].items()
}
profile.create_sparse(default_value=default_value,
size=data['size'], map_data=map_data)
else:
if collection is None:
arr = [cast_profile_value(profile=profile, value=i, collection=collection) for i in data['dense_data']]
else:
arr = [cast_profile_value(profile=profile, value=i, collection=collection) for i in data['dense_data']]
profile.set(np.array(arr))
# mark as initialized
profile.set_initialized()
[docs]
def veragrid_object_to_json(elm: ALL_DEV_TYPES,
block_saver: BlockSaver,
project_directory: Path | None = None) -> Dict[str, str]:
"""
:param elm:
:param block_saver:
:return:
"""
data = dict()
for name, prop in elm.registered_properties.items():
obj = elm.get_snapshot_value(prop=prop)
if prop.tpe in [str, float, int, bool]:
if name in _FMU_CONFIG_PROPERTY_NAMES and isinstance(obj, str):
data[name] = _to_project_relative_fmu_config(obj, project_directory)
else:
if name == "fmu_relative_path" and isinstance(obj, str):
data[name] = _to_project_relative_path(obj, project_directory)
else:
data[name] = obj
if prop.has_profile():
data[name + '_prof'] = profile_todict(elm.get_profile_by_prop(prop=prop))
elif prop.tpe == SubObjectType.MergeInformation:
data[name] = obj.to_dict()
elif prop.tpe == SubObjectType.GeneratorQCurve:
data[name] = obj.to_list()
elif prop.tpe == SubObjectType.LineLocations:
data[name] = obj.to_list()
elif prop.tpe == SubObjectType.ListOfWires:
data[name] = obj.to_list()
elif prop.tpe == SubObjectType.TapChanger:
data[name] = obj.to_dict()
elif prop.tpe == SubObjectType.Associations:
data[name] = obj.to_dict()
elif prop.tpe == SubObjectType.AdmittanceMatrix:
data[name] = obj.to_dict()
elif prop.tpe == SubObjectType.DaeBlockType:
block_saver.save_block(blk=obj, main=True)
if isinstance(obj, Block):
data[name] = obj.uid
else:
raise ValueError("must be a block")
elif prop.tpe == SubObjectType.VarType:
if obj is not None:
data[name] = obj.uid
else:
# Persistent dynamic plot entries may keep unresolved legacy Var
# hints as ``None`` while semantic fields remain the canonical
# binding identity.
data[name] = None
elif prop.tpe == SubObjectType.ConstType:
if obj is not None:
data[name] = obj.uid
else:
data[name] = None
elif prop.tpe == SubObjectType.Array:
data[name] = list(obj)
else:
# if the object is not of a primary type, get the idtag instead
if obj is None:
data[name] = None
else:
if isinstance(obj, EditableDevice):
data[name] = obj.idtag
if prop.has_profile():
data[name + '_prof'] = profile_todict_idtag(elm.get_profile_by_prop(prop=prop))
else:
pass
else:
# some data types might not have the idtag, ten just use the str method
data[name] = str(obj)
if prop.has_profile():
data[name + '_prof'] = profile_todict_str(elm.get_profile_by_prop(prop=prop))
else:
pass
return data
[docs]
def gather_model_as_jsons(circuit: MultiCircuit,
project_directory: Path | None = None) -> dict[
str, dict[str, dict[str, str] | list[dict[str, str]]] | dict[
str, list[dict[str, Any]] | dict[int, list[Any]] | dict[int, dict[str, Any]] | list[int]]]:
"""
Transform a MultiCircuit into a collection of Json files
:param circuit:
:return:
"""
if circuit.has_time_series:
circuit.ensure_profiles_exist()
data: Dict[str, Union[Dict[str, str], List[Dict[str, str]]]] = dict()
block_saver = BlockSaver(circuit.var_factory)
# declare objects to iterate name: [sample object, list of objects, headers]
object_types = get_objects_dictionary()
del object_types['branch']
# generic object iteration
for object_type_name, object_sample in object_types.items():
object_json = list()
lists_of_objects = circuit.get_elements_by_type(object_sample.device_type)
if len(lists_of_objects) > 0:
for k, elm in enumerate(lists_of_objects):
obj_data = veragrid_object_to_json(elm, block_saver=block_saver, project_directory=project_directory)
object_json.append(obj_data)
data[object_type_name] = object_json
# time
unix_time = circuit.get_unix_time()
data['time'] = {'unix': unix_time.tolist(),
'prob': list(np.ones(len(unix_time))),
'snapshot_unix': circuit.get_snapshot_time_unix()}
# gather the circuit
circuit.model_version += 1
data['circuit'] = circuit.to_dict()
# At this point I already have the symbolic data stored in block_saver
dictionary_save = {
"model_data": data,
"symbolic_data": {
"vars": block_saver.get_vars_to_save(),
"consts": block_saver.get_const_to_save(),
"diff_vars": block_saver.get_diff_vars_to_save(),
"shared_references": block_saver.get_shared_references_to_save(),
"blocks": block_saver.get_blocks(),
"main_block_uids": block_saver.main_block_uids,
}
}
return {
"model_data": data,
"symbolic_data": {
"vars": block_saver.get_vars_to_save(),
"consts": block_saver.get_const_to_save(),
"diff_vars": block_saver.get_diff_vars_to_save(),
"shared_references": block_saver.get_shared_references_to_save(),
"connections": block_saver.get_connections_to_save(),
"blocks": block_saver.get_blocks(),
"main_block_uids": block_saver.main_block_uids,
}
}
[docs]
def search_property(template_elm: ALL_DEV_TYPES,
old_props_dict: Dict[str, str],
property_to_search: str,
logger: Logger) -> Union[GCProp, None]:
"""
Search for a property name in the template object registered properties and their old names
:param template_elm: Device to loo into
:param old_props_dict: Dictionary matching the old names with their current counterpart
:param property_to_search: property name to search
:param logger: Logger
:return: GCProp or None if not found
"""
# search the property in the object headers
gc_prop = template_elm.registered_properties.get(property_to_search, None)
if gc_prop is None:
# the property is not in the headers, search in the the old list
current_prop_name = old_props_dict.get(property_to_search, None)
if current_prop_name:
logger.add_info('The file property was updated in the data model',
device=str(template_elm.device_type),
value=property_to_search)
gc_prop = template_elm.registered_properties.get(current_prop_name, None)
return gc_prop
else:
# the property does not exists in the registries, this is a bug
logger.add_error('the property does not exists in the registries',
device=str(template_elm.device_type),
value=property_to_search)
return None
else:
return gc_prop
[docs]
def look_for_property(elm: ALL_DEV_TYPES, property_name) -> Union[GCProp, None]:
"""
:param elm:
:param property_name:
:return:
"""
device_property_definition: GCProp = elm.registered_properties.get(property_name, None)
if device_property_definition:
# the property of the file exists directly
return device_property_definition
else:
# the property does not exists directly, look in the older properties
for name, prop in elm.registered_properties.items():
if property_name in prop.old_names:
return prop
return None # if we reach here, it wasn't found
[docs]
def valid_value(val) -> bool:
"""
:param val:
:return:
"""
if isinstance(val, str):
if val == 'nan':
return False
if val == '':
return False
if val == 'None':
return False
if isinstance(val, float):
if math.isnan(val):
return False
if math.isinf(val):
return False
return True
[docs]
def look_in_collection_by_name(key: str, collection: Dict[str, ALL_DEV_TYPES]) -> Union[ALL_DEV_TYPES, None]:
"""
Look in a collection for an element by its name instead of by Idtag
:param key: name of the element
:param collection: Collection to look into
:return: Device or None if not found
"""
for idtag, elm in collection.items():
if elm.name == key:
return elm
return None
[docs]
class CreatedOnTheFly:
"""
This class is to pack all those devices that are created "on the fly" to support legacy formats
"""
def __init__(self) -> None:
"""
Constructor
"""
# legacy operations: this is from when area, zone and substation were strings,
# now we create those objects on the fly
self.legacy_area_dict: Dict[str, dev.Area] = dict()
self.legacy_zone_dict: Dict[str, dev.Zone] = dict()
self.legacy_substation_dict: Dict[str, dev.Substation] = dict()
self.contingency_groups: List[dev.ContingencyGroup] = list()
self.contingencies: List[dev.Contingency] = list()
self.technologies: Dict[str, dev.Technology] = dict()
[docs]
def get_create_area(self, property_value):
"""
:param property_value:
:return:
"""
area = self.legacy_area_dict.get(property_value, None)
if area is None:
area = dev.Area(name=str(property_value))
self.legacy_area_dict[property_value] = area
return area
[docs]
def get_create_zone(self, property_value):
"""
:param property_value:
:return:
"""
zone = self.legacy_zone_dict.get(property_value, None)
if zone is None:
zone = dev.Zone(name=str(property_value))
self.legacy_zone_dict[property_value] = zone
return zone
[docs]
def get_create_substation(self, property_value):
"""
:param property_value:
:return:
"""
substation = self.legacy_substation_dict.get(property_value, None)
if substation is None:
substation = dev.Substation(name=str(property_value))
self.legacy_substation_dict[property_value] = substation
return substation
[docs]
def create_contingency(self, elm: ALL_DEV_TYPES):
"""
:param elm:
:return:
"""
con_group = dev.ContingencyGroup(name=elm.name)
conn = dev.Contingency(device=elm, prop=ContingencyOperationTypes.Active, group=con_group)
self.contingency_groups.append(con_group)
self.contingencies.append(conn)
[docs]
def create_technology(self, elm: dev.Generator, tech_name: str):
"""
:param elm:
:param tech_name:
:return:
"""
tech = self.technologies.get(tech_name, None)
if tech is None:
tech = dev.Technology(name=tech_name)
self.technologies[tech_name] = tech
elm.technologies.add_object(api_object=tech, val=1.0)
[docs]
def parse_object_type_from_dataframe(
main_df: pd.DataFrame,
template_elm: ALL_DEV_TYPES,
elements_dict_by_type: Dict[DeviceType, Dict[str, ALL_DEV_TYPES]],
time_profile: pd.DatetimeIndex,
object_type_key: str,
data: Dict[str, Union[float, str, pd.DataFrame]],
logger: Logger) -> Tuple[List[ALL_DEV_TYPES], Dict[str, ALL_DEV_TYPES], CreatedOnTheFly]:
"""
Convert a DataFrame to a list of VeraGrid devices
:param main_df: DataFrame to convert
:param template_elm: Element to use as template for conversion
:param elements_dict_by_type: Dictionary of devices grouped by type used to look for referenced objects
elements_dict_by_type[DeviceType][idtag] -> device
:param time_profile: Master time profile
:param object_type_key: Object type naming to find the profile
:param data: Complete data collection to find the profiles
:param logger: Logger instance
:return: devices, devices_dict
"""
# dictionary to be filled with this type of objects
devices_dict: Dict[str, ALL_DEV_TYPES] = dict()
devices: List[ALL_DEV_TYPES] = list()
# legacy operations: this is from when area, zone and substation were strings,
# now we create those objects on the fly
on_the_fly = CreatedOnTheFly()
# parse each object of the dataframe
for i, row in main_df.iterrows():
# create device
idtag = row.get('idtag', None)
elm = type(template_elm)(idtag=idtag)
elm.disable_auto_updates()
# ensure the profiles existence
if time_profile is None or time_profile is pd.NaT:
nt = 0
else:
nt = len(time_profile)
if nt > 0:
elm.ensure_profiles_exist(index=time_profile)
# parse each property of the row
for property_name_, property_value in row.items():
property_name = str(property_name_)
if property_name != 'idtag': # idtag was set already
gc_prop: GCProp = look_for_property(elm=elm, property_name=property_name)
if gc_prop is not None:
if valid_value(property_value):
if gc_prop.has_profile():
prof = elm.get_profile(magnitude=gc_prop.name)
if 0 < nt != prof.size():
prof.resize(nt)
else:
prof = None
# the property of the file exists, parse it
if isinstance(gc_prop.tpe, DeviceType):
# we must look for the reference in elements_dict
collection = elements_dict_by_type.get(gc_prop.tpe, None)
if collection is not None:
ref_idtag = str(property_value)
ref_elm = collection.get(ref_idtag, None)
if ref_elm is not None:
elm.set_snapshot_value(gc_prop.name, ref_elm)
if gc_prop.has_profile():
prof.fill(ref_elm)
else:
# legacy operations: this is from when grids referenced buses by name
if gc_prop.name in ['bus_from', 'bus_to', 'bus']:
ref_elm = look_in_collection_by_name(key=ref_idtag, collection=collection)
if ref_elm is None:
could_not_fix_it = True
else:
could_not_fix_it = False
elm.set_snapshot_value(gc_prop.name, ref_elm)
if gc_prop.has_profile():
prof.fill(ref_elm)
else:
could_not_fix_it = True
if could_not_fix_it:
logger.add_error("Could not locate reference",
device=row.get('idtag', 'not provided'),
device_class=template_elm.device_type.value,
device_property=gc_prop.name,
value=ref_idtag)
else:
# legacy operations: this is from when area, zone and substation were strings
if gc_prop.name == 'area':
if str(property_value).strip() != '':
area = on_the_fly.get_create_area(property_value=str(property_value))
elm.set_snapshot_value(gc_prop.name, area)
elif gc_prop.name == 'zone':
if str(property_value).strip() != '':
zone = on_the_fly.get_create_zone(property_value=str(property_value))
elm.set_snapshot_value(gc_prop.name, zone)
elif gc_prop.name == 'substation':
if str(property_value).strip() != '':
substation = on_the_fly.get_create_substation(
property_value=str(property_value))
elm.set_snapshot_value(gc_prop.name, substation)
elif gc_prop.name == 'template' and property_value == 'BranchTemplate':
# skip this
pass
else:
logger.add_error("No device of the referenced type",
device=row.get('idtag', 'not provided'),
device_class=template_elm.device_type.value,
device_property=gc_prop.name,
value=property_value)
elif isinstance(gc_prop.tpe, SubObjectType):
if gc_prop.tpe == SubObjectType.GeneratorQCurve:
q_curve: dev.GeneratorQCurve = elm.get_snapshot_value(gc_prop)
if isinstance(property_value, str):
q_curve.parse(json.loads(property_value))
else:
q_curve.parse(property_value)
elif gc_prop.tpe == str:
# set the value directly
elm.set_snapshot_value(gc_prop.name, str(property_value))
if gc_prop.has_profile():
prof.fill(str(property_value))
elif gc_prop.tpe == float:
# set the value directly
elm.set_snapshot_value(gc_prop.name, float(property_value))
if gc_prop.has_profile():
prof.fill(float(property_value))
elif gc_prop.tpe == int:
# set the value directly
elm.set_snapshot_value(gc_prop.name, int(property_value))
if gc_prop.has_profile():
prof.fill(int(property_value))
elif gc_prop.tpe == bool:
# set the value directly
elm.set_snapshot_value(gc_prop.name, bool(property_value))
if gc_prop.has_profile():
prof.fill(bool(property_value))
elif isinstance(gc_prop.tpe, EnumType):
try:
val = gc_prop.tpe(property_value)
elm.set_snapshot_value(gc_prop.name, val)
if gc_prop.has_profile():
prof.fill(val)
except ValueError:
logger.add_error(f'Cannot cast value to {gc_prop.tpe}',
device=elm.name,
value=property_value)
else:
raise Exception(f'Unsupported property type: {gc_prop.tpe}')
else:
# invalid property value
pass
# search the profiles in the data and assign them
if gc_prop.has_profile() and time_profile is not None:
# build the profile property file-name to get it from the data
profile_key = object_type_key + '_' + gc_prop.profile_name
# get the profile DataFrame
dfp = data.get(profile_key, None)
if dfp is not None:
elm.set_profile(gc_prop, arr=dfp.values[:, i].astype(gc_prop.tpe))
else:
skip = False
if gc_prop.name == 'bus':
skip = True
if not skip:
logger.add_info(msg='No profile for the property', value=gc_prop.name)
else:
# the property does not exists, neither in the old names
skip = False
if template_elm.device_type == DeviceType.ShuntDevice:
if property_name in ['is_controlled', 'Bmin', 'Bmax', 'Vset']:
skip = True
if template_elm.device_type == DeviceType.Transformer2WDevice:
if property_name == 'control_mode':
if "Pf" in property_value:
elm.tap_phase_control_mode = TapPhaseControl.Pf
skip = True
if "Pt" in property_value:
elm.tap_phase_control_mode = TapPhaseControl.Pt
skip = True
if "V" in property_value:
elm.tap_module_control_mode = TapModuleControl.Vm
elm.regulation_bus = elm.bus_to
skip = True
if "Qf" in property_value:
elm.tap_module_control_mode = TapModuleControl.Qf
skip = True
if "Qt" in property_value:
elm.tap_module_control_mode = TapModuleControl.Qt
skip = True
if "fixed" in property_value:
elm.tap_module_control_mode = TapModuleControl.fixed
elm.tap_phase_control_mode = TapPhaseControl.fixed
skip = True
if property_name == 'contingency_enabled':
# this is a branch with the legacy property "contingency_enabled", hence, create a contingency
on_the_fly.create_contingency(elm=elm)
skip = True
elif property_name == 'technology':
on_the_fly.create_technology(elm=elm, tech_name=property_value)
skip = True
if not skip:
logger.add_warning("Property in the file is not found in the model",
device=row.get('idtag', 'not provided'),
device_class=template_elm.device_type.value,
device_property=property_name)
# save the element in the dictionary for later
devices_dict[elm.idtag] = elm
devices.append(elm)
return devices, devices_dict, on_the_fly
[docs]
def search_property_into_json(json_entry: dict, prop: GCProp):
"""
Find property in Json entry
:param json_entry: json of an object
:param prop: GCProp
:return: value or None if not found
"""
# search for the main property
property_value = json_entry.get(prop.name, None)
if property_value is None:
# if not found, search for an old property
for p_name in prop.old_names:
property_value = json_entry.get(p_name, None)
if property_value is not None:
return property_value
# we couldn't find the property or the old names...
return None
else:
# we found the property at first
return property_value
[docs]
def search_and_apply_json_profile(json_entry: Dict[str, Dict[str, Union[str, Union[Any, Dict[str, Any]]]]],
gc_prop: GCProp,
elm: ALL_DEV_TYPES,
property_value: Any,
collection: Union[None, Dict[str, Any]] = None) -> None:
"""
Search from the property profiles into the json and apply it
:param json_entry: Json entry of an object
:param gc_prop: GCProp
:param elm: THe device to set the profile into
:param property_value: The snapshot value
:param collection: if the collection is provided, it will be used to convert idtags into objects
:return: None
"""
if gc_prop.has_profile():
# search the profile in the json
json_profile = json_entry.get(gc_prop.profile_name, None)
profile: AnyProfile = elm.get_profile(magnitude=gc_prop.name)
if json_profile is None:
# the profile was not found, so we fill it with the default stuff
profile.fill(property_value)
else:
get_profile_from_dict(profile=profile,
data=json_profile,
collection=collection)
[docs]
def parse_object_type_from_json(template_elm: ALL_DEV_TYPES,
data_list: List[Dict[str, Dict[str, str]]],
elements_dict_by_type: Dict[DeviceType, Dict[str, ALL_DEV_TYPES]],
time_profile: pd.DatetimeIndex,
block_parser: BlockParser,
logger: Logger):
"""
:param template_elm:
:param data_list:
:param elements_dict_by_type:
:param time_profile:
:param block_parser:
:param logger:
:return:
"""
# dictionary to be filled with this type of objects
devices_dict: Dict[str, ALL_DEV_TYPES] = dict()
devices: List[ALL_DEV_TYPES] = list()
for json_entry in data_list:
idtag = json_entry['idtag']
elm: ALL_DEV_TYPES = type(template_elm)(idtag=idtag)
elm.disable_auto_updates()
# ensure the profiles existence
if time_profile is not None:
elm.ensure_profiles_exist(index=time_profile)
# for property_name_, property_value in json_entry.items():
for property_name, gc_prop in template_elm.registered_properties.items():
# search for the property in the json
property_value = search_property_into_json(json_entry=json_entry, prop=gc_prop)
if property_value is not None:
if property_name != 'idtag': # idtag was set already
# gc_prop: GCProp = look_for_property(elm=elm, property_name=property_name)
if gc_prop is not None:
if valid_value(property_value):
if isinstance(gc_prop.tpe, DeviceType):
if gc_prop.tpe == DeviceType.AnyLineTemplateDevice:
# this is an exception, when dealing with line templates we need to look for
# several types, not only one
seq_templates = elements_dict_by_type.get(DeviceType.OverheadLineTypeDevice, dict())
oh_templates = elements_dict_by_type.get(DeviceType.SequenceLineDevice, dict())
ug_templates = elements_dict_by_type.get(DeviceType.UnderGroundLineDevice, dict())
collection = {**seq_templates, **oh_templates, **ug_templates}
elif gc_prop.tpe == DeviceType.BusOrBranch:
bus_dic = elements_dict_by_type.get(DeviceType.BusDevice, None)
lines_dict = elements_dict_by_type.get(DeviceType.LineDevice, None)
if bus_dic is not None and lines_dict is not None:
collection = bus_dic | lines_dict
elif bus_dic is not None and lines_dict is None:
collection = bus_dic
elif bus_dic is None and lines_dict is not None:
collection = bus_dic
else:
collection = None
else:
# this is a hyperlink to another object
# we must look for the reference in elements_dict
collection = elements_dict_by_type.get(gc_prop.tpe, None)
if collection is not None:
ref_idtag = str(property_value)
ref_elm = collection.get(ref_idtag, None)
if ref_elm is not None:
elm.set_snapshot_value(gc_prop.name, ref_elm)
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=ref_elm,
collection=collection)
else:
logger.add_error("Could not locate reference",
device=elm.idtag,
device_class=template_elm.device_type.value,
device_property=gc_prop.name,
value=ref_idtag)
else:
logger.add_error("No device of the referenced type",
device=elm.idtag,
device_class=template_elm.device_type.value,
device_property=gc_prop.name,
value=property_value)
elif isinstance(gc_prop.tpe, SubObjectType): # this is a hyperlink to another object
if gc_prop.tpe == SubObjectType.GeneratorQCurve:
# get the curve object and fill it with the json data
q_curve: dev.GeneratorQCurve = elm.get_snapshot_value(prop=gc_prop)
if isinstance(property_value, str):
q_curve.parse(json.loads(property_value))
else:
q_curve.parse(data=property_value)
elif gc_prop.tpe == SubObjectType.LineLocations:
# get the line locations object and fill it with the json data
locations_obj: dev.LineLocations = elm.get_snapshot_value(prop=gc_prop)
locations_obj.parse(property_value)
elif gc_prop.tpe == SubObjectType.ListOfWires:
# get the line locations object and fill it with the json data
list_of_wires: dev.ListOfWires = elm.get_snapshot_value(prop=gc_prop)
list_of_wires.parse(data=property_value,
wire_dict=elements_dict_by_type[DeviceType.WireDevice])
elif gc_prop.tpe == SubObjectType.TapChanger:
# get the line locations object and fill it with the json data
tc_obj: dev.TapChanger = elm.get_snapshot_value(prop=gc_prop)
tc_obj.parse(property_value, logger=logger)
elif gc_prop.tpe == SubObjectType.Array:
val = np.array(property_value)
elm.set_snapshot_value(gc_prop.name, val)
elif gc_prop.tpe == SubObjectType.AdmittanceMatrix:
# get the line locations object and fill it with the json data
adm_mat: SubObjectType.AdmittanceMatrix = elm.get_snapshot_value(prop=gc_prop)
adm_mat.parse(property_value)
elif gc_prop.tpe == SubObjectType.DaeBlockType:
# handle the ModelHost
if isinstance(property_value, dict):
key = property_value.get("model", None)
if key is None:
logger.add_error("Could not locate model", value=property_value)
else:
blk = block_parser.block_dict.get(key, None)
elm.set_snapshot_value(gc_prop.name, blk)
else:
blk = block_parser.block_dict.get(property_value, None)
if blk is None:
logger.add_error("Block not found",
device=elm.name,
value=property_value,
device_class=elm.device_type.value,
device_property=gc_prop.name
)
else:
elm.set_snapshot_value(gc_prop.name, blk)
elif gc_prop.tpe == SubObjectType.VarType:
# set the value of the property
vl = block_parser.var_factory.get_var(property_value)
elm.set_snapshot_value(gc_prop.name, vl)
elif gc_prop.tpe == SubObjectType.ConstType:
# set the value of the property
vl = block_parser.var_factory.get_const(property_value)
elm.set_snapshot_value(gc_prop.name, vl)
elif gc_prop.tpe == SubObjectType.Associations:
# get the list of associations
associations = elm.get_snapshot_value(gc_prop)
associations.parse(
data=property_value,
elements_dict=elements_dict_by_type.get(associations.device_type, {}),
logger=logger,
elm_name=elm.name
)
elif gc_prop.tpe == SubObjectType.MergeInformation:
# set the value directly
elm.diff_changes.parse(property_value)
else:
raise Exception(f"SubObjectType {gc_prop.tpe} not implemented")
elif gc_prop.tpe == str:
# set the value directly
val = str(property_value)
elm.set_snapshot_value(gc_prop.name, val)
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=val)
elif gc_prop.tpe == float:
# set the value directly
val = float(property_value)
elm.set_snapshot_value(gc_prop.name, val)
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=val)
elif gc_prop.tpe == int:
# set the value directly
val = int(property_value)
elm.set_snapshot_value(gc_prop.name, val)
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=val)
elif gc_prop.tpe == bool:
# set the value directly
val = bool(property_value)
elm.set_snapshot_value(gc_prop.name, val)
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=val)
elif isinstance(gc_prop.tpe, EnumType):
try:
val = gc_prop.tpe(property_value)
try:
elm.set_snapshot_value(gc_prop.name, val)
except ValueError as e:
logger.add_error(f'Cannot set the snapshot',
device=elm.name,
value=property_value,
comment=str(e))
try:
search_and_apply_json_profile(json_entry=json_entry,
gc_prop=gc_prop,
elm=elm,
property_value=val)
except ValueError as e:
logger.add_error(f'Cannot set the profile',
device=elm.name,
value=property_value,
comment=str(e))
except ValueError as e:
logger.add_error(f'Cannot cast the value to the snapshot',
device=elm.name,
value=property_value,
comment=str(e))
else:
raise Exception(f'Unsupported property type: {gc_prop.tpe} for {gc_prop.name}')
else:
# invalid property value
pass
else:
# property not found
pass
else:
# the property is idtag
pass
else:
# the object property was not found in the json entry
pass
if template_elm.device_type == DeviceType.Transformer3WDevice:
for property_name in ('r12', 'r23', 'r31', 'x12', 'x23', 'x31', 'rate1', 'rate2', 'rate3'):
gc_prop = template_elm.registered_properties[property_name]
property_value = search_property_into_json(json_entry=json_entry, prop=gc_prop)
if property_value is not None:
elm.set_snapshot_value(gc_prop.name, float(property_value))
if template_elm.device_type == DeviceType.LineDevice:
gc_prop = template_elm.registered_properties.get('length', None)
if gc_prop is not None:
property_value = search_property_into_json(json_entry=json_entry, prop=gc_prop)
if property_value is not None and float(property_value) == 0.0:
elm._length = 0.0
# save the element in the dictionary for later
devices_dict[elm.idtag] = elm
devices.append(elm)
return devices, devices_dict
[docs]
def handle_legacy_jsons(model_data: Dict[str, List],
elements_dict_by_type: Dict[DeviceType, Dict],
logger: Logger) -> None:
"""
Handle those legacy structures that were deprecated and removed from VeraGrid's structure
:param model_data:
:param elements_dict_by_type:
:param logger:
:return:
"""
gt_data_list = model_data.get("generator_technology", None)
if gt_data_list is not None:
for entry in gt_data_list:
gen_idtag = entry.get('generator', None)
tech_idtag = entry.get('technology', None)
proportion = entry.get('proportion', 1.0)
generator = elements_dict_by_type[DeviceType.GeneratorDevice].get(gen_idtag, None)
tech = elements_dict_by_type[DeviceType.Technology].get(tech_idtag, None)
if generator is not None and tech is not None:
generator.technologies.add_object(api_object=tech, val=proportion)
logger.add_info("Converted legacy generator technology association",
device_class="Generator_technology",
value=f"{generator.name} -> {tech.name} at {proportion}")
gf_data_list = model_data.get("generator_fuel", None)
if gf_data_list is not None:
for entry in gf_data_list:
gen_idtag = entry.get('generator', None)
fuel_idtag = entry.get('fuel', None)
rate = entry.get('rate', 1.0)
generator = elements_dict_by_type[DeviceType.GeneratorDevice].get(gen_idtag, None)
fuel = elements_dict_by_type[DeviceType.FuelDevice].get(fuel_idtag, None)
if generator is not None and fuel is not None:
generator.fuels.add_object(api_object=fuel, val=rate)
logger.add_info("Converted legacy generator fuel association",
device_class="generator_fuel",
value=f"{generator.name} -> {fuel.name} at {rate}")
ge_data_list = model_data.get("generator_emission", None)
if ge_data_list is not None:
for entry in ge_data_list:
gen_idtag = entry.get('generator', None)
emision_idtag = entry.get('emission', None)
rate = entry.get('rate', 1.0)
generator = elements_dict_by_type[DeviceType.GeneratorDevice].get(gen_idtag, None)
emission = elements_dict_by_type[DeviceType.EmissionGasDevice].get(emision_idtag, None)
if generator is not None and emission is not None:
generator.emissions.add_object(api_object=emission, val=rate)
logger.add_info("Converted legacy generator emission association",
device_class="generator_emission",
value=f"{generator.name} -> {emission.name} at {rate}")
[docs]
def parse_veragrid_data(data: VERAGRID_FILE_TYPE,
previous_circuit: Union[MultiCircuit, None] = None,
project_directory: str | Path | None = None,
refine_pointers: bool = True,
text_func: Union[Callable, None] = None,
progress_func: Union[Callable, None] = None,
logger: Logger = Logger()) -> MultiCircuit:
"""
Interpret data
:param project_directory: file project directory
:param refine_pointers: Refine (and possibly delete) the pointer objects such as investments and contingencies?
:param data: dictionary of data frames and other information
:param previous_circuit: Optional previous VeraGrid circuit. This is relevant in case of loading grid increments
:param text_func: text callback function
:param progress_func: progress callback function
:param logger: Logger to register events
:return: MultiCircuit instance
"""
# create circuit
circuit = MultiCircuit()
# Legacy circuit information parsing -------------------------------------------------------------------------------
if 'name' in data.keys():
circuit.name = str(data['name'])
if circuit.name == 'nan':
circuit.name = ''
if 'idtag' in data.keys():
val = str(data['idtag'])
if val != '':
circuit.idtag = val
else:
logger.add_warning("Had to create a new idtag", value=circuit.idtag)
# set the base magnitudes
if 'baseMVA' in data.keys():
circuit.Sbase = data['baseMVA']
# Set comments
if 'Comments' in data.keys():
circuit.comments = str(data['Comments'])
if circuit.comments == 'nan':
circuit.comments = ''
if 'ModelVersion' in data.keys():
circuit.model_version = int(data['ModelVersion'])
if 'UserName' in data.keys():
circuit.user_name = data['UserName']
# dictionary of objects to iterate
template_object_types = get_objects_dictionary()
# time profile -----------------------------------------------------------------------------------------------------
if 'time' in data.keys():
time_df = data['time']
try:
circuit.time_profile = pd.to_datetime(time_df.values[:, 0], dayfirst=True, format='mixed')
except ValueError as err:
circuit.time_profile = pd.to_datetime(time_df.values[:, 0], dayfirst=True)
else:
circuit.time_profile = None
# dictionary of dictionaries by element type
if previous_circuit is None:
elements_dict_by_type = dict()
else:
elements_dict_by_type = previous_circuit.get_all_elements_dict_by_type(add_locations=True, string_keys=False)
# ------------------------------------------------------------------------------------------------------------------
# Legacy DataFrame processing
# for each element type...
item_count = 0
n_data_types = len(template_object_types)
for object_type_key, template_elm in template_object_types.items():
if text_func is not None:
text_func(f"Parsing {object_type_key} table data...")
# try to get the DataFrame
# Todo: here we should get the data from template_object_types dict, not from data dict
df = data.get(object_type_key, None)
if df is not None:
# fill in the objects
if df.shape[0] > 0:
devices, devices_dict, on_the_fly = parse_object_type_from_dataframe(
main_df=df,
template_elm=template_elm,
elements_dict_by_type=elements_dict_by_type,
time_profile=circuit.time_profile,
object_type_key=object_type_key,
data=data,
logger=logger
)
# add the elements that were created on the fly...
for name, on_the_fly_elm in on_the_fly.legacy_area_dict.items():
circuit.add_area(obj=on_the_fly_elm)
for name, on_the_fly_elm in on_the_fly.legacy_zone_dict.items():
circuit.add_zone(obj=on_the_fly_elm)
for name, on_the_fly_elm in on_the_fly.legacy_substation_dict.items():
circuit.add_substation(obj=on_the_fly_elm)
for conn_group in on_the_fly.contingency_groups:
circuit.add_contingency_group(obj=conn_group)
for cont in on_the_fly.contingencies:
circuit.add_contingency(obj=cont)
for tech_name, technology in on_the_fly.technologies.items():
circuit.add_technology(obj=technology)
# set/augment the dictionary per type for later
prev_dict = elements_dict_by_type.get(template_elm.device_type, dict())
elements_dict_by_type[template_elm.device_type] = dict(prev_dict, **devices_dict)
# add the devices to the circuit
circuit.set_elements_list_by_type(device_type=template_elm.device_type,
devices=devices,
logger=logger)
else:
# no objects of this type
pass
else:
# the file does not contain information for the data type (not a problem...)
pass
if progress_func is not None:
progress_func(float(item_count + 1) / float(n_data_types) * 100)
item_count += 1
# ------------------------------------------------------------------------------------------------------------------
# New way of parsing information from .model files (Json files)
# These files are just .json stored in the model_data inside the zip file
block_parser = BlockParser(circuit.var_factory)
symbolic_data = data.get('symbolic_data', None)
if symbolic_data is not None:
if len(symbolic_data) > 0:
if "shared_references" in symbolic_data:
block_parser.parse_references(symbolic_data["shared_references"])
block_parser.parse_consts(symbolic_data["consts"])
block_parser.parse_vars(symbolic_data["vars"])
block_parser.parse_diff_vars(symbolic_data["diff_vars"])
if "connections" in symbolic_data:
block_parser.parse_connections(symbolic_data["connections"])
for block_uid in symbolic_data["main_block_uids"]:
block_parser.parse_block(symbolic_data["blocks"], block_uid)
else:
pass # the symbolic data is empty
else:
pass # there is no symbolic data records
model_data = data.get('model_data', None)
if model_data is not None:
if len(model_data) > 0:
# parse circuit own data
circuit_data: Dict[str, str | int | float] | None = model_data.get('circuit', None)
if circuit_data is not None:
circuit.parse(data=circuit_data)
# parse time
tdata = model_data.get('time', None)
if tdata is not None:
circuit.set_unix_time(arr=tdata['unix'])
snapshot_unix_time = tdata.get('snapshot_unix', None)
if snapshot_unix_time is not None:
circuit.set_snapshot_time_unix(val=snapshot_unix_time)
else:
logger.add_error(msg=f'The file must have time data regardless of the profiles existence')
circuit.time_profile = None
# for each element type...
item_count = 0
n_data_types = len(template_object_types)
for object_type_key, template_elm in template_object_types.items():
if text_func is not None:
text_func(f"Parsing {object_type_key} model data...")
# query the device type into the data set
data_list = model_data.get(object_type_key, None)
if data_list is not None:
devices, devices_dict = parse_object_type_from_json(
template_elm=template_elm,
data_list=data_list,
elements_dict_by_type=elements_dict_by_type,
time_profile=circuit.time_profile,
block_parser=block_parser,
logger=logger
)
# set/augment the dictionary per type for later
prev_dict = elements_dict_by_type.get(template_elm.device_type, dict())
prev_dict.update(devices_dict) # augment prev_dict with devices_dict
elements_dict_by_type[template_elm.device_type] = prev_dict
# add the devices to the circuit
circuit.set_elements_list_by_type(device_type=template_elm.device_type,
devices=devices,
logger=logger)
else:
# Legacy and optional sections should not generate warnings when absent.
if object_type_key not in {'branch', 'fmu_template'}:
logger.add_warning(msg=f'No data for {object_type_key}')
if progress_func is not None:
progress_func(float(item_count + 1) / float(n_data_types) * 100)
item_count += 1
# Handle the legacy objects that may be present in the data bus not declared in the program
# i.e. generator_technology
handle_legacy_jsons(model_data=model_data,
elements_dict_by_type=elements_dict_by_type,
logger=logger)
# fill in device into pointer devices ------------------------------------------------------------------------------
to_delete: List[ALL_DEV_TYPES] = list()
all_elements_dict: Dict[str, ALL_DEV_TYPES] = dict()
for dtype in [
DeviceType.PiMeasurementDevice,
DeviceType.QiMeasurementDevice,
DeviceType.PfMeasurementDevice,
DeviceType.QfMeasurementDevice,
DeviceType.PtMeasurementDevice,
DeviceType.QtMeasurementDevice,
DeviceType.PgMeasurementDevice,
DeviceType.QgMeasurementDevice,
DeviceType.IfMeasurementDevice,
DeviceType.ItMeasurementDevice,
DeviceType.VaMeasurementDevice,
DeviceType.VmMeasurementDevice,
DeviceType.ContingencyDevice,
DeviceType.InvestmentDevice,
DeviceType.RemedialActionDevice,
DeviceType.RmsEventDevice,
DeviceType.EmtEventDevice,
DeviceType.ShortCircuitEvent
]:
elms = circuit.get_elements_by_type(device_type=dtype)
for elm in elms:
if elm.tpe != DeviceType.NoDevice:
# search in the per-device type dictionary (already created and faster)
referenced_dict = elements_dict_by_type[elm.tpe]
referenced_elm = referenced_dict.get(elm.device_idtag, None)
else:
# lazy-creation of the all elements dict, takes time to initialize
if len(all_elements_dict) == 0:
all_elements_dict, _ = circuit.get_all_elements_dict()
referenced_elm = all_elements_dict.get(elm.device_idtag, None)
if referenced_elm is None:
to_delete.append(elm)
else:
elm.set_device(elm=referenced_elm)
# delete pointer elemnts to missing references
for elm in to_delete:
circuit.delete_element(elm)
logger.add_error(msg="Invalid pointer element deleted",
device_class=elm.device_type.value,
device=elm.name)
# fill in wires into towers ----------------------------------------------------------------------------------------
if text_func is not None:
text_func("Tower wires...")
if 'tower_wires' in data.keys():
df = data['tower_wires']
for i in range(df.shape[0]):
tower_name = df['tower_name'].values[i]
wire_name = df['wire_name'].values[i]
if ((tower_name in elements_dict_by_type[DeviceType.OverheadLineTypeDevice].keys()) and
(wire_name in elements_dict_by_type[DeviceType.WireDevice].keys())):
tower: dev.OverheadLineType = elements_dict_by_type[DeviceType.OverheadLineTypeDevice][tower_name]
wire: dev.Wire = elements_dict_by_type[DeviceType.WireDevice][wire_name]
xpos = df['xpos'].values[i]
ypos = df['ypos'].values[i]
phase = df['phase'].values[i]
tower.add_wire_relationship(wire=wire, xpos=xpos, ypos=ypos, phase=phase)
else:
pass
# create diagrams --------------------------------------------------------------------------------------------------
if text_func is not None:
text_func("Parsing diagrams...")
# try to get the list of diagrams
list_of_diagrams: List[Dict[str, Any]] = data.get('diagrams', None)
if list_of_diagrams is not None:
if len(list_of_diagrams):
obj_dict = circuit.get_all_elements_dict_by_type(add_locations=True)
for diagram_dict in list_of_diagrams:
if diagram_dict['type'] in [DiagramType.Schematic.value, "bus-branch"]:
diagram = dev.SchematicDiagram()
diagram.parse_data(data=diagram_dict, obj_dict=obj_dict, logger=logger)
circuit.add_diagram(diagram)
elif diagram_dict['type'] == DiagramType.SubstationLineMap.value:
diagram = dev.MapDiagram()
diagram.parse_data(data=diagram_dict, obj_dict=obj_dict, logger=logger)
circuit.add_diagram(diagram)
else:
print('unrecognized diagram', diagram_dict['type'])
if text_func is not None:
text_func("Done!")
# search contingencies, investments and remedial actions pointed devices
# and remove those that point nowhere
if refine_pointers:
circuit.refine_pointer_objects(logger=logger)
if circuit.has_time_series:
circuit.ensure_profiles_exist()
resolved_project_directory: Path | None
if project_directory is None:
resolved_project_directory = None
else:
resolved_project_directory = Path(project_directory)
_resolve_circuit_fmu_paths(circuit, resolved_project_directory)
# Enable auto updates internally only after everything has been filled
for elm in circuit.get_all_elements_iter():
elm.enable_auto_updates()
return circuit
[docs]
def parse_multiverse_data(data: dict[str, VERAGRID_FILE_TYPE],
metadata: Dict[str, Dict[str | int | float] | int | None],
text_func: Union[Callable, None] = None,
progress_func: Union[Callable, None] = None,
logger: Logger = Logger()) -> dev.MultiVerse:
"""
:param data:
:param metadata:
:param text_func:
:param progress_func:
:param logger:
:return:
"""
mv = dev.MultiVerse(current_model=None)
diffs_dict: Dict[str, MultiCircuit] = dict()
diagrams_dict: Dict[str, List[Dict[str, Any]]] = dict()
composed_by_node_id: Dict[int, MultiCircuit] = dict()
# IMPORTANT:
# Multiverse node payloads are not all full circuits.
#
# - Root nodes store a full authoritative MultiCircuit.
# - Non-root nodes store only the electrical delta against their parent.
#
# This has two non-obvious consequences during load:
#
# 1. Electrical references in child payloads may legitimately point to objects that are
# not present in the child's delta payload, because those objects live in the parent /
# ancestor composed circuit. Example: a child adds a new line whose bus_to points to a
# bus inherited from the parent scenario. If the child payload were parsed in isolation,
# that reference would fail with "Could not locate reference".
#
# 2. Node-owned diagrams are full scenario diagrams, not diagram deltas. A child diagram may
# therefore reference both child-owned and inherited parent-owned API objects. Parsing such
# diagrams against the raw child delta payload would also fail.
#
# Because of that, multiverse loading must happen in two phases:
# - First parse the electrical payloads in parent-before-child order, passing the fully composed
# parent circuit as previous_circuit so child references to inherited objects can be resolved.
# - Then, after the tree exists, parse each node's diagrams against that node's full composed
# scenario, never against the raw delta payload.
node_metadata = get_multiverse_node_metadata(metadata)
ordered_records = order_multiverse_records(metadata)
all_elements_dict = dict()
for record in ordered_records:
node_id = int(record["node_id"])
circuit_idtag = str(record["circuit_idtag"])
parent_id_raw = record["parent_id"]
parent_id = None if parent_id_raw is None else int(parent_id_raw)
model_data = data["multiverse"][circuit_idtag]
diagrams_dict[circuit_idtag] = model_data.get("diagrams", list())
model_without_diagrams = dict(model_data)
model_without_diagrams["diagrams"] = list()
previous_circuit = None if parent_id is None else composed_by_node_id[parent_id]
grid = parse_veragrid_data(data=model_without_diagrams,
previous_circuit=previous_circuit,
refine_pointers=False,
text_func=text_func,
progress_func=progress_func,
logger=logger)
# we create a dictionary of all the elements in all the scenarios such that finding pointers doesn't fail later
d, ok = grid.get_all_elements_dict(logger=logger)
all_elements_dict.update(d)
grid.idtag = circuit_idtag
diffs_dict[circuit_idtag] = grid
if parent_id is None:
composed_by_node_id[node_id] = grid.copy()
else:
composed = composed_by_node_id[parent_id].copy()
composed.merge_circuit(grid)
composed.name = grid.name
composed_by_node_id[node_id] = composed
# Refine pointer in all grids
for idtag, grid in diffs_dict.items():
grid.refine_pointer_objects(logger=logger, all_elements_dict=all_elements_dict)
mv.parse_json(diffs_dict, metadata)
# Parse diagrams only after the multiverse tree exists, so each node can resolve them
# against its full composed scenario instead of its raw delta payload.
for record in node_metadata.values():
node_id = int(record["node_id"])
circuit_idtag = str(record["circuit_idtag"])
node = mv.get_node(node_id)
full_circuit = mv.checkout(node)
obj_dict = full_circuit.get_all_elements_dict_by_type(add_locations=True)
parsed_diagrams: List[Any] = list()
for diagram_dict in diagrams_dict.get(circuit_idtag, list()):
if diagram_dict['type'] in [DiagramType.Schematic.value, "bus-branch"]:
diagram = dev.SchematicDiagram()
diagram.parse_data(data=diagram_dict, obj_dict=obj_dict, logger=logger)
parsed_diagrams.append(diagram)
elif diagram_dict['type'] == DiagramType.SubstationLineMap.value:
diagram = dev.MapDiagram()
diagram.parse_data(data=diagram_dict, obj_dict=obj_dict, logger=logger)
parsed_diagrams.append(diagram)
node.diagrams = parsed_diagrams
active_node_id_raw = metadata.get("active_node_id", None) if isinstance(metadata, dict) else None
if mv.current_node is not None and mv.current_model is not None:
obj_dict = mv.current_model.get_all_elements_dict_by_type(add_locations=True)
mv.current_model.diagrams = copy_diagrams(diagrams=mv.current_node.diagrams, obj_dict=obj_dict)
if active_node_id_raw is not None and (
mv.current_node is None or mv.current_node.node_id != int(active_node_id_raw)):
mv.activate_scenario(int(active_node_id_raw))
return mv