# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from typing import List, Dict, Any, Sequence
from warnings import warn
import time
import zipfile
import numpy as np
import pandas as pd
from VeraGridEngine.basic_structures import Logger
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
import VeraGridEngine.Devices as dev
from VeraGridEngine.Devices.types import ALL_DEV_TYPES
from VeraGridEngine.enumerations import DeviceType
from VeraGridEngine.IO.veragrid.pack_unpack import gather_model_as_data_frames, get_objects_dictionary
[docs]
def check_names(names: List[str], logger: Logger) -> None:
"""
Check that the names are allowed
:param names:
:param logger:
:return:
"""
allowed_data_sheets = shorten_dict_keys(get_allowed_sheets(), max_size=30)
for name in names:
if name not in allowed_data_sheets.keys():
# raise Exception('The file sheet ' + name + ' is not allowed.\n'
# 'Did you create this file manually? Use VeraGrid instead.')
logger.add_error("Excel sheet not recognized",
value=name)
[docs]
def get_allowed_sheets() -> Dict[str, Any]:
"""
Get the allowed sheets in the excel file
:return:
"""
########################################################################################################
# declare objects to iterate name: [sample object, list of objects, headers]
########################################################################################################
object_types = get_objects_dictionary()
########################################################################################################
# generic object iteration
########################################################################################################
allowed_data_sheets = {'Conf': None,
'config': None,
'wires': None,
'overhead_line_types': None,
'underground_cable_types': None,
'sequence_line_types': None,
'transformer_types': None,
'time': None,
'load_Sprof': complex,
'load_Iprof': complex,
'load_Zprof': complex,
'static_generator': None,
'static_generator_Sprof': complex,
'static_generator_P_prof': complex,
'static_generator_Q_prof': complex,
'battery': None,
'battery_Vset_profiles': float,
'battery_P_profiles': float,
'controlled_generator': None,
'CtrlGen_Vset_profiles': float,
'CtrlGen_P_profiles': float,
'shunt_Y_profiles': complex,
'generator_technology': float,
'generator_fuel': float,
'generator_emission': float,
'tower_wires': None,
'line_protection_rating_factor_': float}
for object_type_name, object_sample in object_types.items():
for main_property, profile_property in object_sample.properties_with_profile.items():
if profile_property not in allowed_data_sheets.keys():
# create the profile
key = object_type_name + '_' + profile_property
allowed_data_sheets[key] = object_sample.registered_properties[main_property].tpe
# declare the DataFrames for the normal data
allowed_data_sheets[object_type_name] = None
return allowed_data_sheets
[docs]
def shorten_dict_keys(d: Dict[str, Any], max_size=30):
"""
Change dict keys to match the Excel 30 char limit
:param d:
:param max_size:
:return:
"""
d2 = dict()
for key, value in d.items():
key2 = key[:max_size]
d2[key2] = value
return d2
[docs]
def load_from_xls(filename: str, logger: Logger) -> Dict[str, pd.DataFrame]:
"""
Loads the excel file content to a dictionary for parsing the data
"""
data = dict()
deadline: float = time.perf_counter() + 2.0
with pd.ExcelFile(filename) as xl:
names = xl.sheet_names
# check the validity of this excel file
check_names(names=names, logger=logger)
# parse the file
if 'Conf' in names: # version 1
data["version"] = 1.0
for name in names:
if name.lower() == "conf":
df = xl.parse(name)
data["baseMVA"] = np.double(df.values[0, 1])
elif name.lower() == "bus":
df = xl.parse(name)
data["bus"] = np.nan_to_num(df.values)
if len(df) > 0:
if df.index.values.tolist()[0] != 0:
data['bus_names'] = df.index.values.tolist()
elif name.lower() == "gen":
df = xl.parse(name)
data["gen"] = np.nan_to_num(df.values)
if len(df) > 0:
if df.index.values.tolist()[0] != 0:
data['gen_names'] = df.index.values.tolist()
elif name.lower() == "branch":
df = xl.parse(name)
data["branch"] = np.nan_to_num(df.values)
if len(df) > 0:
if df.index.values.tolist()[0] != 0:
data['branch_names'] = df.index.values.tolist()
elif name.lower() == "storage":
df = xl.parse(name)
data["storage"] = np.nan_to_num(df.values)
if len(df) > 0:
if df.index.values.tolist()[0] != 0:
data['storage_names'] = df.index.values.tolist()
elif name.lower() == "lprof":
df = xl.parse(name, index_col=0)
data["Lprof"] = np.nan_to_num(df.values)
data["master_time"] = df.index
elif name.lower() == "lprofq":
df = xl.parse(name, index_col=0)
data["LprofQ"] = np.nan_to_num(df.values)
# ppc["master_time"] = df.index.values
elif name.lower() == "gprof":
df = xl.parse(name, index_col=0)
data["Gprof"] = np.nan_to_num(df.values)
data["master_time"] = df.index # it is the same
elif 'config' in names: # version 2 / 3
allowed_data_sheets = shorten_dict_keys(get_allowed_sheets(), max_size=30)
for name in names:
if name.lower() == "config":
df = xl.parse('config', index_col=0)
data["baseMVA"] = 100
data["name"] = "Grid"
data["Comments"] = ""
# Note: Do not include a default value for version
for i, row in df.iterrows():
if row['Property'] == 'BaseMVA':
data["baseMVA"] = np.double(row['Value'])
elif row['Property'] == 'Version':
data["version"] = np.double(row['Value'])
elif row['Property'] == 'Name':
data["name"] = str(row['Value'])
elif row['Property'] == 'Comments':
data["Comments"] = str(row['Value']).replace("nan", "")
else:
# just pick the DataFrame
df = xl.parse(name, index_col=0)
if allowed_data_sheets[name] == complex:
# pandas does not read complex numbers right,
# so when we expect a complex number input, parse directly
for c in df.columns.values:
df[c] = df[c].apply(lambda x: complex(x))
data[name] = df
else:
logger.add_error('This excel file is not in VeraGrid Format')
return data
return data
[docs]
def interprete_excel_v2(data,
logger: Logger = Logger()) -> MultiCircuit:
"""
Interpret the file version 2
:param data: Dictionary with the excel file sheet labels and the corresponding DataFrame
:return: Nothing, just applies the loaded data to this MultiCircuit instance
"""
# print('Interpreting V2 data...')
# clear all the data
circuit = MultiCircuit()
circuit.name = data['name']
# set the base magnitudes
circuit.Sbase = data['baseMVA']
# dictionary of branch types [name] -> type object
branch_types = dict()
# Set comments
circuit.comments = data['Comments'] if 'Comments' in data.keys() else ''
circuit.time_profile = None
# common function
def set_object_attributes(obj_, attr_list, values):
for a, attr in enumerate(attr_list):
# Hack to change the enabled by active...
if attr == 'is_enabled':
attr = 'active'
if attr == 'type_obj':
attr = 'template'
if hasattr(obj_, attr):
prop = obj_.registered_properties.get(attr, None)
if prop is not None:
conv = prop.tpe # get the type converter
if conv is None:
setattr(obj_, attr, values[a])
elif conv is dev.BranchType:
# cbr = BranchTypeConverter(None)
setattr(obj_, attr, dev.BranchType(values[a]))
elif isinstance(conv, DeviceType):
pass
else:
setattr(obj_, attr, conv(values[a]))
else:
# the property wasn't found
pass
else:
if attr in ['Y', 'Z', 'I', 'S', 'seq_resistance', 'seq_admittance', 'Zf']:
if attr == 'Z':
val = complex(values[a])
re = 1 / val.real if val.real != 0.0 else 0
im = 1 / val.imag if val.imag != 0.0 else 0
setattr(obj_, 'G', re)
setattr(obj_, 'B', im)
if attr == 'Zf':
val = complex(values[a])
re = 1 / val.real if val.real != 0.0 else 0
im = 1 / val.imag if val.imag != 0.0 else 0
# setattr(obj_, 'r_fault', re)
# setattr(obj_, 'x_fault', im)
if attr == 'Y':
val = complex(values[a])
re = val.real
im = val.imag
setattr(obj_, 'G', re)
setattr(obj_, 'B', im)
elif attr == 'I':
val = complex(values[a])
setattr(obj_, 'Ir', val.real)
setattr(obj_, 'Ii', val.imag)
elif attr == 'S':
val = complex(values[a])
setattr(obj_, 'P', val.real)
setattr(obj_, 'Q', val.imag)
elif attr == 'seq_resistance':
val = complex(values[a])
setattr(obj_, 'R1', val.real)
setattr(obj_, 'X1', val.imag)
elif attr == 'seq_admittance':
val = complex(values[a])
setattr(obj_, 'Gsh1', val.real)
setattr(obj_, 'Bsh1', val.imag)
else:
warn(str(obj_) + ' has no ' + attr + ' property.')
# Add the buses ################################################################################################
bus_dict = dict()
if 'bus' in data.keys():
lst = data['bus']
hdr = lst.columns.values
vals = lst.values
for i in range(len(lst)):
obj = dev.Bus()
set_object_attributes(obj, hdr, vals[i, :])
bus_dict[obj.name] = obj
circuit.add_bus(obj)
else:
logger.add_warning('No buses in the file!')
# add the loads ################################################################################################
if 'load' in data.keys():
lst = data['load']
bus_from = lst['bus'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = lst[hdr].values
for i in range(len(lst)):
obj = dev.Load()
set_object_attributes(obj, hdr, vals[i, :])
if 'load_Sprof' in data.keys():
idx = data['load_Sprof'].index
# create all the profiles
obj.create_profiles(index=idx)
# create the power profiles
val = np.array([complex(v) for v in data['load_Sprof'].values[:, i]])
obj.set_profile_array(magnitude='P', arr=val.real)
obj.set_profile_array(magnitude='Q', arr=val.imag)
if circuit.time_profile is None or circuit.get_time_number() < len(idx):
circuit.time_profile = idx
if 'load_Iprof' in data.keys():
val = np.array([complex(v) for v in data['load_Iprof'].values[:, i]])
idx = data['load_Iprof'].index
obj.set_profile_array(magnitude='Ir', arr=val.real)
obj.set_profile_array(magnitude='Ii', arr=val.imag)
if circuit.time_profile is None or circuit.get_time_number() < len(idx):
circuit.time_profile = idx
if 'load_Zprof' in data.keys():
val = np.array([complex(v) for v in data['load_Zprof'].values[:, i]])
idx = data['load_Zprof'].index
obj.set_profile_array(magnitude='G', arr=val.real)
obj.set_profile_array(magnitude='B', arr=val.imag)
if circuit.time_profile is None or circuit.get_time_number() < len(idx):
circuit.time_profile = idx
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
raise Exception(str(i) + ': Load bus is not in the buses list.\n' + str(ex))
if obj.name == 'Load':
obj.name += f"{circuit.get_loads_number()} @{bus.name}"
circuit.add_load(bus, api_obj=obj)
else:
logger.add_warning('No loads in the file!')
# add the controlled generators ################################################################################
if 'controlled_generator' in data.keys():
lst = data['controlled_generator']
bus_from = lst['bus'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = lst[hdr].values
for i in range(len(lst)):
obj = dev.Generator()
set_object_attributes(obj, hdr, vals[i, :])
if 'CtrlGen_P_profiles' in data.keys():
val = data['CtrlGen_P_profiles'].values[:, i]
idx = data['CtrlGen_P_profiles'].index
obj.set_profile_array(magnitude='P', arr=val)
# also create the Pf array because there might not be values in the file
# obj.set_profile_array(magnitude='Pf', index=idx)
if 'CtrlGen_Pf_profiles' in data.keys():
val = data['CtrlGen_Pf_profiles'].values[:, i]
idx = data['CtrlGen_Pf_profiles'].index
obj.set_profile_array(magnitude='Pf', arr=val)
if 'CtrlGen_Vset_profiles' in data.keys():
val = data['CtrlGen_Vset_profiles'].values[:, i]
idx = data['CtrlGen_Vset_profiles'].index
obj.set_profile_array(magnitude='Vset', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
raise Exception(str(i) + ': Controlled generator bus is not in the buses list.\n' + str(ex))
if obj.name == 'gen':
obj.name += str(circuit.get_generators_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_generator(bus=bus, api_obj=obj)
else:
logger.add_warning('No controlled generator in the file!')
# add the batteries ############################################################################################
if 'battery' in data.keys():
lst = data['battery']
bus_from = lst['bus'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = lst[hdr].values
for i in range(len(lst)):
obj = dev.Battery()
set_object_attributes(obj, hdr, vals[i, :])
if 'battery_P_profiles' in data.keys():
val = data['battery_P_profiles'].values[:, i]
idx = data['battery_P_profiles'].index
obj.set_profile_array(magnitude='P', arr=val)
# obj.set_profile_array(magnitude='Pf', index=idx)
if 'battery_Pf_profiles' in data.keys():
val = data['battery_Pf_profiles'].values[:, i]
idx = data['battery_Pf_profiles'].index
obj.set_profile_array(magnitude='Pf', arr=val)
if 'battery_Vset_profiles' in data.keys():
val = data['battery_Vset_profiles'].values[:, i]
idx = data['battery_Vset_profiles'].index
obj.set_profile_array(magnitude='Vset', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
raise Exception(str(i) + ': Battery bus is not in the buses list.\n' + str(ex))
if obj.name == 'batt':
obj.name += str(circuit.get_batteries_number() + 1) + '@' + bus.name
circuit.add_battery(bus=bus, api_obj=obj)
else:
logger.add_warning('No battery in the file!')
# add the static generators ####################################################################################
if 'static_generator' in data.keys():
lst = data['static_generator']
bus_from = lst['bus'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = lst[hdr].values
for i in range(len(lst)):
obj = dev.StaticGenerator()
set_object_attributes(obj, hdr, vals[i, :])
if 'static_generator_Sprof' in data.keys():
val = data['static_generator_Sprof'].values[:, i]
idx = data['static_generator_Sprof'].index
obj.set_profile_array(magnitude='P', arr=val.real)
obj.set_profile_array(magnitude='Q', arr=val.imag)
if 'static_generator_P_prof' in data.keys():
val = data['static_generator_P_prof'].values[:, i]
idx = data['static_generator_P_prof'].index
obj.set_profile_array(magnitude='P', arr=val)
if 'static_generator_Q_prof' in data.keys():
val = data['static_generator_Q_prof'].values[:, i]
idx = data['static_generator_Q_prof'].index
obj.set_profile_array(magnitude='Q', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
raise Exception(str(i) + ': Static generator bus is not in the buses list.\n' + str(ex))
if obj.name == 'StaticGen':
obj.name += str(circuit.get_static_generators_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_static_generator(bus=bus, api_obj=obj)
else:
logger.add_warning('No static generator in the file!')
# add the shunts ###############################################################################################
if 'shunt' in data.keys():
lst = data['shunt']
bus_from = lst['bus'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = lst[hdr].values
for i in range(len(lst)):
obj = dev.Shunt()
set_object_attributes(obj, hdr, vals[i, :])
if 'shunt_Y_profiles' in data.keys():
val = data['shunt_Y_profiles'].values[:, i]
idx = data['shunt_Y_profiles'].index
obj.set_profile_array(magnitude='G', arr=val.real)
obj.set_profile_array(magnitude='B', arr=val.imag)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
raise Exception(str(i) + ': Shunt bus is not in the buses list.\n' + str(ex))
if obj.name == 'shunt':
obj.name += str(circuit.get_shunts_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_shunt(bus=bus, api_obj=obj)
else:
logger.add_warning('No shunt in the file!')
# Add the wires ################################################################################################
if 'wires' in data.keys():
lst = data['wires']
hdr = lst.columns.values
vals = lst.values
for i in range(len(lst)):
obj = dev.Wire()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_wire(obj)
else:
logger.add_warning('No wires in the file!')
# Add the overhead_line_types ##################################################################################
if 'overhead_line_types' in data.keys():
df = data['overhead_line_types']
if data['overhead_line_types'].values.shape[0] > 0:
for tower_name in df['tower_name'].unique():
obj = dev.OverheadLineType()
dft = df[df['tower_name'] == tower_name]
vals = dft.values
wire_prop = df.columns.values[len(obj.registered_properties):]
# set the tower values
set_object_attributes(obj, obj.registered_properties.keys(), vals[0, :])
# add the wires
if len(wire_prop) == 7:
for i in range(vals.shape[0]):
# ['wire_name' 'xpos' 'ypos' 'phase' 'r' 'x' 'gmr']
name = dft['wire_name'].values[i]
gmr = dft['gmr'].values[i]
r = dft['r'].values[i]
x = dft['x'].values[i]
xpos = dft['xpos'].values[i]
ypos = dft['ypos'].values[i]
phase = dft['phase'].values[i]
wire = dev.Wire(name=name, r_ext=gmr, r=r, x=x)
obj.add_wire_relationship(wire=wire, xpos=xpos, ypos=ypos, phase=phase)
circuit.add_overhead_line(obj)
branch_types[str(obj)] = obj
else:
pass
else:
logger.add_warning('No overhead_line_types in the file!')
# Add the wires ################################################################################################
if 'underground_cable_types' in data.keys():
lst = data['underground_cable_types']
hdr = lst.columns.values
vals = lst.values
# for i in range(len(lst)):
# obj = UndergroundLineType()
# set_object_attributes(obj, hdr, vals[i, :])
# circuit.underground_cable_types.append(obj)
# branch_types[str(obj)] = obj
else:
logger.add_warning('No underground_cable_types in the file!')
# Add the sequence line types ##################################################################################
if 'sequence_line_types' in data.keys():
lst = data['sequence_line_types']
hdr = lst.columns.values
vals = lst.values
for i in range(len(lst)):
obj = dev.SequenceLineType()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_sequence_line(obj)
branch_types[str(obj)] = obj
else:
logger.add_warning('No sequence_line_types in the file!')
# Add the transformer types ####################################################################################
if 'transformer_types' in data.keys():
lst = data['transformer_types']
hdr = lst.columns.values
vals = lst.values
for i in range(len(lst)):
obj = dev.TransformerType()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_transformer_type(obj)
branch_types[str(obj)] = obj
else:
logger.add_warning('No transformer_types in the file!')
# Add the Branches #############################################################################################
if 'branch' in data.keys():
lst = data['branch']
# fix the old 'is_transformer' property
if 'is_transformer' in lst.columns.values:
lst['is_transformer'] = lst['is_transformer'].map({True: 'transformer', False: 'line'})
lst.rename(columns={'is_transformer': 'branch_type'}, inplace=True)
bus_from = lst['bus_from'].values
bus_to = lst['bus_to'].values
hdr = lst.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus_from'))
hdr = np.delete(hdr, np.argwhere(hdr == 'bus_to'))
vals = lst[hdr].values
for i in range(len(lst)):
try:
obj = dev.Branch(bus_from=bus_dict[str(bus_from[i])], bus_to=bus_dict[str(bus_to[i])])
except KeyError as ex:
raise Exception(str(i) + ': Branch bus is not in the buses list.\n' + str(ex))
set_object_attributes(obj, hdr, vals[i, :])
# correct the branch template object
template_name = str(obj.template)
if template_name in branch_types.keys():
obj.template = branch_types[template_name]
print(template_name, 'updated!')
# set the branch
circuit.add_branch(obj)
else:
logger.add_warning('No Branches in the file!')
# Other actions ################################################################################################
logger += circuit.apply_all_branch_types()
return circuit
[docs]
def interpret_excel_v3(data: Dict[str, pd.DataFrame],
logger: Logger = Logger()) -> MultiCircuit:
"""
Interpret the file version 3
In this file version there are no complex numbers saved
:param circuit:
:param data: Dictionary with the excel file sheet labels and the corresponding DataFrame
:param logger: Loger object
:return: Nothing, just applies the loaded data to this MultiCircuit instance
"""
# print('Interpreting V2 data...')
circuit = MultiCircuit()
circuit.name = data['name']
# set the base magnitudes
circuit.Sbase = data['baseMVA']
# dictionary of branch types [name] -> type object
branch_types = dict()
# Set comments
circuit.comments = data['Comments'] if 'Comments' in data.keys() else ''
# common function
def set_object_attributes(obj_: ALL_DEV_TYPES,
attr_list: List[str],
values: Sequence[float | int | str]):
"""
:param obj_:
:param attr_list:
:param values:
:return:
"""
for a, attr in enumerate(attr_list):
# Hack to change the enabled by active...
if attr == 'is_enabled':
attr = 'active'
if attr == 'type_obj':
attr = 'template'
if attr == 'wire_name':
attr = 'name'
if hasattr(obj_, attr):
prop = obj_.registered_properties.get(attr, None)
if prop is not None:
conv = prop.tpe # get the type converter
if conv is None:
setattr(obj_, attr, values[a])
elif conv is dev.BranchType:
# cbr = BranchTypeConverter(None)
setattr(obj_, attr, dev.BranchType(values[a]))
elif conv in [DeviceType.AreaDevice,
DeviceType.SubstationDevice,
DeviceType.ZoneDevice,
DeviceType.CountryDevice]:
pass
else:
setattr(obj_, attr, conv(values[a]))
else:
logger.add_warning("No registered property",
device=str(obj),
value=attr)
else:
logger.add_warning("No registered property",
device=str(obj),
value=attr)
# time profile #################################################################################################
if 'time' in data.keys():
time_df = data['time']
circuit.time_profile = pd.to_datetime(time_df.values[:, 0])
else:
circuit.time_profile = None
# Add the buses ################################################################################################
bus_dict = dict()
if 'bus' in data.keys():
df = data['bus']
hdr = df.columns.values
vals = df.values
for i in range(len(df)):
obj = dev.Bus()
set_object_attributes(obj, hdr, vals[i, :])
bus_dict[obj.name] = obj
circuit.add_bus(obj)
else:
logger.add_error("No buses in this file")
# add the loads ################################################################################################
if 'load' in data.keys():
df = data['load']
bus_from = df['bus'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = df[hdr].values
profles_attr = {'load_P_prof': 'P_prof',
'load_Q_prof': 'Q_prof',
'load_Ir_prof': 'Ir_prof',
'load_Ii_prof': 'Ii_prof',
'load_G_prof': 'G_prof',
'load_B_prof': 'B_prof',
'load_active_prof': 'active_prof',
'load_Cost_prof': 'Cost_prof'}
for i in range(df.shape[0]):
obj = dev.Load()
set_object_attributes(obj, hdr, vals[i, :])
# parse profiles:
for sheet_name, load_attr in profles_attr.items():
if sheet_name in data.keys():
val = data[sheet_name].values[:, i]
idx = data[sheet_name].index
# setattr(obj, load_attr, pd.DataFrame(data=val, index=idx))
obj.set_snapshot_value(load_attr, val)
if circuit.time_profile is None or len(circuit.time_profile) < len(idx):
circuit.time_profile = idx
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
logger.add_error("Load bus is not in the buses list", device=str(i))
if obj.name == 'Load':
obj.name += f"{circuit.get_loads_number()} @{bus.name}"
obj.bus = bus
circuit.add_load(bus=bus, api_obj=obj)
else:
logger.add_warning('No loads in the file!')
# add the controlled generators ################################################################################
if 'generator' in data.keys():
df = data['generator']
bus_from = df['bus'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = df[hdr].values
for i in range(df.shape[0]):
obj = dev.Generator()
set_object_attributes(obj, hdr, vals[i, :])
if 'generator_P_prof' in data.keys():
val = data['generator_P_prof'].values[:, i]
idx = data['generator_P_prof'].index
obj.set_profile_array(magnitude='P', arr=val)
# also create the Pf array because there might not be values in the file
# obj.set_profile_array(magnitude='Pf', arr=None)
if circuit.time_profile is None or len(circuit.time_profile) < len(idx):
circuit.time_profile = idx
if 'generator_Pf_prof' in data.keys():
val = data['generator_Pf_prof'].values[:, i]
idx = data['generator_Pf_prof'].index
obj.set_profile_array(magnitude='Pf', arr=val)
if 'generator_Vset_prof' in data.keys():
val = data['generator_Vset_prof'].values[:, i]
idx = data['generator_Vset_prof'].index
obj.set_profile_array(magnitude='Vset', arr=val)
if 'generator_active_prof' in data.keys():
val = data['generator_active_prof'].values[:, i]
idx = data['generator_active_prof'].index
obj.set_profile_array(magnitude='active', arr=val)
if 'generator_Cost_prof' in data.keys():
val = data['generator_Cost_prof'].values[:, i]
idx = data['generator_Cost_prof'].index
obj.set_profile_array(magnitude='Cost', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
logger.add_error("Controlled generator bus is not in the buses list", device=str(i))
if obj.name == 'gen':
obj.name += str(circuit.get_generators_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_generator(bus=bus, api_obj=obj)
else:
logger.add_warning('No controlled generator in the file!')
# add the batteries ############################################################################################
if 'battery' in data.keys():
df = data['battery']
bus_from = df['bus'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = df[hdr].values
for i in range(df.shape[0]):
obj = dev.Battery()
set_object_attributes(obj, hdr, vals[i, :])
if 'battery_P_prof' in data.keys():
val = data['battery_P_prof'].values[:, i]
idx = data['battery_P_prof'].index
obj.set_profile_array(magnitude='P', arr=val)
# also create the Pf array because there might not be values in the file
# obj.set_profile_array(magnitude='Pf', index=idx, arr=None)
if 'battery_Vset_prof' in data.keys():
val = data['battery_Vset_prof'].values[:, i]
idx = data['battery_Vset_prof'].index
obj.set_profile_array(magnitude='Vset', arr=val)
if 'battery_active_prof' in data.keys():
val = data['battery_active_prof'].values[:, i]
idx = data['battery_active_prof'].index
obj.set_profile_array(magnitude='active', arr=val)
if 'battery_Cost_prof' in data.keys():
val = data['battery_Cost_prof'].values[:, i]
idx = data['battery_Cost_prof'].index
obj.set_profile_array(magnitude='Cost', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
logger.add_error("Battery bus is not in the buses list", device=str(i))
if obj.name == 'batt':
obj.name += str(circuit.get_batteries_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_battery(bus=bus, api_obj=obj)
else:
logger.add_warning('No battery in the file!')
# add the static generators ####################################################################################
if 'static_generator' in data.keys():
df = data['static_generator']
bus_from = df['bus'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = df[hdr].values
for i in range(df.shape[0]):
obj = dev.StaticGenerator()
set_object_attributes(obj, hdr, vals[i, :])
if 'static_generator_Sprof' in data.keys():
val = data['static_generator_Sprof'].values[:, i]
idx = data['static_generator_Sprof'].index
obj.set_profile_array(magnitude='P', arr=val.real)
obj.set_profile_array(magnitude='Q', arr=val.imag)
if 'static_generator_P_prof' in data.keys():
val = data['static_generator_P_prof'].values[:, i]
idx = data['static_generator_P_prof'].index
obj.set_profile_array(magnitude='P', arr=val)
if 'static_generator_Q_prof' in data.keys():
val = data['static_generator_Q_prof'].values[:, i]
idx = data['static_generator_Q_prof'].index
obj.set_profile_array(magnitude='Q', arr=val)
if 'static_generator_active_prof' in data.keys():
val = data['static_generator_active_prof'].values[:, i]
idx = data['static_generator_active_prof'].index
obj.set_profile_array(magnitude='active', arr=val)
if 'static_generator_Cost_prof' in data.keys():
val = data['static_generator_Cost_prof'].values[:, i]
idx = data['static_generator_Cost_prof'].index
obj.set_profile_array(magnitude='Cost', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
logger.add_error("Static generator bus is not in the buses list", device=str(i))
if obj.name == 'StaticGen':
obj.name += str(circuit.get_static_generators_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_static_generator(bus=bus, api_obj=obj)
else:
logger.add_warning('No static generator in the file!')
# add the shunts ###############################################################################################
if 'shunt' in data.keys():
df = data['shunt']
bus_from = df['bus'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus'))
vals = df[hdr].values
for i in range(df.shape[0]):
obj = dev.Shunt()
set_object_attributes(obj, hdr, vals[i, :])
if 'shunt_Y_profiles' in data.keys():
val = data['shunt_Y_profiles'].values[:, i]
idx = data['shunt_Y_profiles'].index
obj.set_profile_array(magnitude='G', arr=val.real)
obj.set_profile_array(magnitude='B', arr=val.imag)
if 'shunt_G_prof' in data.keys():
val = data['shunt_G_prof'].values[:, i]
idx = data['shunt_G_prof'].index
obj.set_profile_array(magnitude='G', arr=val)
if 'shunt_B_prof' in data.keys():
val = data['shunt_B_prof'].values[:, i]
idx = data['shunt_B_prof'].index
obj.set_profile_array(magnitude='B', arr=val)
if 'shunt_active_prof' in data.keys():
val = data['shunt_active_prof'].values[:, i]
idx = data['shunt_active_prof'].index
obj.set_profile_array(magnitude='active', arr=val)
if 'shunt_Cost_prof' in data.keys():
val = data['shunt_Cost_prof'].values[:, i]
idx = data['shunt_Cost_prof'].index
obj.set_profile_array(magnitude='Cost', arr=val)
try:
bus = bus_dict[str(bus_from[i])]
except KeyError as ex:
logger.add_error("Shunt bus is not in the buses list", device=str(i))
if obj.name == 'shunt':
obj.name += str(circuit.get_shunts_number() + 1) + '@' + bus.name
obj.bus = bus
circuit.add_shunt(bus=bus, api_obj=obj)
else:
logger.add_warning('No shunt in the file!')
# Add the wires ################################################################################################
if 'wires' in data.keys():
df = data['wires']
hdr = df.columns.values
vals = df.values
for i in range(len(df)):
obj = dev.Wire()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_wire(obj)
else:
logger.add_warning('No wires in the file!')
# Add the overhead_line_types ##################################################################################
if 'overhead_line_types' in data.keys():
df = data['overhead_line_types']
if data['overhead_line_types'].values.shape[0] > 0:
for tower_name in df['tower_name'].unique():
obj = dev.OverheadLineType()
dft = df[df['tower_name'] == tower_name]
vals = dft.values
wire_prop = df.columns.values[len(obj.registered_properties):]
# set the tower values
set_object_attributes(obj, obj.registered_properties.keys(), vals[0, :])
# add the wires
if len(wire_prop) == 7:
for i in range(vals.shape[0]):
# ['wire_name' 'xpos' 'ypos' 'phase' 'r' 'x' 'gmr']
name = dft['wire_name'].values[i]
gmr = dft['gmr'].values[i]
r = dft['r'].values[i]
x = dft['x'].values[i]
xpos = dft['xpos'].values[i]
ypos = dft['ypos'].values[i]
phase = dft['phase'].values[i]
wire = dev.Wire(name=name, r_ext=gmr, r=r, x=x)
obj.add_wire_relationship(wire=wire, xpos=xpos, ypos=ypos, phase=phase)
circuit.add_overhead_line(obj)
branch_types[str(obj)] = obj
else:
pass
else:
logger.add_warning('No overhead_line_types in the file!')
# Add the wires ################################################################################################
if 'underground_cable_types' in data.keys():
df = data['underground_cable_types']
hdr = df.columns.values
vals = df.values
# for i in range(len(lst)):
# obj = UndergroundLineType()
# set_object_attributes(obj, hdr, vals[i, :])
# circuit.underground_cable_types.append(obj)
# branch_types[str(obj)] = obj
else:
logger.add_warning('No underground_cable_types in the file!')
# Add the sequence line types ##################################################################################
if 'sequence_line_types' in data.keys():
df = data['sequence_line_types']
hdr = df.columns.values
vals = df.values
for i in range(len(df)):
obj = dev.SequenceLineType()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_sequence_line(obj)
branch_types[str(obj)] = obj
else:
logger.add_warning('No sequence_line_types in the file!')
# Add the transformer types ####################################################################################
if 'transformer_types' in data.keys():
df = data['transformer_types']
hdr = df.columns.values
vals = df.values
for i in range(len(df)):
obj = dev.TransformerType()
set_object_attributes(obj, hdr, vals[i, :])
circuit.add_transformer_type(obj)
branch_types[str(obj)] = obj
else:
logger.add_warning('No transformer_types in the file!')
# Add the Branches #############################################################################################
if 'branch' in data.keys():
df = data['branch']
# fix the old 'is_transformer' property
if 'is_transformer' in df.columns.values:
df['is_transformer'] = df['is_transformer'].map({True: 'transformer', False: 'line'})
df.rename(columns={'is_transformer': 'branch_type'}, inplace=True)
bus_from = df['bus_from'].values
bus_to = df['bus_to'].values
hdr = df.columns.values
hdr = np.delete(hdr, np.argwhere(hdr == 'bus_from'))
hdr = np.delete(hdr, np.argwhere(hdr == 'bus_to'))
vals = df[hdr].values
for i in range(df.shape[0]):
try:
obj = dev.Branch(bus_from=bus_dict[str(bus_from[i])], bus_to=bus_dict[str(bus_to[i])])
except KeyError as ex:
logger.add_error("Static generator bus is not in the buses list", device=str(i))
set_object_attributes(obj, hdr, vals[i, :])
# set the branch
circuit.add_branch(obj)
if 'branch_active_prof' in data.keys():
val = data['branch_active_prof'].values[:, i]
idx = data['branch_active_prof'].index
obj.set_profile_array(magnitude='active', arr=val)
if 'branch_Cost_prof' in data.keys():
val = data['branch_Cost_prof'].values[:, i]
idx = data['branch_Cost_prof'].index
obj.set_profile_array(magnitude='Cost', arr=val)
if 'branch_temp_oper_prof' in data.keys():
val = data['branch_temp_oper_prof'].values[:, i]
idx = data['branch_temp_oper_prof'].index
obj.set_profile_array(magnitude='temp_oper', arr=val)
# correct the branch template object
template_name = str(obj.template)
if template_name in branch_types.keys():
obj.template = branch_types[template_name]
circuit.logger.add_info('Updated template', template_name)
else:
logger.add_warning('No Branches in the file!')
# Other actions ################################################################################################
logger += circuit.apply_all_branch_types()
return circuit
[docs]
def save_excel_v4(circuit: MultiCircuit, file_path):
"""
Save the circuit information in excel format
:param circuit: MultiCircuit instance
:param file_path: path to the excel file
:return: logger with information
"""
logger = Logger()
dfs = gather_model_as_data_frames(circuit=circuit, logger=logger, legacy=True)
# flush-save ###################################################################################################
with pd.ExcelWriter(file_path) as writer:
for key in dfs.keys():
key2 = str(key)
if len(key2) > 30:
key2 = key2[:30] # excel sheet names have a max of 30 chars
dfs[key].to_excel(excel_writer=writer, sheet_name=key2)
deadline: float = time.perf_counter() + 2.0
while True:
try:
with zipfile.ZipFile(file_path, 'r') as archive:
archive.namelist()
break
except (PermissionError, zipfile.BadZipFile):
if time.perf_counter() >= deadline:
break
time.sleep(0.05)
return logger