Source code for VeraGridEngine.Simulations.PowerFlow.Formulations.pf_basic_formulation
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from typing import Tuple, List
import numpy as np
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
from VeraGridEngine.Topology.admittance_matrices import AdmittanceMatrices
from VeraGridEngine.Simulations.PowerFlow.power_flow_results import NumericPowerFlowResults
from VeraGridEngine.Simulations.PowerFlow.power_flow_options import PowerFlowOptions
from VeraGridEngine.Simulations.Derivatives.ac_jacobian import create_J_vc_csc
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.common_functions import (compute_fx_error,
power_flow_post_process_nonlinear)
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.discrete_controls import (control_q_inside_method,
DiscreteShuntControlState,
QvDroopControlState,
compute_slack_distribution)
from VeraGridEngine.Simulations.PowerFlow.Formulations.pf_formulation_template import PfFormulationTemplate
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.common_functions import (compute_zip_power, compute_power,
compute_fx, polar_to_rect)
from VeraGridEngine.Topology.simulation_indices import compile_types
from VeraGridEngine.basic_structures import Vec, IntVec, CxVec
from VeraGridEngine.Utils.Sparse.csc2 import CSC
[docs]
class PfBasicFormulation(PfFormulationTemplate):
def __init__(self,
V0: CxVec,
S0: CxVec,
I0: CxVec,
Y0: CxVec,
Qmin: Vec,
Qmax: Vec,
nc: NumericalCircuit,
options: PowerFlowOptions):
"""
:param V0:
:param S0:
:param I0:
:param Y0:
:param Qmin:
:param Qmax:
:param options:
"""
PfFormulationTemplate.__init__(self, V0=V0, options=options)
self.nc = nc
# Compile the admittances
self.adm: AdmittanceMatrices = nc.get_admittance_matrices()
if options.verbose > 1:
print(f"Ybus: \n {self.adm.Ybus.toarray()}")
self.S0: CxVec = S0
self.I0: CxVec = I0
self.Y0: CxVec = Y0
self.Qmin = Qmin
self.Qmax = Qmax
# Store the control states in lightweight wrappers that forward the
# numerical work to the existing Numba kernels.
self.discrete_shunt_control = DiscreteShuntControlState(nc=self.nc)
self.qv_droop_control = QvDroopControlState(S0=self.S0, nc=self.nc)
# Compile the classical bus types
self.vd, self.pq, self.pv, self.pqv, self.p, self.no_slack = compile_types(
Pbus=S0.real,
types=self.nc.bus_data.bus_types
)
# Arrays for index slicing
self.idx_dVa = np.r_[self.pv, self.pq, self.pqv, self.p]
self.idx_dVm = np.r_[self.pq, self.p]
self.idx_dP = self.idx_dVa
self.idx_dQ = np.r_[self.pq, self.pqv]
[docs]
def x2var(self, x: Vec):
"""
Convert X to decision variables
:param x: solution vector
"""
a = len(self.idx_dVa)
b = a + len(self.idx_dVm)
# update the vectors
self.Va[self.idx_dVa] = x[0:a]
self.Vm[self.idx_dVm] = x[a:b]
[docs]
def var2x(self) -> Vec:
"""
Convert the internal decision variables into the vector
:return: Vector
"""
return np.r_[
self.Va[self.idx_dVa],
self.Vm[self.idx_dVm]
]
[docs]
def update_bus_types(self, pq: IntVec, pv: IntVec, pqv: IntVec, p: IntVec):
"""
:param pq:
:param pv:
:param pqv:
:param p:
:return:
"""
self.pq = pq
self.pv = pv
self.pqv = pqv
self.p = p
self.idx_dVa = np.r_[self.pv, self.pq, self.pqv, self.p]
self.idx_dVm = np.r_[self.pq, self.p]
self.idx_dP = self.idx_dVa
self.idx_dQ = np.r_[self.pq, self.pqv]
[docs]
def size(self) -> int:
"""
Size of the jacobian matrix
:return:
"""
return len(self.idx_dVa) + len(self.idx_dVm)
[docs]
def check_error(self, x: Vec) -> Tuple[float, Vec]:
"""
Check error of the solution without affecting the problem
:param x: Solution vector
:return: error
"""
a = len(self.idx_dVa)
b = a + len(self.idx_dVm)
# update the vectors
Va = self.Va.copy()
Vm = self.Vm.copy()
Va[self.idx_dVa] = x[0:a]
Vm[self.idx_dVm] = x[a:b]
# compute the complex voltage
V = polar_to_rect(Vm, Va)
# compute the function residual
# Assumes the internal vars were updated already with self.x2var()
Sbus = compute_zip_power(self.S0, self.I0, self.Y0, Vm)
Scalc = compute_power(self.adm.Ybus, V)
dS = Scalc - Sbus # compute the mismatch
_f = np.r_[
dS[self.idx_dP].real,
dS[self.idx_dQ].imag
]
# compute the error
return compute_fx_error(_f), x
[docs]
def update(self, x: Vec, update_controls: bool = False) -> Tuple[float, bool, Vec, Vec]:
"""
Update step
:param x: Solution vector
:param update_controls:
:return: error, converged?, x
"""
# set the problem state
self.x2var(x)
# compute the complex voltage
self.V = polar_to_rect(self.Vm, self.Va)
# compute the function residual
# Assumes the internal vars were updated already with self.x2var()
Sbus = compute_zip_power(self.S0, self.I0, self.Y0, self.Vm)
self.Scalc = compute_power(self.adm.Ybus, self.V)
dS = self.Scalc - Sbus # compute the mismatch
self._f = np.r_[
dS[self.idx_dP].real,
dS[self.idx_dQ].imag
]
# self._f = compute_fx(self.Scalc, Sbus, self.idx_dP, self.idx_dQ)
# compute the error
self._error = compute_fx_error(self._f)
# review reactive power limits ---------------------------------------------------------------------------------
# it is only worth checking Q limits with a low error
# since with higher errors, the Q values may be far from realistic
# finally, the Q control only makes sense if there are pv nodes
if update_controls and self._error < self._controls_tol:
any_change = False
# update Q limits control-----------------------------------------------------------------------------------
if self.options.control_Q and (len(self.pv) + len(self.p)) > 0:
# check and adjust the reactive power
# this function passes pv buses to pq when the limits are violated,
# but not pq to pv because that is unstable
changed, pv, pq, pqv, p = control_q_inside_method(self.Scalc, self.S0,
self.pv, self.pq,
self.pqv, self.p,
self.Qmin,
self.Qmax)
if len(changed) > 0:
any_change = True
# update the bus type lists
self.update_bus_types(pq=pq, pv=pv, pqv=pqv, p=p)
# the composition of x may have changed, so recompute
x = self.var2x()
# discrete shunt logic -------------------------------------------------------------------------------------
if self.discrete_shunt_control.apply(Vm=self.Vm, adm=self.adm):
any_change = True
# update QV droop generators -------------------------------------------------------------------------------
if self.qv_droop_control.apply(S0=self.S0, Vm=self.Vm):
any_change = True
# update Slack control -------------------------------------------------------------------------------------
if self.options.distributed_slack:
ok, delta = compute_slack_distribution(
Scalc=self.Scalc,
vd=self.vd,
bus_installed_power=self.nc.bus_data.installed_power
)
if ok:
any_change = True
# Update the objective power to reflect the slack distribution
self.S0 += delta
if any_change:
# recompute the error based on the new Scalc and S0
self._f = self.fx()
# compute the error
self._error = compute_fx_error(self._f)
# converged? ---------------------------------------------------------------------------------------------------
self._converged = self._error < self.options.tolerance
return self._error, self._converged, x, self.f
[docs]
def fx(self) -> Vec:
"""
:return:
"""
# Assumes the internal vars were updated already with self.x2var()
Sbus = compute_zip_power(self.S0, self.I0, self.Y0, self.Vm)
self.Scalc = compute_power(self.adm.Ybus, self.V)
self._f = compute_fx(self.Scalc, Sbus, self.idx_dP, self.idx_dQ)
return self._f
[docs]
def Jacobian(self) -> CSC:
"""
:return:
"""
# Assumes the internal vars were updated already with self.x2var()
if self.adm.Ybus.format != 'csc':
self.adm.Ybus = self.adm.Ybus.tocsc()
nbus = self.adm.Ybus.shape[0]
if self.options.verbose >= 2:
print("Ybus:")
print(self.adm.Ybus.toarray())
# Create J in CSC order
J = create_J_vc_csc(nbus, self.adm.Ybus.data, self.adm.Ybus.indptr, self.adm.Ybus.indices,
self.V, self.idx_dVa, self.idx_dVm, self.idx_dP, self.idx_dQ)
return J
[docs]
def get_x_names(self) -> List[str]:
"""
Names matching x
:return:
"""
cols = [f'dVa {i}' for i in self.idx_dVa]
cols += [f'dVm {i}' for i in self.idx_dVm]
return cols
[docs]
def get_fx_names(self) -> List[str]:
"""
Names matching fx
:return:
"""
rows = [f'dP {i}' for i in self.idx_dP]
rows += [f'dQ {i}' for i in self.idx_dQ]
return rows
[docs]
def get_solution(self, elapsed: float, iterations: int) -> NumericPowerFlowResults:
"""
Get the problem solution
:param elapsed: Elapsed seconds
:param iterations: Iteration number
:return: NumericPowerFlowResults
"""
# Compute the Branches power and the slack buses power
Sf, St, If, It, Vbranch, loading, losses, Sbus = power_flow_post_process_nonlinear(
Sbus=self.Scalc,
V=self.V,
F=self.nc.passive_branch_data.F,
T=self.nc.passive_branch_data.T,
pv=self.pv,
vd=self.vd,
Ybus=self.adm.Ybus,
Yf=self.adm.Yf,
Yt=self.adm.Yt,
Yshunt_bus=self.adm.Yshunt_bus,
branch_rates=self.nc.passive_branch_data.rates,
Sbase=self.nc.Sbase
)
return NumericPowerFlowResults(V=self.V,
Scalc=Sbus * self.nc.Sbase,
m=self.nc.active_branch_data.tap_module,
tau=self.nc.active_branch_data.tap_angle,
Sf=Sf,
St=St,
If=If,
It=It,
loading=loading,
losses=losses,
Pfp_vsc=np.zeros(self.nc.nvsc, dtype=float),
Pfn_vsc=np.zeros(self.nc.nvsc, dtype=float),
St_vsc=np.zeros(self.nc.nvsc, dtype=complex),
If_vsc=np.zeros(self.nc.nvsc, dtype=float),
It_vsc=np.zeros(self.nc.nvsc, dtype=complex),
losses_vsc=np.zeros(self.nc.nvsc, dtype=float),
loading_vsc=np.zeros(self.nc.nvsc, dtype=float),
Sf_hvdc=np.zeros(self.nc.nhvdc, dtype=complex),
St_hvdc=np.zeros(self.nc.nhvdc, dtype=complex),
losses_hvdc=np.zeros(self.nc.nhvdc, dtype=complex),
loading_hvdc=np.zeros(self.nc.nhvdc, dtype=complex),
norm_f=self.error,
converged=self.converged,
iterations=iterations,
elapsed=elapsed)