# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import numpy as np
from typing import Union, Dict, Tuple, TYPE_CHECKING
from VeraGridEngine.enumerations import SolverType
from VeraGridEngine.basic_structures import Logger, ConvergenceReport
from VeraGridEngine.Simulations.PowerFlow.power_flow_results_3ph import PowerFlowResults3Ph
from VeraGridEngine.Simulations.PowerFlow.power_flow_options import PowerFlowOptions
from VeraGridEngine.Simulations.PowerFlow.power_flow_results import NumericPowerFlowResults
from VeraGridEngine.Simulations.PowerFlow.Formulations.pf_basic_formulation_3ph import (PfBasicFormulation3Ph,
expand3ph,
expandVoltage3ph)
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.common_functions import (floating_star_currents,
floating_star_powers)
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.newton_raphson_fx import newton_raphson_fx
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.powell_fx import powell_fx
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.levenberg_marquadt_fx import levenberg_marquardt_fx
from VeraGridEngine.Topology.simulation_indices import SimulationIndices
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Devices.Aggregation.area import Area
from VeraGridEngine.basic_structures import CxVec, Vec
if TYPE_CHECKING: # Only imports the below statements during type checking
from VeraGridEngine.Compilers.circuit_to_data import VALID_OPF_RESULTS
def __solve_island_limited_support_3ph(island: NumericalCircuit,
indices: SimulationIndices,
options: PowerFlowOptions,
V0: CxVec,
S_base: CxVec,
Shvdc: Vec,
logger=Logger()) -> Tuple[NumericPowerFlowResults, ConvergenceReport]:
"""
Run a power flow simulation using the selected method (no outer loop controls).
This routine supports delete voltage controls,and Hvdc links through external injections (Shvdc)
Also requires grids to be split by HvdcLines
:param island: SnapshotData circuit, this ensures on-demand admittances computation
:param indices: SimulationIndices
:param options: PowerFlow options
:param V0: Array of initial voltages
:param S_base: Array of power Injections
:param Shvdc: Array of power injections due t the HVDC lines (only used in some algorithms)
:param logger: Logger
:return: NumericPowerFlowResults
"""
logger.add_info('Using the complete support power flow method')
report = ConvergenceReport()
if options.retry_with_other_methods:
solver_list = [SolverType.NR,
SolverType.PowellDogLeg,
SolverType.LM]
if options.solver_type in solver_list:
solver_list.remove(options.solver_type)
solvers = [options.solver_type] + solver_list
else:
# No retry selected
solvers = [options.solver_type]
# set worked = false to enter the loop
solver_idx = 0
# set the initial value
Qmax, Qmin = island.get_reactive_power_limits()
S0: CxVec = Shvdc + S_base # already for 3-phase
if len(indices.vd) == 0:
solution = NumericPowerFlowResults(V=np.zeros(len(S0) * 3, dtype=complex),
Scalc=S0,
m=expand3ph(island.active_branch_data.tap_module),
tau=expand3ph(island.active_branch_data.tap_angle),
Sf=np.zeros(island.nbr * 3, dtype=complex),
St=np.zeros(island.nbr * 3, dtype=complex),
If=np.zeros(island.nbr * 3, dtype=complex),
It=np.zeros(island.nbr * 3, dtype=complex),
loading=np.zeros(island.nbr * 3, dtype=complex),
losses=np.zeros(island.nbr * 3, dtype=complex),
Pfp_vsc=np.zeros(island.nvsc, dtype=float),
Pfn_vsc=np.zeros(island.nvsc, dtype=float),
St_vsc=np.zeros(island.nvsc * 3, dtype=complex),
If_vsc=np.zeros(island.nvsc, dtype=float),
It_vsc=np.zeros(island.nvsc, dtype=complex),
losses_vsc=np.zeros(island.nvsc, dtype=float),
loading_vsc=np.zeros(island.nvsc, dtype=float),
Sf_hvdc=np.zeros(island.nhvdc, dtype=complex),
St_hvdc=np.zeros(island.nhvdc, dtype=complex),
losses_hvdc=np.zeros(island.nhvdc, dtype=complex),
loading_hvdc=np.zeros(island.nhvdc, dtype=complex),
converged=False,
norm_f=1e200,
iterations=0,
elapsed=0)
# method, converged: bool, error: float, elapsed: float, iterations: int
report.add(method=SolverType.NoSolver, converged=True, error=0.0, elapsed=0.0, iterations=0)
logger.add_error('Not solving power flow because there is no slack bus')
return solution, report
else:
final_solution = NumericPowerFlowResults(V=V0,
converged=False,
norm_f=1e200,
Scalc=S0,
m=expand3ph(island.active_branch_data.tap_module),
tau=expand3ph(island.active_branch_data.tap_angle),
Sf=np.zeros(island.nbr * 3, dtype=complex),
St=np.zeros(island.nbr * 3, dtype=complex),
If=np.zeros(island.nbr * 3, dtype=complex),
It=np.zeros(island.nbr * 3, dtype=complex),
loading=np.zeros(island.nbr * 3, dtype=complex),
losses=np.zeros(island.nbr * 3, dtype=complex),
Pfp_vsc=np.zeros(island.nvsc, dtype=float),
Pfn_vsc=np.zeros(island.nvsc, dtype=float),
St_vsc=np.zeros(island.nvsc * 3, dtype=complex),
If_vsc=np.zeros(island.nvsc, dtype=float),
It_vsc=np.zeros(island.nvsc * 3, dtype=complex),
losses_vsc=np.zeros(island.nvsc, dtype=float),
loading_vsc=np.zeros(island.nvsc, dtype=float),
Sf_hvdc=np.zeros(island.nhvdc, dtype=complex),
St_hvdc=np.zeros(island.nhvdc, dtype=complex),
losses_hvdc=np.zeros(island.nhvdc, dtype=complex),
loading_hvdc=np.zeros(island.nhvdc, dtype=complex),
iterations=0,
elapsed=0)
while solver_idx < len(solvers) and not final_solution.converged:
# get the solver
solver_type = solvers[solver_idx]
if solver_type == SolverType.LM:
problem = PfBasicFormulation3Ph(V0=final_solution.V,
S0=S0,
Qmin=Qmin,
Qmax=Qmax,
nc=island,
options=options,
logger=logger)
solution = levenberg_marquardt_fx(problem=problem,
tol=options.tolerance,
max_iter=options.max_iter,
verbose=options.verbose,
logger=logger)
elif solver_type == SolverType.NR:
problem = PfBasicFormulation3Ph(V0=final_solution.V,
S0=S0,
Qmin=Qmin,
Qmax=Qmax,
nc=island,
options=options,
logger=logger)
solution = newton_raphson_fx(problem=problem,
tol=options.tolerance,
max_iter=options.max_iter,
trust=options.trust_radius,
verbose=options.verbose,
logger=logger)
elif solver_type == SolverType.PowellDogLeg:
problem = PfBasicFormulation3Ph(V0=final_solution.V,
S0=S0,
Qmin=Qmin,
Qmax=Qmax,
nc=island,
options=options,
logger=logger)
solution = powell_fx(problem=problem,
tol=options.tolerance,
max_iter=options.max_iter,
trust=options.trust_radius,
verbose=options.verbose,
logger=logger)
else:
# for any other method, raise exception
raise Exception(solver_type.value + ' Not supported in power flow mode')
# record the solution type
solution.method = solver_type
# record the method used, if it improved the solution
if abs(solution.norm_f) < abs(final_solution.norm_f):
report.add(method=solver_type,
converged=solution.converged,
error=solution.norm_f,
elapsed=solution.elapsed,
iterations=solution.iterations)
if solution.method in [SolverType.Linear, SolverType.LACPF]:
# if the method is linear, we do not check the solution quality
final_solution = solution
else:
# if the method is supposed to be exact, we check the solution quality
if abs(solution.norm_f) < 0.1:
final_solution = solution
else:
logger.add_info('Tried solution is garbage',
solver_type.value,
value="{:.4e}".format(solution.norm_f),
expected_value=0.1)
else:
logger.add_info('Tried solver but it did not improve the solution',
solver_type.value,
value="{:.4e}".format(solution.norm_f),
expected_value=final_solution.norm_f)
# next solver
solver_idx += 1
if not final_solution.converged:
logger.add_error('Did not converge, even after retry!',
device='Error',
value="{:.4e}".format(final_solution.norm_f),
expected_value=f"<{options.tolerance}")
if final_solution.tap_module is None:
final_solution.tap_module = island.active_branch_data.tap_module
if final_solution.tap_angle is None:
final_solution.tap_angle = island.active_branch_data.tap_angle
return final_solution, report
def __multi_island_pf_nc_limited_support_3ph(nc: NumericalCircuit,
options: PowerFlowOptions,
logger: Logger | None = None,
V_guess: Union[CxVec, None] = None,
Sbus_input: Union[CxVec, None] = None) -> PowerFlowResults3Ph:
"""
Multiple islands power flow (this is the most generic power flow function)
multi_island_pf
|-> multi_island_pf_nc
|-> split_into_islands (Deals with HvdcLine injections)
|-> for each island:
|-> single_island_pf
|-> solve
:param nc: SnapshotData instance
:param options: PowerFlowOptions instance
:param logger: logger
:param V_guess: voltage guess
:param Sbus_input: Use this power injections if provided
:return: PowerFlowResults instance
"""
if logger is None:
logger = Logger()
# declare results
results = PowerFlowResults3Ph(
n=nc.nbus * 3,
m=nc.nbr * 3,
n_hvdc=nc.nhvdc,
n_vsc=nc.nvsc,
n_gen=nc.ngen,
n_batt=nc.nbatt,
n_sh=nc.nshunt,
n_load=nc.nload,
bus_names=nc.bus_data.get_3ph_names(),
branch_names=nc.passive_branch_data.get_3ph_names(),
hvdc_names=nc.hvdc_data.names,
vsc_names=nc.vsc_data.names,
gen_names=nc.generator_data.names,
batt_names=nc.battery_data.names,
sh_names=nc.shunt_data.names,
load_names=nc.load_data.names,
bus_types=nc.bus_data.bus_types,
)
# compose the HVDC power Injections
# since the power flow methods don't support HVDC directly, we need this step
Shvdc, Losses_hvdc, Pf_hvdc, Pt_hvdc, loading_hvdc, n_free = nc.hvdc_data.get_power(
Sbase=nc.Sbase,
theta=np.zeros(nc.nbus),
)
# compute islands
islands = nc.split_into_islands(ignore_single_node_islands=options.ignore_single_node_islands,
consider_hvdc_as_island_links=False,
logger=logger)
for i, island in enumerate(islands):
Sbus_base = island.get_power_injections_pu()
indices = island.get_simulation_indices(Sbus=Sbus_base)
if len(indices.vd) > 0:
V0 = island.bus_data.Vbus if V_guess is None else V_guess[island.bus_data.original_idx]
S_base = Sbus_base if Sbus_input is None else Sbus_input[island.bus_data.original_idx]
# call the numerical methods
solution, report = __solve_island_limited_support_3ph(
island=island,
indices=indices,
options=options,
V0=expandVoltage3ph(V0),
S_base=expand3ph(S_base),
Shvdc=expand3ph(Shvdc[island.bus_data.original_idx]),
logger=logger
)
# merge the results from this island
results.apply_from_island(
results=solution,
b_idx=island.bus_data.original_idx,
br_idx=island.passive_branch_data.original_idx,
hvdc_idx=island.hvdc_data.original_idx,
vsc_idx=island.vsc_data.original_idx
)
results.convergence_reports.append(report)
else:
logger.add_info('No slack nodes in the island', str(i))
# Compile HVDC results (available for the complete grid since HVDC line as
# formulated are split objects
# Pt is the "generation" at the sending point
results.Pf_hvdc = - Pf_hvdc * nc.Sbase # we change the sign to keep the sign convention with AC lines
results.Pt_hvdc = - Pt_hvdc * nc.Sbase # we change the sign to keep the sign convention with AC lines
results.loading_hvdc = loading_hvdc
results.losses_hvdc = Losses_hvdc * nc.Sbase
load_bus_idx = nc.load_data.bus_idx
load_idx = nc.load_data.original_idx
for i in load_idx:
if nc.load_data.A_floatingstar[i] > 0:
results.load_Vn[i] = nc.load_data.A_floatingstar[i] * results.voltage_A[load_bus_idx[i]] + \
nc.load_data.B_floatingstar[i] * results.voltage_B[load_bus_idx[i]] + \
nc.load_data.C_floatingstar[i] * results.voltage_C[load_bus_idx[i]]
elif nc.load_data.I3_floatingstar[i+1] > 0.0+0.0j:
Vn_prev = (results.voltage_A[load_bus_idx[i]] + results.voltage_B[load_bus_idx[i]] + results.voltage_C[
load_bus_idx[i]]) / 3
Ia, Ib, Ic, results.load_Vn[i] = floating_star_currents(results.voltage_A[load_bus_idx[i]],
results.voltage_B[load_bus_idx[i]],
results.voltage_C[load_bus_idx[i]],
nc.load_data.I3_floatingstar[4 * i + 1],
nc.load_data.I3_floatingstar[4 * i + 2],
nc.load_data.I3_floatingstar[4 * i + 3],
Vn_prev)
elif nc.load_data.S3_floatingstar[i+1] > 0.0+0.0j:
Ia, Ib, Ic, results.load_Vn[i] = floating_star_powers(results.voltage_A[load_bus_idx[i]],
results.voltage_B[load_bus_idx[i]],
results.voltage_C[load_bus_idx[i]],
nc.load_data.S3_floatingstar[4 * i + 1],
nc.load_data.S3_floatingstar[4 * i + 2],
nc.load_data.S3_floatingstar[4 * i + 3])
else:
results.load_Vn[i] = results.voltage_N[load_bus_idx[i]]
shunt_bus_idx = nc.shunt_data.bus_idx
shunt_idx = nc.shunt_data.original_idx
for i in shunt_idx:
if nc.shunt_data.A_floating_star[i] != 0.0 + 0.0j:
results.shunt_Vn[i] = nc.shunt_data.A_floating_star[i] * results.voltage_A[shunt_bus_idx[i]] + \
nc.shunt_data.B_floating_star[i] * results.voltage_B[shunt_bus_idx[i]] + \
nc.shunt_data.C_floating_star[i] * results.voltage_C[shunt_bus_idx[i]]
else:
results.shunt_Vn[i] = results.voltage_N[shunt_bus_idx[i]]
return results
[docs]
def multi_island_pf_nc_3ph(nc: NumericalCircuit,
options: PowerFlowOptions,
logger: Logger | None = None,
V_guess: Union[CxVec, None] = None,
Sbus_input: Union[CxVec, None] = None) -> PowerFlowResults3Ph:
"""
Multiple islands power flow (this is the most generic power flow function)
:param nc: SnapshotData instance
:param options: PowerFlowOptions instance
:param logger: logger
:param V_guess: voltage guess
:param Sbus_input: Use this power injections if provided (in p.u.)
:return: PowerFlowResults instance
"""
if logger is None:
logger = Logger()
results = __multi_island_pf_nc_limited_support_3ph(
nc=nc,
options=options,
logger=logger,
V_guess=V_guess,
Sbus_input=Sbus_input,
)
# expand voltages if there was a bus topology reduction
# if nc.topology_performed:
# results.voltage = nc.propagate_bus_result(results.voltage)
# do the reactive power partition and store the values
# __split_reactive_power_into_devices(nc=nc, Qbus=results.Sbus.imag, results=results)
results.three_phase = True
return results
[docs]
def multi_island_pf_3ph(multi_circuit: MultiCircuit,
options: PowerFlowOptions,
opf_results: VALID_OPF_RESULTS | None = None,
t: Union[int, None] = None,
logger: Logger = Logger(),
bus_dict: Union[Dict[Bus, int], None] = None,
areas_dict: Union[Dict[Area, int], None] = None) -> PowerFlowResults3Ph:
"""
Multiple islands power flow (this is the most generic power flow function)
:param multi_circuit: MultiCircuit instance
:param options: PowerFlowOptions instance
:param opf_results: OPF results, to be used if not None
:param t: time step, if None, the snapshot is compiled
:param logger: list of events to add to
:param bus_dict: Dus object to index dictionary
:param areas_dict: Area to area index dictionary
:return: PowerFlowResults instance
"""
nc = compile_numerical_circuit_at(
circuit=multi_circuit,
t_idx=t,
apply_temperature=options.apply_temperature_correction,
branch_tolerance_mode=options.branch_impedance_tolerance_mode,
opf_results=opf_results,
use_stored_guess=options.use_stored_guess,
bus_dict=bus_dict,
areas_dict=areas_dict,
control_taps_modules=options.control_taps_modules,
control_taps_phase=options.control_taps_phase,
control_remote_voltage=options.control_remote_voltage,
logger=logger,
fill_three_phase=True
)
res = multi_island_pf_nc_3ph(nc=nc, options=options, logger=logger)
return res