Source code for VeraGridEngine.IO.ucte.veragrid_to_ucte
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import math
from datetime import datetime
from typing import Dict, List, TextIO
import VeraGridEngine.Devices as dev
from VeraGridEngine.Devices.Injections.external_grid import ExternalGrid
from VeraGridEngine.Devices.Injections.static_generator import StaticGenerator
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.basic_structures import Logger
from VeraGridEngine.enumerations import ExternalGridMode, TapModuleControl, TapPhaseControl, GeneratorControlMode
[docs]
class UcteBusAggregate:
"""
Store the bus-level values required by one exported UCTE node.
"""
__slots__ = (
"status",
"node_type",
"voltage_reference_kv",
"active_load_mw",
"reactive_load_mvar",
"has_generation_record",
"active_gen_mw",
"reactive_gen_mvar",
"has_generation_limits",
"min_gen_mw",
"max_gen_mw",
"min_gen_mvar",
"max_gen_mvar",
"plant_type",
"shunt_b_mvar",
)
def __init__(self) -> None:
"""
Build one empty node aggregate.
:return: None.
"""
self.status: int = 0
self.node_type: int = 0
self.voltage_reference_kv: float = math.nan
self.active_load_mw: float = 0.0
self.reactive_load_mvar: float = 0.0
self.has_generation_record: bool = False
self.active_gen_mw: float = 0.0
self.reactive_gen_mvar: float = 0.0
self.has_generation_limits: bool = False
self.min_gen_mw: float = 0.0
self.max_gen_mw: float = 0.0
self.min_gen_mvar: float = 0.0
self.max_gen_mvar: float = 0.0
self.plant_type: str = ""
self.shunt_b_mvar: float = 0.0
[docs]
class UcteBranchCounter:
"""
Assign one-character UCTE order codes per branch endpoint pair.
"""
__slots__ = ("_count_by_pair",)
def __init__(self) -> None:
"""
Build one empty counter.
:return: None.
"""
self._count_by_pair: Dict[tuple[str, str], int] = dict()
[docs]
def get_next_code(self, node_1: str, node_2: str, logger: Logger) -> str:
"""
Get the next order code for one endpoint pair.
:param node_1: First node code.
:param node_2: Second node code.
:param logger: Export logger.
:return: One-character UCTE order code.
"""
alphabet: str = "123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0"
pair_key: tuple[str, str] = tuple(sorted((node_1, node_2)))
current_count: int = self._count_by_pair.get(pair_key, 0)
self._count_by_pair[pair_key] = current_count + 1
if current_count < len(alphabet):
return alphabet[current_count]
else:
logger.add_warning("Too many parallel UCTE branches for one endpoint pair, reusing the last order code",
device=f"{node_1}-{node_2}",
value=current_count + 1,
expected_value=f"<= {len(alphabet)}")
return alphabet[len(alphabet) - 1]
[docs]
def get_ucte_voltage_code(voltage_kv: float) -> str:
"""
Map one nominal voltage value to the closest standard UCTE voltage code.
:param voltage_kv: Nominal voltage in kV.
:return: One-character voltage code.
"""
voltage_map: Dict[str, float] = {
"0": 750.0,
"1": 380.0,
"2": 220.0,
"3": 150.0,
"4": 120.0,
"5": 110.0,
"6": 70.0,
"7": 27.0,
"8": 330.0,
"9": 500.0,
"A": 26.0,
"B": 25.0,
"C": 24.0,
"D": 23.0,
"E": 22.0,
"F": 21.0,
"G": 20.0,
"H": 19.0,
"I": 18.0,
"J": 17.0,
"K": 15.7,
"L": 15.0,
"M": 13.7,
"N": 13.0,
"O": 12.0,
"P": 11.0,
"Q": 9.8,
"R": 9.0,
"S": 8.0,
"T": 7.0,
"U": 6.0,
"V": 5.0,
"W": 4.0,
"X": 3.0,
"Y": 2.0,
"Z": 1.0,
}
best_code: str = "Z"
best_distance: float = float("inf")
code_value: float
code_key: str
for code_key, code_value in voltage_map.items():
distance: float = abs(code_value - float(voltage_kv))
if distance < best_distance:
best_distance = distance
best_code = code_key
else:
pass
return best_code
[docs]
def is_usable_ucte_bus_code(node_code: str, bus: dev.Bus) -> bool:
"""
Check if one bus code can be reused directly as a canonical UCTE node code.
:param node_code: Candidate node code.
:param bus: VeraGrid bus.
:return: ``True`` when the code is already UCTE-safe.
"""
if len(node_code) != 8:
return False
else:
pass
if any(character.isspace() for character in node_code):
return False
else:
pass
if node_code.endswith("Q"):
return False
else:
pass
voltage_code: str = get_ucte_voltage_code(bus.Vnom)
if node_code[6] != voltage_code:
return False
else:
pass
return True
[docs]
def normalize_reusable_noncanonical_ucte_bus_code(node_code: str) -> str | None:
"""
Normalize one existing non-canonical node code for roundtrip preservation.
This keeps non-canonical UCTE identifiers from imported synthetic datasets
so that the importer can continue applying its nominal-voltage repair
heuristics on roundtrip.
:param node_code: Candidate node code.
:return: Normalized 8-character code or ``None``.
"""
normalized_code: str = str(node_code)
if len(normalized_code) < 8 and any(character.isspace() for character in normalized_code):
normalized_code = normalized_code.ljust(8)
else:
pass
if len(normalized_code) != 8:
return None
else:
pass
if normalized_code.strip() == "":
return None
else:
pass
if normalized_code.endswith("Q"):
return None
else:
pass
if any(character.isspace() for character in normalized_code):
return normalized_code
else:
return None
[docs]
def get_country_code(bus: dev.Bus) -> str:
"""
Get the UCTE country code to use for one bus block.
:param bus: VeraGrid bus.
:return: Two-letter country code or ``XX``.
"""
if bus.country is None:
return "XX"
else:
pass
country_name: str = str(bus.country.name).strip().upper()
if len(country_name) == 2 and country_name.isalpha():
return country_name
else:
return "XX"
[docs]
def is_standard_ucte_country_code(country_code: str) -> bool:
"""
Check if one country code matches the standard UCTE two-letter form.
:param country_code: Country code to inspect.
:return: ``True`` when the code is standard.
"""
normalized_code: str = str(country_code).strip()
return (
len(normalized_code) == 2
and normalized_code.isalpha()
and normalized_code.upper() == normalized_code
and normalized_code != "XX"
)
[docs]
def use_legacy_transformer_orientation(bus_from: dev.Bus, bus_to: dev.Bus) -> bool:
"""
Select the transformer orientation expected by the UCTE importer.
Standard country-coded UCTE data uses the legacy regulated/non-regulated
winding convention, while IEEE-like synthetic files use the direct
``node1 -> node2`` orientation.
:param bus_from: VeraGrid transformer ``bus_from`` side.
:param bus_to: VeraGrid transformer ``bus_to`` side.
:return: ``True`` when the legacy UCTE orientation must be exported.
"""
country_code_from: str = get_country_code(bus_from)
country_code_to: str = get_country_code(bus_to)
return is_standard_ucte_country_code(country_code_from) and is_standard_ucte_country_code(country_code_to)
[docs]
def build_bus_node_code_map(circuit: MultiCircuit, logger: Logger) -> Dict[dev.Bus, str]:
"""
Build one deterministic UCTE node-code map for every exported bus.
:param circuit: Circuit to export.
:param logger: Export logger.
:return: Bus-to-node-code mapping.
"""
code_map: Dict[dev.Bus, str] = dict()
used_codes: set[str] = set()
bus_index: int = 0
bus: dev.Bus
for bus in circuit.get_buses():
candidate_code: str = str(bus.code)
candidate_code_display: str = candidate_code.strip()
reusable_noncanonical_code: str | None = normalize_reusable_noncanonical_ucte_bus_code(candidate_code)
if is_usable_ucte_bus_code(candidate_code, bus) and candidate_code not in used_codes:
code_map[bus] = candidate_code
used_codes.add(candidate_code)
elif reusable_noncanonical_code is not None and reusable_noncanonical_code not in used_codes:
code_map[bus] = reusable_noncanonical_code
used_codes.add(reusable_noncanonical_code)
else:
voltage_code: str = get_ucte_voltage_code(bus.Vnom)
prefix: str
if bus.internal:
prefix = "X"
else:
prefix = "N"
generated_code: str = ""
found_code: bool = False
while not found_code:
generated_code = f"{prefix}{bus_index:05d}{voltage_code}A"
bus_index += 1
if generated_code not in used_codes:
found_code = True
else:
found_code = False
code_map[bus] = generated_code
used_codes.add(generated_code)
if candidate_code_display != "":
logger.add_warning("Generated canonical UCTE node code for bus",
device=bus.name,
value=candidate_code_display,
expected_value=generated_code)
else:
logger.add_info("Generated canonical UCTE node code for bus",
device=bus.name,
value=generated_code)
return code_map
[docs]
def build_shunt_node_code(bus_code: str) -> str:
"""
Build the fictitious-shunt node code paired to one exported bus code.
:param bus_code: Main bus node code.
:return: Fictitious-shunt node code.
"""
return f"{bus_code[:7]}Q"
[docs]
def get_snapshot_datetime(circuit: MultiCircuit, t_idx: int | None) -> datetime:
"""
Get the export timestamp represented by one UCTE file.
:param circuit: Circuit to export.
:param t_idx: Optional profile index.
:return: Snapshot datetime.
"""
if t_idx is None:
return circuit.snapshot_time
else:
if circuit.has_time_series:
return circuit.time_profile[t_idx].to_pydatetime()
else:
return circuit.snapshot_time
[docs]
def get_bus_voltage_reference(bus: dev.Bus) -> float:
"""
Convert the VeraGrid bus voltage seed to the UCTE ``Uref`` value.
:param bus: VeraGrid bus.
:return: Voltage reference in kV.
"""
if bus.Vnom > 0.0:
return float(bus.Vm0) * float(bus.Vnom)
else:
return math.nan
[docs]
def update_generation_limits_from_generator(aggregate: UcteBusAggregate,
generator: dev.Generator | dev.Battery) -> None:
"""
Accumulate UCTE generation limits from one VeraGrid generator-like device.
:param aggregate: Target bus aggregate.
:param generator: Generator or battery device.
:return: None.
"""
min_gen_mw: float = -float(generator.Pmax)
max_gen_mw: float = -float(generator.Pmin)
min_gen_mvar: float = -float(generator.Qmax)
max_gen_mvar: float = -float(generator.Qmin)
if aggregate.has_generation_limits:
aggregate.min_gen_mw += min_gen_mw
aggregate.max_gen_mw += max_gen_mw
aggregate.min_gen_mvar += min_gen_mvar
aggregate.max_gen_mvar += max_gen_mvar
else:
aggregate.min_gen_mw = min_gen_mw
aggregate.max_gen_mw = max_gen_mw
aggregate.min_gen_mvar = min_gen_mvar
aggregate.max_gen_mvar = max_gen_mvar
aggregate.has_generation_limits = True
[docs]
def add_generator_like_device(aggregate: UcteBusAggregate,
device: dev.Generator | dev.Battery | StaticGenerator,
t_idx: int | None) -> None:
"""
Add one generator-like device to the bus aggregate using UCTE sign semantics.
:param aggregate: Target bus aggregate.
:param device: Generator-like device.
:param t_idx: Optional profile index.
:return: None.
"""
active_power_mw: float = float(device.get_P_at(t_idx))
reactive_power_mvar: float = float(device.get_Q_at(t_idx))
# UCTE generation is imported with the opposite sign, so the exporter must
# invert VeraGrid generator injections to preserve the same numerical state.
aggregate.active_gen_mw += -active_power_mw
aggregate.reactive_gen_mvar += -reactive_power_mvar
aggregate.has_generation_record = True
[docs]
def add_load_like_device(aggregate: UcteBusAggregate,
device: dev.Load | ExternalGrid,
t_idx: int | None,
logger: Logger) -> None:
"""
Add one load-like device to the bus aggregate.
:param aggregate: Target bus aggregate.
:param device: Load or load-mode external grid.
:param t_idx: Optional profile index.
:param logger: Export logger.
:return: None.
"""
aggregate.active_load_mw += float(device.get_P_at(t_idx))
aggregate.reactive_load_mvar += float(device.get_Q_at(t_idx))
if abs(float(device.get_G_at(t_idx))) > 1e-9 or abs(float(device.get_B_at(t_idx))) > 1e-9:
logger.add_warning("UCTE node export ignores admittance terms on load-like devices",
device=device.name,
value=f"G={device.get_G_at(t_idx)}, B={device.get_B_at(t_idx)}")
else:
pass
[docs]
def update_aggregate_voltage_control_from_generator(aggregate: UcteBusAggregate,
bus: dev.Bus,
generator: dev.Generator | dev.Battery,
t_idx: int | None) -> None:
"""
Apply one generator voltage-control mode to the aggregate node state.
:param aggregate: Target bus aggregate.
:param bus: Parent bus.
:param generator: Generator or battery.
:param t_idx: Optional profile index.
:return: None.
"""
if generator.control_mode == GeneratorControlMode.V:
if aggregate.node_type != 3:
aggregate.node_type = 2
else:
pass
aggregate.voltage_reference_kv = float(generator.get_Vset_at(t_idx)) * float(bus.Vnom)
else:
pass
[docs]
def update_aggregate_from_external_grid(aggregate: UcteBusAggregate,
bus: dev.Bus,
external_grid: ExternalGrid,
t_idx: int | None,
logger: Logger) -> None:
"""
Apply one external-grid device to the UCTE node aggregate.
:param aggregate: Target bus aggregate.
:param bus: Parent bus.
:param external_grid: External-grid device.
:param t_idx: Optional profile index.
:param logger: Export logger.
:return: None.
"""
if external_grid.mode == ExternalGridMode.PQ:
add_load_like_device(aggregate=aggregate,
device=external_grid,
t_idx=t_idx,
logger=logger)
elif external_grid.mode == ExternalGridMode.PV:
if aggregate.node_type != 3:
aggregate.node_type = 2
else:
pass
aggregate.voltage_reference_kv = float(external_grid.get_Vm_at(t_idx)) * float(bus.Vnom)
# Power values remain important in PQ/PV steady-state exchanges. The
# load-parent sign convention is preserved exactly on the UCTE node.
aggregate.active_load_mw += float(external_grid.get_P_at(t_idx))
aggregate.reactive_load_mvar += float(external_grid.get_Q_at(t_idx))
elif external_grid.mode == ExternalGridMode.VD:
aggregate.node_type = 3
aggregate.voltage_reference_kv = float(external_grid.get_Vm_at(t_idx)) * float(bus.Vnom)
aggregate.active_load_mw += float(external_grid.get_P_at(t_idx))
aggregate.reactive_load_mvar += float(external_grid.get_Q_at(t_idx))
else:
logger.add_warning("Unsupported external-grid mode for UCTE export",
device=external_grid.name,
value=str(external_grid.mode))
[docs]
def build_bus_aggregate(circuit: MultiCircuit,
bus: dev.Bus,
bus_injections: Dict[dev.Bus, Dict[object, List[object]]],
t_idx: int | None,
logger: Logger) -> UcteBusAggregate:
"""
Aggregate all supported VeraGrid injections attached to one bus.
:param circuit: Circuit to export.
:param bus: Bus to aggregate.
:param bus_injections: Bus-grouped injection lookup.
:param t_idx: Optional profile index.
:param logger: Export logger.
:return: Filled node aggregate.
"""
del circuit
aggregate: UcteBusAggregate = UcteBusAggregate()
if bus.get_active_at(t_idx):
aggregate.status = 0
else:
aggregate.status = 1
if bus.is_slack:
aggregate.node_type = 3
else:
aggregate.node_type = 0
aggregate.voltage_reference_kv = get_bus_voltage_reference(bus)
injection_lists_by_type: Dict[object, List[object]] | None = bus_injections.get(bus, None)
if injection_lists_by_type is None:
return aggregate
else:
pass
device_list: List[object]
for device_list in injection_lists_by_type.values():
device_obj: object
for device_obj in device_list:
if isinstance(device_obj, dev.Load):
if device_obj.get_active_at(t_idx):
add_load_like_device(aggregate=aggregate,
device=device_obj,
t_idx=t_idx,
logger=logger)
else:
pass
elif isinstance(device_obj, StaticGenerator):
if device_obj.get_active_at(t_idx):
add_generator_like_device(aggregate=aggregate,
device=device_obj,
t_idx=t_idx)
else:
pass
elif isinstance(device_obj, dev.Generator):
if device_obj.get_active_at(t_idx):
add_generator_like_device(aggregate=aggregate,
device=device_obj,
t_idx=t_idx)
update_generation_limits_from_generator(aggregate=aggregate, generator=device_obj)
update_aggregate_voltage_control_from_generator(aggregate=aggregate,
bus=bus,
generator=device_obj,
t_idx=t_idx)
else:
pass
elif isinstance(device_obj, dev.Battery):
if device_obj.get_active_at(t_idx):
add_generator_like_device(aggregate=aggregate,
device=device_obj,
t_idx=t_idx)
update_generation_limits_from_generator(aggregate=aggregate, generator=device_obj)
update_aggregate_voltage_control_from_generator(aggregate=aggregate,
bus=bus,
generator=device_obj,
t_idx=t_idx)
else:
pass
elif isinstance(device_obj, dev.Shunt):
if device_obj.get_active_at(t_idx):
aggregate.shunt_b_mvar += float(device_obj.get_B_at(t_idx))
if abs(float(device_obj.get_G_at(t_idx))) > 1e-9:
logger.add_warning("UCTE shunt export ignores conductance terms",
device=device_obj.name,
value=device_obj.get_G_at(t_idx))
else:
pass
else:
pass
elif isinstance(device_obj, ExternalGrid):
if device_obj.get_active_at(t_idx):
update_aggregate_from_external_grid(aggregate=aggregate,
bus=bus,
external_grid=device_obj,
t_idx=t_idx,
logger=logger)
else:
pass
else:
logger.add_warning("Unsupported injection device skipped by UCTE export",
device_class=str(type(device_obj)),
device=str(device_obj))
return aggregate
[docs]
def format_optional_float(value: float, decimals: int) -> str:
"""
Format one optional float for tolerant UCTE text output.
:param value: Numeric value.
:param decimals: Number of decimal places.
:return: Formatted text or an empty string.
"""
if math.isfinite(value):
return f"{value:.{decimals}f}"
else:
return ""
[docs]
def format_optional_float_width(value: float, width: int, decimals: int) -> str:
"""
Format one optional float using one strict fixed-width UCTE field.
:param value: Numeric value.
:param width: Field width.
:param decimals: Number of decimal places.
:return: Fixed-width text or blanks.
"""
if math.isfinite(value):
return f"{value:>{width}.{decimals}f}"
else:
return " " * width
[docs]
def format_node_row(node_code: str,
bus: dev.Bus,
aggregate: UcteBusAggregate) -> str:
"""
Format one UCTE node record.
:param node_code: Exported node code.
:param bus: Source bus.
:param aggregate: Aggregated node state.
:return: Node row text.
"""
geo_name: str = str(bus.name).strip()[:12]
active_gen_text: str
reactive_gen_text: str
min_gen_mw_text: str
max_gen_mw_text: str
min_gen_mvar_text: str
max_gen_mvar_text: str
if aggregate.has_generation_record:
active_gen_text = format_optional_float_width(aggregate.active_gen_mw, 7, 2)
reactive_gen_text = format_optional_float_width(aggregate.reactive_gen_mvar, 7, 2)
else:
active_gen_text = " " * 7
reactive_gen_text = " " * 7
if aggregate.has_generation_limits:
min_gen_mw_text = format_optional_float_width(aggregate.min_gen_mw, 7, 2)
max_gen_mw_text = format_optional_float_width(aggregate.max_gen_mw, 7, 2)
min_gen_mvar_text = format_optional_float_width(aggregate.min_gen_mvar, 7, 2)
max_gen_mvar_text = format_optional_float_width(aggregate.max_gen_mvar, 7, 2)
else:
min_gen_mw_text = " " * 7
max_gen_mw_text = " " * 7
min_gen_mvar_text = " " * 7
max_gen_mvar_text = " " * 7
# The node parser first tries a strict fixed-width layout. The exporter
# therefore writes the canonical field positions explicitly so all numeric
# columns survive a round-trip without being shifted by whitespace.
row_text: str = (
f"{node_code:<8} "
f"{geo_name:<12} "
f"{aggregate.status:d} "
f"{aggregate.node_type:d} "
f"{format_optional_float_width(aggregate.voltage_reference_kv, 6, 2)} "
f"{aggregate.active_load_mw:>7.2f} "
f"{aggregate.reactive_load_mvar:>7.2f} "
f"{active_gen_text} "
f"{reactive_gen_text} "
f"{min_gen_mw_text} "
f"{max_gen_mw_text} "
f"{min_gen_mvar_text} "
f"{max_gen_mvar_text} "
f"{'':>5} "
f"{'':>7} "
f"{'':>7} "
f"{'':>7} "
f"{aggregate.plant_type[:1]}"
)
return row_text.rstrip()
[docs]
def get_line_current_limit_a(line: dev.Line, vnom_kv: float, t_idx: int | None) -> float:
"""
Convert one VeraGrid line rate to a UCTE current limit.
:param line: VeraGrid line.
:param vnom_kv: Nominal voltage in kV.
:param t_idx: Optional profile index.
:return: Current limit in A or ``nan``.
"""
rate_mva: float = float(line.get_rate_at(t_idx))
if rate_mva > 0.0 and vnom_kv > 0.0:
return (rate_mva / (math.sqrt(3.0) * vnom_kv)) * 1000.0
else:
return math.nan
[docs]
def get_transformer_current_limit_a(transformer: dev.Transformer2W, t_idx: int | None) -> float:
"""
Convert one VeraGrid transformer rate to a UCTE current limit.
:param transformer: VeraGrid transformer.
:param t_idx: Optional profile index.
:return: Current limit in A or ``nan``.
"""
rated_voltage_kv: float = max(float(transformer.HV), float(transformer.LV))
rate_mva: float = float(transformer.get_rate_at(t_idx))
if rate_mva > 0.0 and rated_voltage_kv > 0.0:
return (rate_mva / (math.sqrt(3.0) * rated_voltage_kv)) * 1000.0
else:
return math.nan
[docs]
def get_line_ucte_impedance(line: dev.Line, circuit: MultiCircuit) -> tuple[float, float, float]:
"""
Convert one VeraGrid line impedance to UCTE units.
:param line: VeraGrid line.
:param circuit: Circuit base values.
:return: Resistance in ohm, reactance in ohm, susceptance in uS.
"""
base_voltage_kv: float = float(line.bus_from.Vnom)
base_impedance_ohm: float = (base_voltage_kv * base_voltage_kv) / float(circuit.Sbase)
resistance_ohm: float = float(line.R) * base_impedance_ohm
reactance_ohm: float = float(line.X) * base_impedance_ohm
susceptance_us: float = (float(line.B) / base_impedance_ohm) * 1e6
return resistance_ohm, reactance_ohm, susceptance_us
[docs]
def get_transformer_ucte_impedance(transformer: dev.Transformer2W,
circuit: MultiCircuit,
rated_voltage_1_kv: float) -> tuple[float, float, float, float]:
"""
Convert one VeraGrid transformer impedance to UCTE units.
:param transformer: VeraGrid transformer.
:param circuit: Circuit base values.
:param rated_voltage_1_kv: UCTE rated voltage on node1 side.
:return: Resistance in ohm, reactance in ohm, susceptance in uS, conductance in uS.
"""
base_impedance_ohm: float = (rated_voltage_1_kv * rated_voltage_1_kv) / float(circuit.Sbase)
resistance_ohm: float = float(transformer.R) * base_impedance_ohm
reactance_ohm: float = float(transformer.X) * base_impedance_ohm
susceptance_us: float = (float(transformer.B) / base_impedance_ohm) * 1e6
conductance_us: float = (float(transformer.G) / base_impedance_ohm) * 1e6
return resistance_ohm, reactance_ohm, susceptance_us, conductance_us
[docs]
def format_line_row(node_1: str,
node_2: str,
order_code: str,
status: int,
resistance_ohm: float,
reactance_ohm: float,
susceptance_us: float,
current_limit_a: float,
name: str) -> str:
"""
Format one UCTE line record.
:param node_1: First node code.
:param node_2: Second node code.
:param order_code: One-character order code.
:param status: UCTE status code.
:param resistance_ohm: Series resistance in ohm.
:param reactance_ohm: Series reactance in ohm.
:param susceptance_us: Total shunt susceptance in uS.
:param current_limit_a: Current limit in A.
:param name: Device name.
:return: Line row text.
"""
return (
f"{node_1:<8} "
f"{node_2:<8} "
f"{order_code[:1]} "
f"{status:d} "
f"{format_optional_float_width(resistance_ohm, 6, 4)} "
f"{format_optional_float_width(reactance_ohm, 6, 4)} "
f"{format_optional_float_width(susceptance_us, 8, 4)} "
f"{format_optional_float_width(current_limit_a, 6, 0)} "
f"{str(name).strip()}"
).rstrip()
[docs]
def format_transformer_row(node_1: str,
node_2: str,
order_code: str,
status: int,
rated_voltage_1_kv: float,
rated_voltage_2_kv: float,
nominal_power_mva: float,
resistance_ohm: float,
reactance_ohm: float,
susceptance_us: float,
conductance_us: float,
current_limit_a: float,
name: str) -> str:
"""
Format one UCTE transformer record.
:param node_1: Non-regulated winding node code.
:param node_2: Regulated winding node code.
:param order_code: One-character order code.
:param status: UCTE status code.
:param rated_voltage_1_kv: Rated voltage on node1 side.
:param rated_voltage_2_kv: Rated voltage on node2 side.
:param nominal_power_mva: Nominal power in MVA.
:param resistance_ohm: Series resistance in ohm.
:param reactance_ohm: Series reactance in ohm.
:param susceptance_us: Total shunt susceptance in uS.
:param conductance_us: Total shunt conductance in uS.
:param current_limit_a: Current limit in A.
:param name: Device name.
:return: Transformer row text.
"""
return (
f"{node_1:<8} {node_2:<8} {order_code} {status} "
f"{rated_voltage_1_kv:.1f} {rated_voltage_2_kv:.1f} {nominal_power_mva:.1f} "
f"{resistance_ohm:.6f} {reactance_ohm:.6f} {susceptance_us:.5f} "
f"{format_optional_float(conductance_us, 4)} {format_optional_float(current_limit_a, 0)} "
f"{str(name).strip()}"
).rstrip()
[docs]
def write_comment_block(circuit: MultiCircuit,
t_idx: int | None,
file_pointer: TextIO) -> None:
"""
Write the UCTE comment block.
:param circuit: Circuit to export.
:param t_idx: Optional profile index.
:param file_pointer: Open output file.
:return: None.
"""
snapshot_datetime: datetime = get_snapshot_datetime(circuit=circuit, t_idx=t_idx)
file_pointer.write(f"##C {snapshot_datetime.strftime('%Y.%m.%d')}\n")
file_pointer.write(f"{str(circuit.name).strip()}\n")
[docs]
def write_node_blocks_from_aggregates(circuit: MultiCircuit,
bus_code_map: Dict[dev.Bus, str],
aggregate_by_bus: Dict[dev.Bus, UcteBusAggregate],
file_pointer: TextIO) -> None:
"""
Write the UCTE node sections grouped by country block.
:param circuit: Circuit to export.
:param bus_code_map: Bus-to-node-code mapping.
:param aggregate_by_bus: Precomputed bus aggregates.
:param file_pointer: Open output file.
:return: None.
"""
buses_by_country: Dict[str, List[dev.Bus]] = dict()
bus: dev.Bus
for bus in circuit.get_buses():
country_code: str = get_country_code(bus)
bucket: List[dev.Bus] | None = buses_by_country.get(country_code, None)
if bucket is None:
buses_by_country[country_code] = [bus]
else:
bucket.append(bus)
country_code: str
for country_code in sorted(buses_by_country.keys()):
file_pointer.write(f"##Z{country_code}\n")
for bus in buses_by_country[country_code]:
aggregate: UcteBusAggregate = aggregate_by_bus[bus]
node_row: str = format_node_row(node_code=bus_code_map[bus],
bus=bus,
aggregate=aggregate)
file_pointer.write(node_row + "\n")
[docs]
def write_line_block(circuit: MultiCircuit,
bus_code_map: Dict[dev.Bus, str],
aggregate_by_bus: Dict[dev.Bus, UcteBusAggregate],
t_idx: int | None,
logger: Logger,
file_pointer: TextIO) -> None:
"""
Write the UCTE line block, including switch couplers and fictitious shunts.
:param circuit: Circuit to export.
:param bus_code_map: Bus-to-node-code mapping.
:param aggregate_by_bus: Precomputed bus aggregates.
:param t_idx: Optional profile index.
:param logger: Export logger.
:param file_pointer: Open output file.
:return: None.
"""
file_pointer.write("##L\n")
pair_counter: UcteBranchCounter = UcteBranchCounter()
line: dev.Line
for line in circuit.get_lines():
if line.bus_from is None or line.bus_to is None:
logger.add_warning("Skipping disconnected line in UCTE export", device=line.name)
else:
if line.bus_from.is_dc or line.bus_to.is_dc:
logger.add_warning("Skipping DC line in UCTE export", device=line.name)
else:
if abs(float(line.bus_from.Vnom) - float(line.bus_to.Vnom)) <= 1e-6:
node_1: str = bus_code_map[line.bus_from]
node_2: str = bus_code_map[line.bus_to]
order_code: str = pair_counter.get_next_code(node_1=node_1, node_2=node_2, logger=logger)
status: int
if line.get_active_at(t_idx):
status = 0
else:
status = 8
resistance_ohm: float
reactance_ohm: float
susceptance_us: float
resistance_ohm, reactance_ohm, susceptance_us = get_line_ucte_impedance(line=line,
circuit=circuit)
current_limit_a: float = get_line_current_limit_a(line=line,
vnom_kv=float(line.bus_from.Vnom),
t_idx=t_idx)
line_row: str = format_line_row(node_1=node_1,
node_2=node_2,
order_code=order_code,
status=status,
resistance_ohm=resistance_ohm,
reactance_ohm=reactance_ohm,
susceptance_us=susceptance_us,
current_limit_a=current_limit_a,
name=line.name)
file_pointer.write(line_row + "\n")
else:
logger.add_warning("Skipping cross-voltage line in UCTE export",
device=line.name,
value=f"{line.bus_from.Vnom} != {line.bus_to.Vnom}")
switch_obj: dev.Switch
for switch_obj in circuit.get_switches():
if switch_obj.bus_from is None or switch_obj.bus_to is None:
logger.add_warning("Skipping disconnected switch in UCTE export", device=switch_obj.name)
else:
if switch_obj.bus_from.is_dc or switch_obj.bus_to.is_dc:
logger.add_warning("Skipping DC switch in UCTE export", device=switch_obj.name)
else:
if abs(float(switch_obj.bus_from.Vnom) - float(switch_obj.bus_to.Vnom)) <= 1e-6:
node_1 = bus_code_map[switch_obj.bus_from]
node_2 = bus_code_map[switch_obj.bus_to]
order_code = pair_counter.get_next_code(node_1=node_1, node_2=node_2, logger=logger)
if switch_obj.get_active_at(t_idx):
status = 2
else:
status = 7
current_limit_a: float
if float(switch_obj.rated_current) > 0.0:
current_limit_a = float(switch_obj.rated_current) * 1000.0
else:
current_limit_a = math.nan
switch_row: str = format_line_row(node_1=node_1,
node_2=node_2,
order_code=order_code,
status=status,
resistance_ohm=0.0,
reactance_ohm=0.0,
susceptance_us=0.0,
current_limit_a=current_limit_a,
name=switch_obj.name)
file_pointer.write(switch_row + "\n")
else:
logger.add_warning("Skipping cross-voltage switch in UCTE export",
device=switch_obj.name,
value=f"{switch_obj.bus_from.Vnom} != {switch_obj.bus_to.Vnom}")
bus: dev.Bus
for bus in circuit.get_buses():
aggregate: UcteBusAggregate = aggregate_by_bus[bus]
if abs(aggregate.shunt_b_mvar) > 1e-9:
if bus.Vnom > 0.0:
shunt_node_code: str = build_shunt_node_code(bus_code_map[bus])
order_code = pair_counter.get_next_code(node_1=bus_code_map[bus], node_2=shunt_node_code, logger=logger)
susceptance_us = (aggregate.shunt_b_mvar / (float(bus.Vnom) * float(bus.Vnom))) * 1e6
shunt_row: str = format_line_row(node_1=bus_code_map[bus],
node_2=shunt_node_code,
order_code=order_code,
status=0,
resistance_ohm=0.0,
reactance_ohm=0.05,
susceptance_us=susceptance_us,
current_limit_a=math.nan,
name=f"{bus.name} fict. shunt")
file_pointer.write(shunt_row + "\n")
else:
logger.add_warning("Skipping bus shunt in UCTE export because the bus nominal voltage is invalid",
device=bus.name,
value=bus.Vnom)
else:
pass
[docs]
def write_transformer_block(circuit: MultiCircuit,
bus_code_map: Dict[dev.Bus, str],
t_idx: int | None,
logger: Logger,
file_pointer: TextIO) -> None:
"""
Write the UCTE transformer block.
:param circuit: Circuit to export.
:param bus_code_map: Bus-to-node-code mapping.
:param t_idx: Optional profile index.
:param logger: Export logger.
:param file_pointer: Open output file.
:return: None.
"""
file_pointer.write("##T\n")
pair_counter: UcteBranchCounter = UcteBranchCounter()
transformer: dev.Transformer2W
for transformer in circuit.get_transformers2w():
if transformer.bus_from is None or transformer.bus_to is None:
logger.add_warning("Skipping disconnected transformer in UCTE export", device=transformer.name)
else:
if transformer.bus_from.is_dc or transformer.bus_to.is_dc:
logger.add_warning("Skipping DC transformer in UCTE export", device=transformer.name)
else:
legacy_orientation: bool = use_legacy_transformer_orientation(bus_from=transformer.bus_from,
bus_to=transformer.bus_to)
node_1: str
node_2: str
if legacy_orientation:
# Standard country-coded UCTE files map node1 to the
# non-regulated winding and node2 to the regulated one.
node_1 = bus_code_map[transformer.bus_to]
node_2 = bus_code_map[transformer.bus_from]
else:
# IEEE-like synthetic UCTE data is interpreted directly as
# node1 -> node2 by the importer, so preserve that order.
node_1 = bus_code_map[transformer.bus_from]
node_2 = bus_code_map[transformer.bus_to]
order_code: str = pair_counter.get_next_code(node_1=node_1, node_2=node_2, logger=logger)
status: int
if transformer.get_active_at(t_idx):
status = 0
else:
status = 8
# Without explicit UCTE regulation rows, fixed taps are carried
# by rated_voltage1 in the same way the importer already
# expects.
tap_module: float = float(transformer.get_tap_module_at(t_idx))
rated_voltage_1_kv: float
if legacy_orientation:
if abs(tap_module) > 1e-9:
rated_voltage_1_kv = float(transformer.bus_to.Vnom) / tap_module
else:
rated_voltage_1_kv = float(transformer.bus_to.Vnom)
else:
rated_voltage_1_kv = float(transformer.bus_from.Vnom) * tap_module
rated_voltage_2_kv: float
if legacy_orientation:
rated_voltage_2_kv = float(transformer.bus_from.Vnom)
else:
rated_voltage_2_kv = float(transformer.bus_to.Vnom)
if abs(float(transformer.get_tap_phase_at(t_idx))) > 1e-9:
logger.add_warning("UCTE export is ignoring transformer phase shifting because no regulation rows are emitted yet",
device=transformer.name,
value=transformer.get_tap_phase_at(t_idx))
else:
pass
if transformer.tap_module_control_mode != TapModuleControl.fixed:
logger.add_warning("UCTE export is flattening transformer tap control to a fixed rated-voltage ratio",
device=transformer.name,
value=str(transformer.tap_module_control_mode))
else:
pass
if transformer.tap_phase_control_mode != TapPhaseControl.fixed:
logger.add_warning("UCTE export is flattening transformer phase control to the fixed transformer state",
device=transformer.name,
value=str(transformer.tap_phase_control_mode))
else:
pass
resistance_ohm: float
reactance_ohm: float
susceptance_us: float
conductance_us: float
resistance_ohm, reactance_ohm, susceptance_us, conductance_us = get_transformer_ucte_impedance(
transformer=transformer,
circuit=circuit,
rated_voltage_1_kv=rated_voltage_1_kv,
)
nominal_power_mva: float
if float(transformer.Sn) > 0.0:
nominal_power_mva = float(transformer.Sn)
elif float(transformer.get_rate_at(t_idx)) > 0.0:
nominal_power_mva = float(transformer.get_rate_at(t_idx))
else:
nominal_power_mva = 100.0
current_limit_a: float = get_transformer_current_limit_a(transformer=transformer, t_idx=t_idx)
transformer_row: str = format_transformer_row(node_1=node_1,
node_2=node_2,
order_code=order_code,
status=status,
rated_voltage_1_kv=rated_voltage_1_kv,
rated_voltage_2_kv=rated_voltage_2_kv,
nominal_power_mva=nominal_power_mva,
resistance_ohm=resistance_ohm,
reactance_ohm=reactance_ohm,
susceptance_us=susceptance_us,
conductance_us=conductance_us,
current_limit_a=current_limit_a,
name=transformer.name)
file_pointer.write(transformer_row + "\n")
[docs]
def write_empty_regulation_blocks(file_pointer: TextIO) -> None:
"""
Write the optional UCTE blocks that are not populated yet.
:param file_pointer: Open output file.
:return: None.
"""
file_pointer.write("##R\n")
file_pointer.write("##TT\n")
file_pointer.write("##E\n")
[docs]
def write_ucte(file_name: str,
circuit: MultiCircuit,
t_idx: int | None = None,
logger: Logger | None = None) -> Logger:
"""
Write one VeraGrid circuit as one UCTE text file.
:param file_name: Target file path.
:param circuit: Circuit to export.
:param t_idx: Optional profile index. ``None`` means snapshot export.
:param logger: Optional logger.
:return: Logger with export messages.
"""
export_logger: Logger
if logger is None:
export_logger = Logger()
else:
export_logger = logger
bus_code_map: Dict[dev.Bus, str] = build_bus_node_code_map(circuit=circuit, logger=export_logger)
bus_injections: Dict[dev.Bus, Dict[object, List[object]]] = circuit.get_injection_devices_grouped_by_bus()
aggregate_by_bus: Dict[dev.Bus, UcteBusAggregate] = dict()
bus: dev.Bus
for bus in circuit.get_buses():
aggregate_by_bus[bus] = build_bus_aggregate(circuit=circuit,
bus=bus,
bus_injections=bus_injections,
t_idx=t_idx,
logger=export_logger)
with open(file_name, "w", encoding="utf-8") as file_pointer:
write_comment_block(circuit=circuit, t_idx=t_idx, file_pointer=file_pointer)
write_node_blocks_from_aggregates(circuit=circuit,
bus_code_map=bus_code_map,
aggregate_by_bus=aggregate_by_bus,
file_pointer=file_pointer)
write_line_block(circuit=circuit,
bus_code_map=bus_code_map,
aggregate_by_bus=aggregate_by_bus,
t_idx=t_idx,
logger=export_logger,
file_pointer=file_pointer)
write_transformer_block(circuit=circuit,
bus_code_map=bus_code_map,
t_idx=t_idx,
logger=export_logger,
file_pointer=file_pointer)
write_empty_regulation_blocks(file_pointer=file_pointer)
return export_logger