# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
from typing import Dict, List, TYPE_CHECKING
import random
import numpy as np
import VeraGridEngine.Devices as dev
from VeraGridEngine.enumerations import ProceduralGridMethods
from VeraGridEngine.basic_structures import Mat, Vec, Logger
from scipy.spatial.distance import cdist
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.special import gammaincinv
from VeraGridEngine.Utils.GeographicalMethods.haversine_distance import haversine_distance
from collections import deque
from typing import Optional
from VeraGridEngine.Topology.Procedural.procedural_grid_debugger import ProceduralGridDebugger
from VeraGridEngine.Simulations.PowerFlow.power_flow_options import PowerFlowOptions
from VeraGridEngine.Simulations.CatalogueOptimization.catalogue_optimization_driver import (
CatalogueOptimizationDriver,
CatalogueOptimizationOptions,
)
from VeraGridEngine.Simulations.CatalogueOptimization.Problems.catalogue_problem import (
CatalogueOptimizationProblem,
)
if TYPE_CHECKING:
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
[docs]
def coord_calc(current_bus_lon: float, current_bus_lat: float, length: float, coord_out: Vec) -> Vec:
"""
Calculate the coordinates of the next bus based on the current bus,
the length to be covered (in km), and the output coordinates (in degrees).
The bearing must be computed in physical km space (not raw degree space) so
that intermediate buses lie on the geographic straight line between the two
endpoints. Using raw degree differences would distort the direction because
1Β° of longitude is shorter in km than 1Β° of latitude at non-equatorial latitudes.
:param current_bus_lon: Longitude of the current bus in degrees.
:param current_bus_lat: Latitude of the current bus in degrees.
:param length: Distance to advance toward ``coord_out`` in km.
:param coord_out: Target coordinates as ``[longitude, latitude]`` in degrees.
:return: New coordinates as ``[longitude, latitude]`` in degrees.
"""
km_per_deg_lat: float = 111.0
km_per_deg_lon: float = 111.0 * np.cos(np.radians(current_bus_lat))
# Convert the degree-space displacement to physical km so the bearing is
# computed in a (locally) isotropic coordinate frame
physical_dx: float = (coord_out[0] - current_bus_lon) * km_per_deg_lon
physical_dy: float = (coord_out[1] - current_bus_lat) * km_per_deg_lat
line_angle: float = np.arctan2(physical_dy, physical_dx)
delta_lon: float = np.cos(line_angle) * length / km_per_deg_lon
delta_lat: float = np.sin(line_angle) * length / km_per_deg_lat
new_coord: Vec = np.array([current_bus_lon + delta_lon, current_bus_lat + delta_lat])
return new_coord
[docs]
def instantiate_branch_from_template(template_branch,
current_bus: dev.Bus,
next_bus: dev.Bus,
length: float):
"""
Create a new branch object using an existing branch as template.
"""
if isinstance(template_branch, dev.Line):
new_branch = dev.Line(
name=f"AC_Line_{current_bus.name}_{next_bus.name}",
bus_from=current_bus,
bus_to=next_bus,
length=length,
r=template_branch.R,
x=template_branch.X,
b=template_branch.B,
rate=template_branch.rate,
capex=template_branch.capex,
opex=template_branch.opex
)
return new_branch
if isinstance(template_branch, dev.Transformer2W):
new_branch = dev.Transformer2W(
name=f"TR_{current_bus.name}_{next_bus.name}",
bus_from=current_bus,
bus_to=next_bus,
HV=max(current_bus.Vnom, next_bus.Vnom),
LV=min(current_bus.Vnom, next_bus.Vnom),
r=template_branch.R,
x=template_branch.X,
rate=template_branch.rate,
capex=template_branch.capex,
opex=template_branch.opex
)
new_branch.Pcu = template_branch.Pcu
new_branch.Pfe = template_branch.Pfe
return new_branch
return None
[docs]
class TransitionMatrix:
def __init__(self, grid: MultiCircuit):
# Find out my unique voltages
voltages = set()
for branch in grid.get_branches(add_vsc=True, add_hvdc=True, add_switch=True):
V1 = branch.bus_from.Vnom
V2 = branch.bus_to.Vnom
voltages.add(V1)
voltages.add(V2)
self.voltages_sorted = sorted(voltages)
self.voltages_dict = {v: i for i, v in enumerate(self.voltages_sorted)}
self.transition_matrix = np.zeros((len(self.voltages_sorted), len(self.voltages_sorted)))
for branch in grid.get_branches(add_vsc=True, add_hvdc=True, add_switch=True):
V1 = branch.bus_from.Vnom
V2 = branch.bus_to.Vnom
i1 = self.voltages_dict[V1]
i2 = self.voltages_dict[V2]
self.transition_matrix[i1, i2] += 1.0
self.transition_matrix[i2, i1] += 1.0
# normalize
for i in range(self.transition_matrix.shape[0]):
self.transition_matrix[i, :] /= self.transition_matrix[i, :].sum()
# template dictionary
self.template_dict = self.template_dictionary(grid)
[docs]
def at(self, V1: float, V2: float):
"""
Get probability associated to V1, transitioning to V2
:param V1: Voltage Source
:param V2: Voltage target
:return: Probability of transition
"""
i1 = self.voltages_dict[V1]
i2 = self.voltages_dict[V2]
return self.transition_matrix[i1, i2]
[docs]
@staticmethod
def template_dictionary(grid: MultiCircuit) -> Dict[float, Dict[float, List[tuple[object, float]]]]:
"""
Build a nested dictionary of branch templates grouped by voltage transition.
The outer key is the lower voltage (kV) and the inner key is the higher voltage
(kV), so (132 kV β 220 kV) is stored as template_dict[132.0][220.0].
Both voltages are sorted so that (132, 220) and (220, 132) map to the same entry.
The values are lists of (branch_object, probability) tuples where probability
is assigned uniformly among all branches that share the same voltage transition.
:param grid: MultiCircuit instance
:return: Nested dict {v_lo: {v_hi: [(branch_object, probability), ...]}}
"""
grouped_elements: Dict[float, Dict[float, List[object]]] = dict()
for branch in grid.get_branches(add_vsc=True, add_hvdc=True, add_switch=True):
v1 = float(branch.bus_from.Vnom)
v2 = float(branch.bus_to.Vnom)
v_lo = min(v1, v2)
v_hi = max(v1, v2)
if v_lo not in grouped_elements:
grouped_elements[v_lo] = dict()
if v_hi not in grouped_elements[v_lo]:
grouped_elements[v_lo][v_hi] = list()
grouped_elements[v_lo][v_hi].append(branch)
template_dict: Dict[float, Dict[float, List[tuple[object, float]]]] = dict()
for v_lo, inner in grouped_elements.items():
template_dict[v_lo] = dict()
for v_hi, branch_objects in inner.items():
n_el = len(branch_objects)
if n_el == 0:
template_dict[v_lo][v_hi] = list()
continue
probability = 1.0 / n_el
template_dict[v_lo][v_hi] = [(branch_object, probability) for branch_object in branch_objects]
return template_dict
[docs]
def get_most_likely_transition_voltage(self, V: float):
"""
Get the most likely voltage to transition to, given a voltage
:param V: Some voltage source
:return: The most likely voltage target
"""
i1 = self.voltages_dict[V]
i2 = np.argmax(self.transition_matrix[i1, :])
return self.voltages_sorted[i2]
[docs]
class ProceduralGridGraph:
def __init__(self,
target_buses: List[dev.Bus],
candidate_buses: List[dev.Bus],
max_iterations: int = 1000,
logger: Logger | None = None):
"""
:param target_buses:
:param candidate_buses:
:param max_iterations: Maximum number of iterations
"""
self.target_buses: List[dev.Bus] = target_buses
self.candidate_buses: List[dev.Bus] = candidate_buses
self.max_iterations: int = max_iterations
self.logger: Logger = Logger() if logger is None else logger
self.n_target = len(self.target_buses)
self.n_candidate = len(self.candidate_buses)
self.n_steiner = self.n_target + self.n_candidate - 2
# 1. Parse inputs
self.coord_candidate = np.zeros((self.n_candidate, 2)) # dim, (lon, lat)
self.coord_target = np.zeros((self.n_target, 2)) # dim, (lon, lat)
Vnom_kV_candidate = np.zeros(self.n_candidate)
Vnom_kV_target = np.zeros(self.n_target)
self.min_lon = np.inf
self.min_lat = np.inf
self.max_lon = -np.inf
self.max_lat = -np.inf
for i, bus in enumerate(self.target_buses):
self.coord_target[i, 0] = bus.longitude
self.coord_target[i, 1] = bus.latitude
Vnom_kV_target[i] = bus.Vnom
self.min_lon = min(self.min_lon, bus.longitude)
self.min_lat = min(self.min_lat, bus.latitude)
self.max_lon = max(self.max_lon, bus.longitude)
self.max_lat = max(self.max_lat, bus.latitude)
for i, bus in enumerate(self.candidate_buses):
self.coord_candidate[i, 0] = bus.longitude
self.coord_candidate[i, 1] = bus.latitude
Vnom_kV_candidate[i] = bus.Vnom
self.min_lon = min(self.min_lon, bus.longitude)
self.min_lat = min(self.min_lat, bus.latitude)
self.max_lon = max(self.max_lon, bus.longitude)
self.max_lat = max(self.max_lat, bus.latitude)
self.base_coords = np.r_[self.coord_candidate, self.coord_target]
self.base_voltages = np.r_[Vnom_kV_candidate, Vnom_kV_target]
[docs]
def calculate_fitness(self, mu_lon: Vec, mu_lat: Vec):
"""
VSA Fitness: MST Length + Degree Penalty.
:param mu_lat: steiner points latitudes
:param mu_lon: steiner points longitudes
"""
steiner_coords = np.c_[mu_lon, mu_lat]
all_coords = np.r_[self.base_coords, steiner_coords]
# MST Calculation
dist_matrix = cdist(all_coords, all_coords)
mst_matrix = minimum_spanning_tree(dist_matrix)
mst_array = mst_matrix.toarray()
total_length = np.sum(mst_array)
# Penalty for Degree > 3 (Geometrically impossible for optimal ST)
adj = mst_array + mst_array.T
degrees = np.count_nonzero(adj, axis=1)
steiner_degrees = degrees[self.n_target + self.n_candidate:]
penalty = 0
for d in steiner_degrees:
if d > 3: penalty += 10000
return total_length + penalty
[docs]
def run_vsa(self):
"""
:return: best_solution (dim, (lat, lon))
"""
dim = self.n_steiner
mu_lon = np.random.uniform(self.min_lon, self.max_lon, dim)
mu_lat = np.random.uniform(self.min_lat, self.max_lat, dim)
best_fitness = self.calculate_fitness(mu_lon=mu_lon, mu_lat=mu_lat)
best_solution = np.c_[mu_lon, mu_lat]
convergence_history = []
radius_max = np.sqrt((self.max_lat - self.min_lat) ** 2 + (self.max_lon - self.min_lon) ** 2) / 2.0
x_param = 0.1
for k in range(self.max_iterations):
a_t = 1.0 - (k / self.max_iterations)
a_t = max(a_t, 0.0001)
r_t = radius_max * (1 / x_param) * gammaincinv(x_param, a_t)
candidate_lon = mu_lon + r_t * np.random.randn(dim)
candidate_lat = mu_lat + r_t * np.random.randn(dim)
candidate_lon = np.clip(candidate_lon, self.min_lon, self.max_lon)
candidate_lat = np.clip(candidate_lat, self.min_lat, self.max_lat)
fitness = self.calculate_fitness(mu_lon=candidate_lon, mu_lat=candidate_lat)
if fitness < best_fitness:
best_fitness = fitness
mu_lon = candidate_lon
mu_lat = candidate_lat
best_solution = np.c_[candidate_lon, candidate_lat]
convergence_history.append(best_fitness)
return best_solution, convergence_history
[docs]
def prune_redundant_nodes(self, steiner_coords: Mat):
"""
Iteratively removes Steiner points with Degree <= 2.
:param steiner_coords: steiner points coordinates (dim, (lon, lat))
"""
current_steiner = steiner_coords.copy()
while True:
if len(current_steiner) == 0:
break
# 1. Build MST with current set
all_coords = np.r_[self.base_coords, current_steiner]
dist_matrix = cdist(all_coords, all_coords, metric='euclidean')
mst_array = minimum_spanning_tree(dist_matrix).toarray()
# cost = np.sum(mst_array)
# 2. Check degrees
adj = mst_array + mst_array.T
degrees = np.count_nonzero(adj, axis=1)
# Extract Steiner degrees (indices after fixed nodes)
st_degrees = degrees[self.n_target + self.n_candidate:]
# 3. Identify useful points (Degree >= 3)
useful_indices = np.where(st_degrees > 2)[0]
# If no nodes are redundant, stop pruning
if len(useful_indices) == len(current_steiner):
break
self.logger.add_info(f"Pruning... Removed {len(current_steiner) - len(useful_indices)} nodes.")
current_steiner = current_steiner[useful_indices, :]
return current_steiner
[docs]
def remove_existing_edges(
self,
mst_matrix: np.ndarray,
existing_connections: set[tuple[int, int]],
n_fixed: int,
) -> np.ndarray:
"""
Remove MST edges that already exist in the incumbent grid.
This method is graph-only: it does not know anything about VeraGrid objects.
It only receives:
- the MST matrix,
- the set of existing fixed-node connections,
- the number of fixed nodes (candidate + target).
Only edges between fixed nodes are checked. Edges involving Steiner nodes
are always kept.
:param mst_matrix: MST adjacency matrix
:param existing_connections: Set of existing graph edges as index pairs
:param n_fixed: Number of fixed nodes at the beginning of the node ordering
:return: Filtered MST adjacency matrix
"""
cleaned_mst = mst_matrix.copy()
rows, cols = np.where(cleaned_mst > 0)
for i, j in zip(rows, cols):
if i >= n_fixed or j >= n_fixed:
continue
key = (min(i, j), max(i, j))
if key in existing_connections:
cleaned_mst[i, j] = 0.0
return cleaned_mst
[docs]
class Topology:
"""
Represents the physical layout using domain objects.
Contains a Graph instance via composition.
"""
def __init__(self,
edges: list[tuple[int, int]],
all_buses: List[dev.Bus],
grid: MultiCircuit,
transition_matrix: TransitionMatrix,
discretization: float = 25.0,
logger: Logger | None = None):
self.edges = edges
self.all_buses = all_buses
self.grid = grid
self.transition_matrix = transition_matrix
self.discretization = discretization # km between buses
self.logger: Logger = Logger() if logger is None else logger
self.intermediate_buses: List[dev.Bus] = list() # buses generated in the markov process
self.new_lines: List[dev.Line] = list() # lines created during topology generation
self.new_transformers: List[dev.Transformer2W] = list() # transformers created during topology generation
[docs]
def generate_markov(self):
"""
Generates combinations that are valid by construction.
"""
for edge in self.edges:
lengths = []
input_bus = self.all_buses[edge[0]]
output_bus = self.all_buses[edge[1]]
distance = haversine_distance(
lat1=input_bus.latitude,
lon1=input_bus.longitude,
lat2=output_bus.latitude,
lon2=output_bus.longitude
) # distance = np.linalg.norm(coord_input - coord_output)
if distance % self.discretization == 0:
n_buses = int(distance // self.discretization + 1)
n_slots = int(n_buses - 1)
for i in range(n_slots):
lengths.append(self.discretization)
else:
n_buses = int(distance // self.discretization + 2)
n_slots = int(n_buses - 1)
for i in range(n_slots):
if i == n_slots - 1:
lengths.append(distance % self.discretization)
else:
lengths.append(self.discretization)
accumulated_length = 0.0
conn_counter = 0
intermediate_bus_counter = 0
# Add the first bus to the Multicircuit
next_bus = input_bus
voltage_options = self.transition_matrix.voltages_sorted
while accumulated_length < distance:
# Update next bus to current bus
current_bus = next_bus
# Check if we are at the last connection
is_last_conn = (conn_counter == n_slots - 1)
# Add next bus to Multicircuit
if is_last_conn:
next_bus = output_bus
if current_bus.Vnom != next_bus.Vnom:
# Any voltage change at the last slot: use last_bus_fix so that a
# line covers the geographic distance and the transformer is placed
# at the endpoint, regardless of whether a direct template exists.
self.logger.add_info(
msg=f"Voltage change from {current_bus.Vnom}kV to {next_bus.Vnom}kV. "
f"Creating voltage bridge near the output bus.",
device=f"edge {edge}"
)
next_bus, intermediate_bus_counter = self.last_bus_fix(
current_bus=current_bus,
output_bus=output_bus,
edge=edge,
intermediate_bus_counter=intermediate_bus_counter,
)
else:
# Same voltage: direct line connection, no bridge needed
None
else:
next_bus_options = sorted([b for b in voltage_options])
row_idx = self.transition_matrix.voltages_dict[current_bus.Vnom]
current_transitions = self.transition_matrix.transition_matrix[row_idx, :]
# random.choices returns a list, so get the first element [0]
next_bus_vnom = random.choices(next_bus_options, weights=current_transitions, k=1)[0]
next_coord = coord_calc(
current_bus_lon=current_bus.longitude,
current_bus_lat=current_bus.latitude,
length=lengths[conn_counter],
coord_out=np.array([output_bus.longitude, output_bus.latitude]),
)
next_bus = dev.Bus(
name=f"Intermediate_{input_bus.name}_{output_bus.name}_{intermediate_bus_counter}",
Vnom=float(next_bus_vnom),
is_dc=current_bus.is_dc,
longitude=float(next_coord[0]),
latitude=float(next_coord[1]),
)
next_bus.Vm_cost = next_bus.Vnom * next_bus.Vm_cost
self.grid.add_bus(obj=next_bus)
self.intermediate_buses.append(next_bus)
intermediate_bus_counter += 1
v1 = float(current_bus.Vnom)
v2 = float(next_bus.Vnom)
v_lo = min(v1, v2)
v_hi = max(v1, v2)
template_options = self.transition_matrix.template_dict.get(v_lo, dict()).get(v_hi, list())
if not template_options:
self.logger.add_warning(
msg=f"No template branches for transition {v_lo}kV-{v_hi}kV. Skipping edge {edge}.",
device=f"edge {edge}"
)
break
template_branches = [item[0] for item in template_options]
template_probs = [item[1] for item in template_options]
template_branch = random.choices(template_branches, weights=template_probs, k=1)[0]
new_branch = instantiate_branch_from_template(
template_branch=template_branch,
current_bus=current_bus,
next_bus=next_bus,
length=lengths[conn_counter],
)
self.logger.add_info(
msg=f"Branch type={type(new_branch).__name__}, "
f"accumulated_length={accumulated_length:.1f}, distance={distance:.1f}, "
f"conn_counter={conn_counter}",
device=f"edge {edge}"
)
if new_branch is None:
self.logger.add_warning(
msg=f"Could not instantiate branch for transition {v_lo}kV-{v_hi}kV. Skipping edge {edge}.",
device=f"edge {edge}"
)
break
self.add_branch_to_grid(branch=new_branch)
if isinstance(new_branch, dev.Line):
accumulated_length += lengths[conn_counter]
conn_counter += 1
# TODO: Add the merge_lines function outside generate_markov and inside Topology
# combination, length_elements_ = merge_lines(combination, length_elements_)
return self.intermediate_buses
[docs]
def add_branch_to_grid(self, branch) -> None:
"""
Add a branch object to the correct container of the expansion grid.
"""
if isinstance(branch, dev.Line):
self.grid.add_line(obj=branch)
self.new_lines.append(branch)
return
if isinstance(branch, dev.Transformer2W):
self.grid.add_transformer2w(obj=branch)
self.new_transformers.append(branch)
return
# TODO: Extend later for VSC, HVDC, switches, etc.
[docs]
def find_voltage_path(self, start_v: float, end_v: float) -> Optional[list[float]]:
"""
Find a voltage path between start_v and end_v using the transitions
available in template_dict.
:param start_v: Initial voltage
:param end_v: Final voltage
:return: List of voltages [start_v, ..., end_v] or None if no path exists
"""
start_v = float(start_v)
end_v = float(end_v)
if start_v == end_v:
return [start_v]
adjacency: dict[float, set[float]] = dict()
for v_lo, inner in self.transition_matrix.template_dict.items():
for v_hi, template_options in inner.items():
if len(template_options) == 0:
continue
if v_lo not in adjacency:
adjacency[v_lo] = set()
if v_hi not in adjacency:
adjacency[v_hi] = set()
adjacency[v_lo].add(v_hi)
adjacency[v_hi].add(v_lo)
if start_v not in adjacency or end_v not in adjacency:
return None
queue = deque([start_v])
parent: dict[float, Optional[float]] = {start_v: None}
while queue:
v = queue.popleft()
if v == end_v:
break
for neigh in adjacency[v]:
if neigh not in parent:
parent[neigh] = v
queue.append(neigh)
if end_v not in parent:
return None
path: list[float] = list()
v = end_v
while v is not None:
path.append(v)
v = parent[v]
path.reverse()
return path
[docs]
def last_bus_fix(
self,
current_bus: dev.Bus,
output_bus: dev.Bus,
edge: tuple[int, int],
intermediate_bus_counter: int,
) -> tuple[dev.Bus, int]:
"""
Create a voltage bridge near the output bus so that the last geographical
segment can still be connected and the final output voltage can be reached.
Strategy:
- create a first bridge bus at the output coordinates with the same voltage
as current_bus,
- connect current_bus to that first bridge bus later using the normal last
line section,
- create the remaining zero-length voltage transition chain from that first
bridge bus to output_bus.
:param current_bus: Current bus at the start of the last slot
:param output_bus: Final target bus
:param edge: Edge being processed
:param intermediate_bus_counter: Counter for naming intermediate buses
:return: (first_bridge_bus, updated_intermediate_bus_counter)
"""
voltage_path = self.find_voltage_path(
start_v=float(current_bus.Vnom),
end_v=float(output_bus.Vnom),
)
if voltage_path is None:
raise RuntimeError(
f"No voltage path exists from {current_bus.Vnom} kV to {output_bus.Vnom} kV."
)
if len(voltage_path) < 2:
raise RuntimeError(
f"Invalid voltage path from {current_bus.Vnom} kV to {output_bus.Vnom} kV: {voltage_path}"
)
# First bridge bus: same voltage as current_bus, but placed at output coordinates.
first_bridge_bus = dev.Bus(
name=f"Intermediate_{current_bus.name}_{output_bus.name}_{intermediate_bus_counter}",
Vnom=float(current_bus.Vnom),
is_dc=current_bus.is_dc,
longitude=float(output_bus.longitude),
latitude=float(output_bus.latitude),
)
first_bridge_bus.Vm_cost = first_bridge_bus.Vnom * first_bridge_bus.Vm_cost
self.grid.add_bus(obj=first_bridge_bus)
self.intermediate_buses.append(first_bridge_bus)
intermediate_bus_counter += 1
previous_bus = first_bridge_bus
# Build the voltage-transition chain near the output bus.
# voltage_path is [current_v, ..., output_v]
for idx, next_voltage in enumerate(voltage_path[1:], start=1):
is_last_step = (idx == len(voltage_path) - 1)
if is_last_step:
next_bus = output_bus
else:
next_bus = dev.Bus(
name=f"Intermediate_{current_bus.name}_{output_bus.name}_{intermediate_bus_counter}",
Vnom=float(next_voltage),
is_dc=current_bus.is_dc,
longitude=float(output_bus.longitude),
latitude=float(output_bus.latitude),
)
next_bus.Vm_cost = next_bus.Vnom * next_bus.Vm_cost
self.grid.add_bus(obj=next_bus)
self.intermediate_buses.append(next_bus)
intermediate_bus_counter += 1
v1 = float(previous_bus.Vnom)
v2 = float(next_bus.Vnom)
v_lo = min(v1, v2)
v_hi = max(v1, v2)
template_options = self.transition_matrix.template_dict.get(v_lo, dict()).get(v_hi, list())
if len(template_options) == 0:
raise RuntimeError(
f"No template branches available for transition {v_lo}kV-{v_hi}kV "
f"while fixing edge {edge}."
)
template_branches = [item[0] for item in template_options]
template_probs = [item[1] for item in template_options]
template_branch = random.choices(template_branches, weights=template_probs, k=1)[0]
new_branch = instantiate_branch_from_template(
template_branch=template_branch,
current_bus=previous_bus,
next_bus=next_bus,
length=0.0,
)
if new_branch is None:
raise RuntimeError(
f"Could not instantiate branch for transition {v_lo}kV-{v_hi}kV "
f"while fixing edge {edge}."
)
self.add_branch_to_grid(branch=new_branch)
previous_bus = next_bus
return first_bridge_bus, intermediate_bus_counter
[docs]
class ProceduralGridComputationEngine:
"""
Core engine for procedural grid expansion calculations.
"""
def __init__(self,
grid: MultiCircuit,
method: ProceduralGridMethods,
targets: List[dev.Substation],
candidates: List[dev.Substation],
logger: Logger | None = None):
"""
:param grid: MultiCircuit instance
:param method: Method to use
:param targets: List of substations to connect
:param candidates: List of Substation that we may want to connect
"""
self.grid = grid
self.method = method
self.targets_substations = targets
self.candidates_substations = candidates
self.logger: Logger = Logger() if logger is None else logger
self.target_buses: List[dev.Bus] = list()
self.candidate_buses: List[dev.Bus] = list()
self.steiner_buses: List[dev.Bus] = list() # this is a result
self.intermediate_buses: List[dev.Bus] = list() # buses generated in the markov process
self.new_lines: List[dev.Line] = list() # lines created during topology generation
self.new_transformers: List[dev.Transformer2W] = list() # transformers created during topology generation
self.debugger = ProceduralGridDebugger(enabled=True)
candidate_counter = 0
for sg in self.candidates_substations:
for bus in self.grid.get_substation_buses(sg):
lon, lat = bus.try_to_find_coordinates()
bus.longitude = lon
bus.latitude = lat
bus.Vm_cost = bus.Vnom * bus.Vm_cost
self.candidate_buses.append(bus)
candidate_counter += 1
target_counter = 0
for sg in self.targets_substations:
for bus in self.grid.get_substation_buses(sg):
lon, lat = bus.try_to_find_coordinates()
bus.longitude = lon
bus.latitude = lat
bus.Vm_cost = bus.Vnom * bus.Vm_cost
self.target_buses.append(bus)
target_counter += 1
# TRANSITION_MATRIX = {
# 132: {132: 0.7, 220: 0.15, 380: 0.15, 500: 0},
# 220: {132: 0.15, 220: 0.7, 380: 0.1, 500: 0.05},
# 380: {132: 0.15, 220: 0.1, 380: 0.7, 500: 0.05},
# 500: {132: 0, 220: 0.15, 380: 0.15, 500: 0.7}
# }
# TRANSITION_MATRIX = {
# 132: {132: {template_A: 0.35, template_B: 0.35}, 220: {template_A: 0.075, template_B: 0.075}, 380: {template_A: 0.075, template_B: 0.075}, 500: {template_A: 0.00, template_B: 0.00}},
# 220: {132: {template_A: 0.075, template_B: 0.075}, 220: {template_A: 0.35, template_B: 0.35}, 380: {template_A: 0.05, template_B: 0.05}, 500: {template_A: 0.025, template_B: 0.025}},
# 380: {132: {template_A: 0.075, template_B: 0.075}, 220: {template_A: 0.05, template_B: 0.05}, 380: {template_A: 0.35, template_B: 0.35}, 500: {template_A: 0.025, template_B: 0.025}},
# 500: {132: {template_A: 0.00, template_B: 0.00}, 220: {template_A: 0.075, template_B: 0.075}, 380: {template_A: 0.075, template_B: 0.075}, 500: {template_A: 0.35, template_B: 0.35}},
# }
self.transition_matrix = TransitionMatrix(grid)
[docs]
def get_buses(self) -> List[dev.Bus]:
"""
Get list of all the incumbent buses in the calculation
:return:
"""
return self.target_buses + self.candidate_buses + self.steiner_buses + self.intermediate_buses
[docs]
def get_new_buses(self) -> List[dev.Bus]:
"""
Get only the newly created buses (Steiner points + Markov intermediate buses).
These have no substation assigned and need one for map rendering.
:return:
"""
return self.steiner_buses + self.intermediate_buses
[docs]
def get_new_lines(self) -> List[dev.Line]:
"""
Get only the newly created Line objects added during topology generation.
:return:
"""
return self.new_lines
[docs]
def run_steiner_alone(self):
"""
Executes the Steiner Tree algorithm without further optimization.
"""
# 2A. Run VSA: Create an initial radial topology with a Steiner tree approach
network = ProceduralGridGraph(target_buses=self.target_buses,
candidate_buses=self.candidate_buses,
max_iterations=15000,
logger=self.logger)
initial_steiner_points, history = network.run_vsa()
# 2B. Prune redundant nodes
final_steiner_pts = network.prune_redundant_nodes(initial_steiner_points)
# 2C. Get edges of the final network
coords_final_network = np.r_[network.base_coords, final_steiner_pts]
dist_matrix = cdist(coords_final_network, coords_final_network, metric='euclidean')
final_mst = minimum_spanning_tree(dist_matrix).toarray()
existing_connections = self.get_existing_fixed_bus_connections()
n_fixed = len(self.candidate_buses) + len(self.target_buses)
final_mst = network.remove_existing_edges(
mst_matrix=final_mst,
existing_connections=existing_connections,
n_fixed=n_fixed,
)
# generate edges
rows, cols = np.where(final_mst > 0)
edges = list(zip(rows, cols))
# 2E. Assign voltages to Steiner Points
distances = np.zeros(network.base_coords.shape[0])
closest_voltage = np.zeros(final_steiner_pts.shape[0])
for i_sp in range(final_steiner_pts.shape[0]):
for i_base in range(network.base_coords.shape[0]):
dist = haversine_distance(
lat1=network.base_coords[i_base, 1],
lon1=network.base_coords[i_base, 0],
lat2=final_steiner_pts[i_sp, 1],
lon2=final_steiner_pts[i_sp, 0],
)
distances[i_base] = dist
closest = np.argmin(distances)
closest_voltage[i_sp] = network.base_voltages[closest]
# 2F. Create Node buses (M, N, S)
for i in range(final_steiner_pts.shape[0]):
sp_bus = dev.Bus(
name=f"Intermediate_{i}",
Vnom=float(closest_voltage[i]),
longitude=final_steiner_pts[i, 0],
latitude=final_steiner_pts[i, 1],
)
# Remove list if you do not need it anymore
self.steiner_buses.append(sp_bus)
self.grid.add_bus(obj=sp_bus)
all_buses = self.candidate_buses + self.target_buses + self.steiner_buses
# 3. Create Topology from Graph
# TRANSITION_MATRIX = {
# 132: {132: 0.7, 220: 0.15, 380: 0.15, 500: 0},
# 220: {132: 0.15, 220: 0.7, 380: 0.1, 500: 0.05},
# 380: {132: 0.15, 220: 0.1, 380: 0.7, 500: 0.05},
# 500: {132: 0, 220: 0.15, 380: 0.15, 500: 0.7}
# }
topology = Topology(edges,
all_buses,
self.grid,
transition_matrix=self.transition_matrix,
discretization=25.0,
logger=self.logger)
before_names = self.debugger.snapshot_grid_element_names(self.grid)
self.intermediate_buses = topology.generate_markov()
self.new_lines = topology.new_lines
self.new_transformers = topology.new_transformers
added_names = self.debugger.get_added_element_names(
grid=self.grid,
previous_names=before_names,
)
for name in added_names:
self.logger.add_info(msg="Added element", device=name)
return self.grid
[docs]
def get_existing_fixed_bus_connections(self) -> set[tuple[int, int]]:
"""
Build the set of existing direct connections between fixed buses
(candidate + target) in graph-index form.
The returned index pairs are expressed in the same ordering used by
ProceduralGridGraph.base_coords:
[candidate_buses..., target_buses...]
:return: Set of existing fixed-bus connections as index pairs
"""
fixed_buses = self.candidate_buses + self.target_buses
bus_index_by_id = {id(bus): i for i, bus in enumerate(fixed_buses)}
existing_connections: set[tuple[int, int]] = set()
for branch in self.grid.get_branches(add_vsc=True, add_hvdc=True, add_switch=True):
i = bus_index_by_id.get(id(branch.bus_from))
j = bus_index_by_id.get(id(branch.bus_to))
if i is None or j is None:
continue
key = (min(i, j), max(i, j))
existing_connections.add(key)
return existing_connections
[docs]
def run_steiner_tree_and_optimization(self,
pf_options: PowerFlowOptions,
max_eval_per_var: int) -> "MultiCircuit":
"""
Execute the Steiner tree topology generation first, then run an NSGA-3
catalogue-template optimization over the newly created branches.
The two-stage flow mirrors what the engine offers piecewise: first the
topology is grown via :meth:`run_steiner_alone`, which populates
``self.new_lines`` and ``self.new_transformers``; then a synchronous
catalogue optimization tunes the per-branch templates of those new
branches only (existing infrastructure is left untouched). On
completion the grid is left at the best Pareto member's templates so
the caller observes the optimizer's top recommendation.
:param pf_options: power flow options used by the inner PF evaluations
performed by the catalogue problem.
:param max_eval_per_var: per-decision-variable evaluation budget. The
actual NSGA-3 evaluation cap becomes
``max_eval_per_var * problem.n_vars()``.
:return: the (in-place mutated) :class:`MultiCircuit` reference.
"""
# 1. Build the topology. Mutates self.grid and populates
# self.new_lines / self.new_transformers, which are the candidates
# for catalogue optimisation in step 2.
self.run_steiner_alone()
# 2. Build the optimisation scope: only the newly created branches.
# Pre-existing infrastructure is intentionally left untouched.
selected_branches: List = list(self.new_lines) + list(self.new_transformers)
# 2a. If the Steiner step produced no new branches, there is nothing
# to optimise. Surface a warning to the logger and exit early.
if len(selected_branches) == 0:
self.logger.add_warning(
msg="No new branches were created by the Steiner step; "
"catalogue optimization skipped."
)
return self.grid
else:
pass
# 3. Build the catalogue problem. The constructor raises ValueError
# when no slot has more than one compatible template (nothing to
# optimise over). We catch and report, then return the topology
# as-is so the caller still gets a usable grid.
try:
problem: CatalogueOptimizationProblem = CatalogueOptimizationProblem(
grid=self.grid,
pf_options=pf_options,
selected_branches=selected_branches,
voltage_tolerance=0.1,
)
except ValueError as ex:
self.logger.add_warning(
msg=f"Catalogue optimization skipped: {ex}"
)
return self.grid
# 4. Compose options. max_eval is the per-var budget scaled by the
# number of decision slots, matching the standalone catalogue
# feature in simulations.py.
max_eval: int = int(max_eval_per_var) * problem.n_vars()
options: CatalogueOptimizationOptions = CatalogueOptimizationOptions(
max_eval=max_eval,
pf_options=pf_options,
)
# 5. Build and run the driver synchronously. We do NOT route through
# session.run here because this method is engine-side and must be
# callable without a running GUI session.
driver: CatalogueOptimizationDriver = CatalogueOptimizationDriver(
grid=self.grid,
options=options,
problem=problem,
)
driver.run()
# 6. Merge the driver's logger into the engine logger so its messages
# reach the GUI LogsDialogue together with the topology-phase logs.
self.logger += driver.logger
# 7. Apply the best Pareto member's templates so the returned grid
# reflects the optimizer's top recommendation. The driver leaves
# the grid at baseline after each evaluation (per the problem's
# snapshot/restore convention), so we restore-then-apply to land
# on a deterministic, optimiser-chosen state.
if len(driver.results.sorting_indices) > 0:
best_x = driver.results.x[driver.results.sorting_indices[0], :]
driver.problem._restore_baseline()
driver.problem._apply_combination(x=best_x)
else:
# No Pareto results available (e.g. budget too small) - leave the
# grid at baseline templates.
pass
return self.grid