# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import numpy as np
from typing import Tuple, Sequence, TYPE_CHECKING
from scipy.sparse.linalg import factorized, spsolve
from scipy.sparse import csc_matrix, bmat
import networkx as nx
import VeraGridEngine.Devices as dev
from VeraGridEngine.basic_structures import IntVec, CxVec, Logger
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
if TYPE_CHECKING:
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Simulations.PowerFlow.power_flow_results import PowerFlowResults
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
[docs]
def ward_reduction_non_linear(nc: NumericalCircuit, e_buses, b_buses, i_buses, voltage: CxVec, Sbus: CxVec):
"""
:param nc:
:param e_buses:
:param b_buses:
:param i_buses:
:param voltage:
:param Sbus:
:return:
"""
adm = nc.get_admittance_matrices()
Ybus = adm.Ybus
# slice matrices
YII = Ybus[np.ix_(i_buses, i_buses)]
YIB = Ybus[np.ix_(i_buses, b_buses)]
YBI = Ybus[np.ix_(b_buses, i_buses)]
YBB = Ybus[np.ix_(b_buses, b_buses)]
YBE = Ybus[np.ix_(b_buses, e_buses)]
YEB = Ybus[np.ix_(e_buses, b_buses)]
YEE = Ybus[np.ix_(e_buses, e_buses)]
YEE_fact = factorized(YEE.tocsc())
Yeq = - YBE @ YEE_fact(YEB.toarray()) # 2.16
YBBp = YBB + csc_matrix(Yeq) # YBBp = YBB - YBE @ YEE_fact(YEB) # 2.13
# compute the current
I = (Sbus / nc.Sbase) / np.conj(voltage)
II = I[i_buses]
IB = I[b_buses]
IE = I[e_buses]
# these are the currents to be added ar every bus that remains
Ieq = - YBE @ YEE_fact(IE) # 2.17
# now, we compute the new voltages at the internal and boundary (2.15)
Ysys = bmat(blocks=[[YII.tocsc(), YIB.tocsc()], [YBI.tocsc(), YBBp.tocsc()]], format='csc')
IBp = IB + Ieq # IBp = IB - YBE @ YEE_fact(IE) # 2.14
Isys = np.r_[II, IBp]
V = spsolve(Ysys, Isys)
ni = len(i_buses)
VI = V[:ni]
VB = V[ni:]
Seq = VB * np.conj(IB)
return Seq, Ieq, Yeq, YBBp, adm.Ybus.tocsc()
[docs]
def ward_reduction_linear(Ybus: csc_matrix, e_buses: IntVec, b_buses: IntVec, i_buses: IntVec):
"""
:param Ybus:
:param e_buses:
:param b_buses:
:param i_buses:
:return:
"""
# slice matrices
Yii = Ybus[np.ix_(i_buses, i_buses)]
Yib = Ybus[np.ix_(i_buses, b_buses)]
Yie = Ybus[np.ix_(i_buses, e_buses)]
Ybi = Ybus[np.ix_(b_buses, i_buses)]
Ybb = Ybus[np.ix_(b_buses, b_buses)]
Ybe = Ybus[np.ix_(b_buses, e_buses)]
Yeb = Ybus[np.ix_(e_buses, b_buses)]
Yee = Ybus[np.ix_(e_buses, e_buses)]
Yee_fact = factorized(Yee.tocsc())
Yeq = - Ybe @ Yee_fact(Yeb.toarray()) # 2.16
Ybbp = Ybb + csc_matrix(Yeq) # YBBp = YBB - YBE @ YEE_fact(YEB) # 2.13
return Yeq, Ybbp.tocsc()
# # compute the current
# P = nc.get_power_injections_pu().real
#
# Pi = P[i_buses]
# Pb = P[b_buses]
# Pe = P[e_buses]
#
# # these are the power to be added ar every boundary bus
# Pb_eq = - Bbe @ Bee_fact(Pe) # 2.17
#
# # ΞP = βB_ie B_eeβ»ΒΉ P_e
# Pe = -Bie @ Bee_fact(Pe)
#
# return Pb_eq, Pb_eq, Beq, Bbbp, adm.Bbus.tocsc()
[docs]
def ward_reduction_linear2(Ybus: csc_matrix, e_buses: IntVec, i_buses: IntVec):
"""
:param Ybus:
:param e_buses:
:param i_buses:
:return:
"""
# slice matrices
Yii = Ybus[np.ix_(i_buses, i_buses)]
Yie = Ybus[np.ix_(i_buses, e_buses)]
Yei = Ybus[np.ix_(e_buses, i_buses)]
Yee = Ybus[np.ix_(e_buses, e_buses)]
Yee_fact = factorized(Yee.tocsc())
Yeq = - Yie @ Yee_fact(Yei.toarray()) # 2.16
# Ybbp = Yii + csc_matrix(Yeq) # YBBp = YBB - YBE @ YEE_fact(YEB) # 2.13
return Yeq.tocsc()
[docs]
def get_reduction_sets_1(nc: NumericalCircuit,
reduction_bus_indices: Sequence[int]) -> Tuple[IntVec, IntVec, IntVec, IntVec]:
"""
Generate the set of bus indices for grid reduction
:param nc: NumericalCircuit
:param reduction_bus_indices: array of bus indices to reduce (external set)
:return: external, boundary, internal, boundary_branches
"""
external_set = set(reduction_bus_indices)
boundary_set = set()
internal_set = set()
boundary_branches = list()
for k in range(nc.nbr):
f = nc.passive_branch_data.F[k]
t = nc.passive_branch_data.T[k]
if f in external_set:
if t in external_set:
# the branch belongs to the external set
pass
else:
# the branch is a boundary link and t is a frontier bus
boundary_set.add(t)
boundary_branches.append(k)
else:
# we know f is not external...
if t in external_set:
# f is not in the external set, but t is: the branch is a boundary link and f is a frontier bus
boundary_set.add(f)
boundary_branches.append(k)
else:
# f nor t are in the external set: both belong to the internal set
internal_set.add(f)
internal_set.add(t)
# buses cannot be in both the internal and boundary set
elms_to_remove = list()
for i in internal_set:
if i in boundary_set:
elms_to_remove.append(i)
for i in elms_to_remove:
internal_set.remove(i)
# convert to arrays and sort
external = np.sort(np.array(list(external_set)))
boundary = np.sort(np.array(list(boundary_set)))
internal = np.sort(np.array(list(internal_set)))
boundary_branches = np.array(boundary_branches)
return external, boundary, internal, boundary_branches
[docs]
def create_new_boundary_branches(grid: MultiCircuit, b_buses: IntVec, Yeq_1: csc_matrix, Ybbp_1: csc_matrix,
tol: float, use_linear=False):
"""
:param grid:
:param b_buses:
:param Yeq_1:
:param Ybbp_1:
:param tol:
:param use_linear:
:return:
"""
# add boundary equivalent sub-grid: traverse only the triangular
max_x = 5.0
for i in range(len(b_buses)):
for j in range(i):
if abs(Yeq_1[i, j]) > tol:
if i == j:
# add shunt reactance
bus = b_buses[i]
yeq_row_i = Yeq_1[i, :].copy()
yeq_row_i[i] = 0
ysh = Ybbp_1[i, i] - np.sum(yeq_row_i)
if use_linear:
sh = dev.Shunt(name=f"Equivalent shunt {i}", B=ysh, G=0.0)
else:
sh = dev.Shunt(name=f"Equivalent shunt {i}", B=ysh.imag, G=ysh.real)
grid.add_shunt(bus=bus, api_obj=sh)
else:
# add series reactance
f = b_buses[i]
t = b_buses[j]
z = - 1.0 / Yeq_1[i, j]
if z.imag <= max_x:
series_reactance = dev.SeriesReactance(
name=f"Equivalent boundary impedance {b_buses[i]}-{b_buses[j]}",
bus_from=grid.buses[f],
bus_to=grid.buses[t],
r=z.real,
x=z.imag
)
grid.add_series_reactance(series_reactance)
[docs]
def find_gen_relocation(grid: MultiCircuit,
reduction_bus_indices: Sequence[int]):
"""
Relocate generators
:param grid: MultiCircuit
:param reduction_bus_indices: array of bus indices to reduce (external set)
:return: None
"""
G = nx.Graph()
bus_idx_dict = grid.get_bus_index_dict()
external_set = set(reduction_bus_indices)
external_gen_set = set()
external_gen_data = list()
internal_set = set()
# loop through the generators in the external set
for k, elm in enumerate(grid.generators):
i = bus_idx_dict[elm.bus]
if i in external_set:
external_set.remove(i)
external_gen_set.add(i)
external_gen_data.append((k, i, 'generator'))
G.add_node(i)
# loop through the branches
for branch in grid.get_branches(add_vsc=False, add_hvdc=False, add_switch=True):
f = bus_idx_dict[branch.bus_from]
t = bus_idx_dict[branch.bus_to]
if f in external_set or t in external_set:
# the branch belongs to the external set
pass
else:
# f nor t are in the external set: both belong to the internal set
internal_set.add(f)
internal_set.add(t)
G.add_node(f)
G.add_node(t)
w = branch.get_weight()
G.add_edge(f, t, weight=w)
# convert to arrays and sort
# external = np.sort(np.array(list(external_set)))
# purely_internal_set = np.sort(np.array(list(purely_internal_set)))
purely_internal_set = list(internal_set - external_gen_set)
# now, for every generator, we need to find the shortest path in the "purely internal set"
for elm_idx, bus_idx, tpe in external_gen_data:
# Compute shortest path lengths from this source
lengths = nx.single_source_shortest_path_length(G, bus_idx)
# Filter only target nodes
target_distances = {t: lengths[t] for t in purely_internal_set if t in lengths}
if target_distances:
# Pick the closest
closest = min(target_distances, key=target_distances.get)
# relocate
if tpe == 'generator':
grid.generators[elm_idx].bus = grid.buses[closest]
[docs]
def di_shi_reduction(grid: MultiCircuit,
reduction_bus_indices: IntVec,
V0: CxVec,
tol=1e-8) -> Tuple[MultiCircuit, Logger]:
"""
In-place Grid reduction using the Di-Shi equivalent model
from: Power System Network Reduction for Engineering and Economic Analysis by Di Shi, 2012
and Optimal Generation Investment Planning: Pt 1: Network Equivalents
:param grid: MultiCircuit
:param reduction_bus_indices: Bus indices of the buses to delete
:param V0: Array of voltages for the non-reduced system
:param tol: Tolerance, any equivalent power value under this is omitted
"""
logger = Logger()
nc = compile_numerical_circuit_at(grid, t_idx=None)
# Step 1 β First Ward reduction ------------------------------------------------------------------------------------
# This first reduction is to obtain the equivalent admittance matrix Y_eq1
# that serves to create the inter-boundary branches that represent the grid
# that we are going to remove.
# For this the buses to keep are the internal (I) + boundary (B).
# find the boundary set: buses from the internal set the join to the external set
e_buses, b_buses, i_buses, b_branches = get_reduction_sets_1(nc=nc, reduction_bus_indices=reduction_bus_indices)
if len(e_buses) == 0:
logger.add_info(msg="Nothing to reduce")
return logger
if len(i_buses) == 0:
logger.add_info(msg="Nothing to keep (null grid as a result)")
return logger
if len(b_buses) == 0:
logger.add_info(msg="The reducible and non reducible sets are disjoint and cannot be reduced")
return logger
adm = nc.get_admittance_matrices()
Yeq_1, Ybbp_1 = ward_reduction_linear(Ybus=adm.Ybus, e_buses=e_buses, b_buses=b_buses, i_buses=i_buses)
create_new_boundary_branches(grid=grid, b_buses=b_buses, Yeq_1=Yeq_1, Ybbp_1=Ybbp_1, tol=tol, use_linear=False)
# Step 2 β Second Ward reduction: Extending to the external generation buses ---------------------------------------
# The second reduction is to generate another equivalent admittance matrix Y_eq2
# that we use as adjacency matrix to search the closest bus to move each generator
# that is external.
# For this the buses to keep are the internal (I) + boundary (B) + the generation buses of E.
find_gen_relocation(grid=grid, reduction_bus_indices=reduction_bus_indices)
# remove the buses of the external system finally
# Delete the external buses
grid.delete_buses(lst=[grid.buses[e] for e in e_buses], delete_associated=True)
# Step 3 β Relocate generators -------------------------------------------------------------------------------------
# Using the matrix Y_eq2, we calculate the shortest paths from every external
# generation bus, to all the other buses in I + B. The end of each path will
# be the relocation bus of every external generator.
# This is done in the step before: In the original Di-Shi reduction method,
# they use a second ward reduction to compute the distances, however that
# can be done directly
# Step 4 β Relocate loads with inverse power flow ------------------------------------------------------------------
# Let's not forget about the loads! in order to move the external loads such that
# the reduced flows resemble the original flows (even after brutally moving the generators!),
# we need to perform an inverse power flow.
#
# First, we need to run a linear power flow in the original system.
# That will get us the original voltage angles.
#
# Second, we need to form the admittance matrix of the reduced grid
# (including the inter-boundary branches), and multiply this admittance matrix
# by the original voltage angles for the reduced set of buses.
# This gets us the "final" power injections in the reduced system.
#
# From those, we need to subtract the reduced grid injections.
# This will provide us with a vector of new loads that we need
# to add at the corresponding reduced grid buses in order to have a final equivalent.
nc2 = compile_numerical_circuit_at(grid, t_idx=None)
adm2 = nc2.get_admittance_matrices()
red_bus_idx = np.sort(np.r_[i_buses, b_buses])
Vred = V0[red_bus_idx]
S_expected = (Vred * np.conj(adm2.Ybus @ Vred)) * grid.Sbase
Sred_current = grid.get_Sbus()
dSred = Sred_current - S_expected
# add loads
for i, S in enumerate(dSred):
if abs(S) > tol:
ld = dev.Load(name=f"compensation {i}", P=S.real, Q=S.imag)
grid.add_load(bus=grid.buses[i], api_obj=ld)
return grid, logger
# if __name__ == '__main__':
# import VeraGridEngine as gce
#
# fname = '/home/santi/Documentos/Git/GitHub/VeraGrid/src/tests/data/grids/Matpower/case9.m'
# fname_expected = '/home/santi/Documentos/Git/GitHub/VeraGrid/src/tests/data/grids/Matpower/ieee9_reduced.m'
#
# reduction_bus_indices_ = np.array([2, 4, 5, 6])
#
# grid_ = gce.open_file(fname)
# grid_expected = gce.open_file(fname_expected)
#
# grid_red, logger_ = di_shi_reduction(grid=grid_.copy(),
# reduction_bus_indices=reduction_bus_indices_,
# V0=gce.power_flow(grid_).voltage,
# tol=1e-8)
#
# logger_.print()