# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import time
from typing import Tuple
import pandas as pd
from scipy import sparse
from scipy.sparse import hstack as sphs, vstack as spvs, csc_matrix, diags
from scipy.sparse.linalg import factorized, spsolve, spilu, splu, eigsh
import numpy as np
from VeraGridEngine.Simulations.StateEstimation.state_estimation_inputs import StateEstimationInput
from VeraGridEngine.Simulations.PowerFlow.NumericalMethods.common_functions import power_flow_post_process_nonlinear
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
from VeraGridEngine.Simulations.Derivatives.matpower_derivatives import (dSbus_dV_matpower, dSbr_dV_matpower,
dIbr_dV_matpower)
from VeraGridEngine.Simulations.StateEstimation.state_estimation_results import NumericStateEstimationResults
from VeraGridEngine.basic_structures import CscMat, IntVec, CxVec, Vec, ObjVec, Logger
[docs]
def Jacobian_SE(Ybus: csc_matrix, Yf: csc_matrix, Yt: csc_matrix, V: CxVec,
f: IntVec, t: IntVec, Cf: csc_matrix, Ct: csc_matrix,
inputs: StateEstimationInput, pvpq: IntVec,
load_per_bus: CxVec,
fixed_slack: bool):
"""
Get the arrays for calculation
:param Ybus: Admittance matrix
:param Yf: "from" admittance matrix
:param Yt: "to" admittance matrix
:param V: Voltages complex vector
:param f: array of "from" indices of branches
:param t: array of "to" indices of branches
:param Cf: Connectivity matrix "from"
:param Ct: Connectivity matrix "to"
:param inputs: instance of StateEstimationInput
:param pvpq: array of pq|pv bus indices
:param load_per_bus: Array of load per bus in p.u. (used to compute the Pg and Qg measurements)
:param fixed_slack: if true, the measurements on the slack bus are omitted
:return: H (jacobian), h (residual), S (power injections)
"""
n = Ybus.shape[0]
# compute currents
I = Ybus @ V
If = Yf @ V
It = Yt @ V
# compute powers
S = V * np.conj(I)
Sf = V[f] * np.conj(If)
St = V[t] * np.conj(It)
dS_dVa, dS_dVm = dSbus_dV_matpower(Ybus, V)
dSf_dVa, dSf_dVm, dSt_dVa, dSt_dVm = dSbr_dV_matpower(Yf, Yt, V, f, t, Cf, Ct)
dIf_dVa, dIf_dVm, dIt_dVa, dIt_dVm = dIbr_dV_matpower(Yf, Yt, V)
# compute the derivatives of absolute current:
# using the identity d|I|2/dx = 2 * re(diags(conj(I)) x dI/dx)
# so we will square the current measurements for using these derivatives
dabsIf2_dVa = 2.0 * (diags(np.conj(If)) @ dIf_dVa).real
dabsIf2_dVm = 2.0 * (diags(np.conj(If)) @ dIf_dVm).real
dabsIt2_dVa = 2.0 * (diags(np.conj(It)) @ dIt_dVa).real
dabsIt2_dVm = 2.0 * (diags(np.conj(It)) @ dIt_dVm).real
# slice derivatives
dP_dVa = dS_dVa[np.ix_(inputs.p_idx, pvpq)].real
dQ_dVa = dS_dVa[np.ix_(inputs.q_idx, pvpq)].imag
dPg_dVa = dS_dVa[np.ix_(inputs.pg_idx, pvpq)].real
dQg_dVa = dS_dVa[np.ix_(inputs.qg_idx, pvpq)].imag
dPf_dVa = dSf_dVa[np.ix_(inputs.pf_idx, pvpq)].real
dPt_dVa = dSt_dVa[np.ix_(inputs.pt_idx, pvpq)].real
dQf_dVa = dSf_dVa[np.ix_(inputs.qf_idx, pvpq)].imag
dQt_dVa = dSt_dVa[np.ix_(inputs.qt_idx, pvpq)].imag
dIf_dVa = np.abs(dabsIf2_dVa[np.ix_(inputs.if_idx, pvpq)])
dIt_dVa = np.abs(dabsIt2_dVa[np.ix_(inputs.it_idx, pvpq)])
dVm_dVa = csc_matrix(np.zeros((len(inputs.vm_idx), len(pvpq))))
dVa_dVa = csc_matrix(np.diag(np.ones(n))[np.ix_(inputs.va_idx, pvpq)])
if fixed_slack:
# With the fixed slack, we don't need to compute the derivative values for the slack Vm
dP_dVm = dS_dVm[np.ix_(inputs.p_idx, pvpq)].real
dQ_dVm = dS_dVm[np.ix_(inputs.q_idx, pvpq)].imag
dPg_dVm = dS_dVm[np.ix_(inputs.pg_idx, pvpq)].real
dQg_dVm = dS_dVm[np.ix_(inputs.qg_idx, pvpq)].imag
dPf_dVm = dSf_dVm[np.ix_(inputs.pf_idx, pvpq)].real
dPt_dVm = dSt_dVm[np.ix_(inputs.pt_idx, pvpq)].real
dQf_dVm = dSf_dVm[np.ix_(inputs.qf_idx, pvpq)].imag
dQt_dVm = dSt_dVm[np.ix_(inputs.qt_idx, pvpq)].imag
dIf_dVm = np.abs(dabsIf2_dVm[np.ix_(inputs.if_idx, pvpq)])
dIt_dVm = np.abs(dabsIt2_dVm[np.ix_(inputs.it_idx, pvpq)])
dVm_dVm = csc_matrix(np.diag(np.ones(n))[np.ix_(inputs.vm_idx, pvpq)])
dVa_dVm = csc_matrix(np.zeros((len(inputs.va_idx), len(pvpq))))
else:
# With the non fixed slack, we need to compute the derivative values for the slack Vm
dP_dVm = dS_dVm[inputs.p_idx, :].real
dQ_dVm = dS_dVm[inputs.q_idx, :].imag
dPg_dVm = dS_dVm[inputs.pg_idx, :].real
dQg_dVm = dS_dVm[inputs.qg_idx, :].imag
dPf_dVm = dSf_dVm[inputs.pf_idx, :].real
dPt_dVm = dSt_dVm[inputs.pt_idx, :].real
dQf_dVm = dSf_dVm[inputs.qf_idx, :].imag
dQt_dVm = dSt_dVm[inputs.qt_idx, :].imag
dIf_dVm = np.abs(dabsIf2_dVm[inputs.if_idx, :])
dIt_dVm = np.abs(dabsIt2_dVm[inputs.it_idx, :])
dVm_dVm = csc_matrix(np.diag(np.ones(n))[inputs.vm_idx, :])
dVa_dVm = csc_matrix(np.zeros((len(inputs.va_idx), n)))
# pack the Jacobian
H = spvs([
sphs([dP_dVa, dP_dVm]),
sphs([dQ_dVa, dQ_dVm]),
sphs([dPg_dVa, dPg_dVm]),
sphs([dQg_dVa, dQg_dVm]),
sphs([dPf_dVa, dPf_dVm]),
sphs([dPt_dVa, dPt_dVm]),
sphs([dQf_dVa, dQf_dVm]),
sphs([dQt_dVa, dQt_dVm]),
sphs([dIf_dVa, dIf_dVm]),
sphs([dIt_dVa, dIt_dVm]),
sphs([dVm_dVa, dVm_dVm]),
sphs([dVa_dVa, dVa_dVm])
])
# pack the mismatch vector (calculated estimates in per-unit)
h = np.r_[
S[inputs.p_idx].real, # P
S[inputs.q_idx].imag, # Q
S[inputs.pg_idx].real - load_per_bus[inputs.pg_idx].real, # Pg
S[inputs.qg_idx].imag - load_per_bus[inputs.qg_idx].imag, # Qg
Sf[inputs.pf_idx].real, # Pf
St[inputs.pt_idx].real, # Pt
Sf[inputs.qf_idx].imag, # Qf
St[inputs.qt_idx].imag, # Qt
np.power(np.abs(If[inputs.if_idx]), 2), # If
np.power(np.abs(It[inputs.it_idx]), 2), # It
np.abs(V[inputs.vm_idx]), # Vm
np.angle(V[inputs.va_idx]), # Va
]
return H, h, S
[docs]
def get_measurements_and_deviations(t: int|None,
se_input: StateEstimationInput,
Sbase: float,
use_current_squared_meas: bool = True) -> Tuple[Vec, Vec, ObjVec]:
"""
get_measurements_and_deviations the measurements into "measurements" and "sigma"
ordering: Pinj, Pflow, Qinj, Qflow, Iflow, Vm
:param t: time step
:param se_input: StateEstimationInput object
:param Sbase: base power in MVA (i.e. 100 MVA)
:param use_current_squared_meas: use squared current measurements?
:return: measurements vector in per-unit, sigma vector in per-unit
"""
nz = se_input.size()
measurements = np.zeros(nz, dtype=object)
magnitudes = np.zeros(nz)
sigma = np.zeros(nz)
# go through the measurements in order and form the vectors
k = 0
for lst in [se_input.p_inj,
se_input.q_inj,
se_input.pg_inj,
se_input.qg_inj,
se_input.pf_value,
se_input.pt_value,
se_input.qf_value,
se_input.qt_value]:
for m in lst:
magnitudes[k] = m.get_value_pu_at(t, Sbase)
sigma[k] = m.get_standard_deviation_pu_at(t, Sbase)
measurements[k] = m
k += 1
if not use_current_squared_meas:
for lst in [se_input.if_value, se_input.it_value]:
for m in lst:
I_pu = m.get_value_pu_at(t, Sbase)
sig_I = m.get_standard_deviation_pu_at(t, Sbase)
# Use current magnitude directly (more stable)
y = max(abs(I_pu), 1e-4) # Avoid zero
sig_y = sig_I
magnitudes[k] = y
sigma[k] = sig_y
measurements[k] = m
k += 1
else:
# current measurements need to be squared
for lst in [se_input.if_value,
se_input.it_value]:
for m in lst:
magnitudes[k] = np.power(m.get_value_pu_at(t, Sbase), 2)
sigma[k] = m.get_standard_deviation_pu_at(t, Sbase)
measurements[k] = m
k += 1
for lst in [se_input.vm_value,
se_input.va_value]:
for m in lst:
magnitudes[k] = m.value
sigma[k] = m.sigma
measurements[k] = m
k += 1
return magnitudes, sigma, measurements
[docs]
def b_test(sigma2: Vec,
H: csc_matrix,
dz: np.ndarray,
HtWH: csc_matrix,
c_threshold: float = 4.0,
logger: Logger | None = None):
"""
From RELIABLE BAD DATA PROCESSING FOR REAL-TIME STATE ESTIMATION, 1983
Monticelli & Garcia (1983) 'b-test' bad data detection
:param sigma2: sigma 2
:param H: Jacobian at the solution (m x k)
:param dz: residuals r = z - h(x^) (length m)
:param HtWH: G = H^T W H (k x k)
:param c_threshold: detection threshold 'c' (use 4.0 as in the paper)
:param logger: Logger
:return:
'r' : residuals r_i
'sigma2' : sigma_i^2
'Pii' : residual variances P_ii
'rN' : normalized residuals r_i / sqrt(P_ii)
'imax' : index of largest |rN|
'b' : b at imax
'is_bad' : bool
"""
m, k = H.shape
# Factorize G once, then solve G y = H_i^T for each i
# Use LU so we can reuse for many RHS
lu = factorized(HtWH.tocsc())
# For the system to be observable the eigenvalues should be greater than zero -> matrix pos definate
eigvals, _ = eigsh(HtWH)
assert np.all(eigvals > 0), "Unobservable-System"
# Compute h_i = H_i G^{-1} H_i^T and then Pii = sigma_i^2 - h_i
# Do it row-by-row but reusing the factorization
Pii = np.empty(m, dtype=float)
# iterate efficiently over rows of H
H_csr = H.tocsr()
for i in range(m):
# get sparse row i as (data, indices)
row = H_csr.getrow(i)
if row.nnz == 0:
# no sensitivity: Pii = sigma^2 (critical measurement with no redundancy)
Pii[i] = sigma2[i]
else:
# Solve y = G^{-1} H_i^T
y = lu(row.T.toarray()) # shape (k,1)
# h_i = H_i y
h_i = float(row.dot(y).ravel()[0])
Pii[i] = sigma2[i] - h_i
# numerical guard
if Pii[i] <= 0:
# If numerical issues produce tiny negative values, clamp to a small positive eps
Pii[i] = max(Pii[i], 1e-14)
elif Pii[i] < 1e-10: # Too small - likely numerical error
Pii[i] = max(1e-10, sigma2[i] * 0.01) # Conservative estimate
elif Pii[i] > sigma2[i]: # Impossible physically
logger.add_warning(f"Pii[{i}] > sigma2[{i}] ({Pii[i]:.2e} > {sigma2[i]:.2e})")
Pii[i] = sigma2[i] * 0.5 # Reasonable default
r = dz
rN = r / np.sqrt(Pii)
imax = np.argmax(np.abs(rN)).astype(int)
# b_i = (sigma_i / Pii) * r_i with sigma_i = sqrt(sigma2_i)
b = (np.sqrt(sigma2[imax]) / Pii[imax]) * r[imax]
# compute where the measurements are bad
is_bad = np.abs(b) > c_threshold
return r, sigma2, Pii, rN, imax, b, is_bad
[docs]
def solve_se_lm(nc: NumericalCircuit,
Ybus: CscMat,
Yf: CscMat,
Yt: CscMat,
Yshunt_bus: CxVec,
F: IntVec,
T: IntVec,
Cf: csc_matrix,
Ct: csc_matrix,
se_input: StateEstimationInput,
vd: IntVec,
pv: IntVec,
no_slack: IntVec,
tol=1e-9,
max_iter=100,
verbose: int = 0,
c_threshold: float = 4.0,
prefer_correct: bool = False,
fixed_slack: bool = False,
logger: Logger | None = None) -> NumericStateEstimationResults:
"""
Solve the state estimation problem using the Levenberg-Marquadt method
:param nc: instance of NumericalCircuit
:param Ybus: Admittance matrix
:param Yf: Admittance matrix of the from Branches
:param Yt: Admittance matrix of the to Branches
:param Yshunt_bus: Array of shunt admittances
:param F: array with the from bus indices of all the Branches
:param T: array with the to bus indices of all the Branches
:param Cf:
:param Ct:
:param se_input: state estimation input instance (contains the measurements)
:param vd: array of slack node indices
:param pv: array of PV node indices
:param no_slack: array of non-slack node indices
:param tol: Tolerance
:param max_iter: Maximum nuber of iterations
:param verbose: Verbosity level
:param c_threshold: Bad data detection threshold 'c' (4 as default)
:param prefer_correct: if true the measurements are corrected instead of deleted
:param fixed_slack: if true, the measurements on the slack bus are omitted
:param logger: log it out
:return: NumericPowerFlowResults instance
"""
start_time = time.time()
# confidence_value = 0.95
bad_data_detected = False
logger = logger if logger is not None else Logger()
n_no_slack = len(no_slack)
n = Ybus.shape[0]
V = nc.bus_data.Vbus.copy()
Va = np.angle(V)
Vm = np.abs(V)
load_per_bus = nc.load_data.get_injections_per_bus() / nc.Sbase
# pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
z, sigma, measurements = get_measurements_and_deviations(t=None,
se_input=se_input,
Sbase=nc.Sbase)
# Levenberg-Marquardt method
iter_ = 0
converged = False
norm_f = 1e20
nu = 2.0
error_list = list()
# first computation of the jacobian and free term
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct, se_input, no_slack, load_per_bus, fixed_slack)
# compute the weights matrix using per-unit sigma
sigma2 = np.power(sigma, 2.0)
W_vec = 1.0 / sigma2
W = diags(W_vec)
# measurements error (in per-unit)
dz = z - h
# compose the jacobian of the problem
HtW = H.T @ W
Gx = HtW @ H
# compose the residual
gx = HtW @ dz
# measurements error (in per-unit)
# dz = z - h
# first value of mu
mu = 1e-3 * float(Gx.diagonal().max())
# system matrix for levenberg-marquardt
Idn = diags(np.full(Gx.shape[0], mu))
Asys = Gx.T @ Gx + Idn
rhs = Gx.T @ gx
# set the previous objective function value
obj_val_prev = 1e20
# objective function
obj_val = 0.5 * dz @ (W * dz)
while not converged and iter_ < max_iter:
# Solve the increment
dx = spsolve(Asys, rhs)
# if norm_f < (tol * 10.0):
# try:
# r, sigma2, Pii, rN, imax, b, bad_data_detected = b_test(sigma2=sigma2, H=H, dz=dz, HtWH=Gx,
# c_threshold=c_threshold, logger=logger)
# except AssertionError as e:
# if str(e) == "Unobservable-System":
# return NumericStateEstimationResults(V=V,
# Scalc=Scalc,
# norm_f=norm_f,
# converged=False,
# iterations=iter_,
# elapsed=time.time() - start_time,
# bad_data_detected=False,
# is_observable=False)
#
#
# if bad_data_detected:
# if prefer_correct:
# if Pii[imax] > 1e-10: # if the value is not corrected in b_test alone
# z_tilde_imax = z[imax] - (sigma[imax] ** 2 / Pii[imax]) * r[imax]
#
# logger.add_info("Measurement corrected",
# device=measurements[imax].device.name,
# device_class=measurements[imax].device_type.value,
# device_property="value",
# value=z[imax],
# expected_value=z_tilde_imax)
#
# # correct the bad data index
# z[imax] = z_tilde_imax
# else:
# # Pii is very small - this is likely a critical measurement
# # TODO -> Do not delete
# logger.add_warning(f"Measurement {imax} appears critical (Pii={Pii[imax]:.2e})")
# # Don't correct critical measurements, just remove them
# # delete measurements
# mask = np.ones(len(z), dtype=int)
# mask[imax] = 0
#
# se_input = se_input.slice_with_mask(mask=mask)
#
# # pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
# z, sigma, measurements = get_measurements_and_deviations(se_input=se_input, Sbase=nc.Sbase)
#
# # compute the weights matrix using per-unit sigma
# sigma2 = np.power(sigma, 2.0)
# cov = 1.0 / sigma2
# W = diags(cov).tocsc()
# else:
#
# logger.add_info("Measurement deleted",
# device=measurements[imax].device.name,
# device_class=measurements[imax].device_type.value,
# device_property="value",
# value=z[imax],
# expected_value="")
#
# # delete measurements
# mask = np.ones(len(z), dtype=int)
# mask[imax] = 0
#
# se_input = se_input.slice_with_mask(mask=mask)
#
# # pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
# z, sigma, measurements = get_measurements_and_deviations(se_input=se_input, Sbase=nc.Sbase)
#
# # compute the weights matrix using per-unit sigma
# sigma2 = np.power(sigma, 2.0)
# cov = 1.0 / sigma2
# W = diags(cov).tocsc()
# L-M ratios of convergence
dF = obj_val_prev - obj_val
dL = 0.5 * dx @ (mu * dx + gx)
if (dF != 0.0) and (dL > 0.0):
rho = dF / dL
mu *= max([1.0 / 3.0, 1.0 - np.power(2 * rho - 1, 3.0)])
nu = 2.0
# modify the solution
if fixed_slack:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
Va[no_slack] += dVa
Vm[no_slack] += dVm # yes, this is for all the buses
else:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
Va[no_slack] += dVa
Vm += dVm # yes, this is for all the buses
V = Vm * np.exp(1j * Va)
# if verbose > 1:
# dva = np.zeros(n)
# dva[: n_no_slack] = dVa
# df = pd.DataFrame(data={"dVa": dva, "dVm": dVm, "Va": Va, "Vm": Vm})
# print(df)
# update system
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct, se_input, no_slack, load_per_bus, fixed_slack)
# record the previous objective function value
obj_val_prev = obj_val
# measurements error (in per-unit)
dz = z - h
# compose the jacobian of the problem
HtW = H.T @ W
Gx = HtW @ H
# compose the residual
gx = HtW @ dz
#
# # measurements error (in per-unit)
# dz = z - h
# system matrix for levenberg-marquardt
Idn = diags(np.full(Gx.shape[0], mu))
Asys = Gx.T @ Gx + Idn
rhs = Gx.T @ gx
# objective function
obj_val = 0.5 * dz @ (W * dz)
else:
mu *= nu
nu *= 2.0
# compute the convergence
norm_f = np.linalg.norm(dx, np.inf)
converged = norm_f < tol
error_list.append(norm_f)
if verbose > 0:
print(f"Norm_f {norm_f}")
# update loops
iter_ += 1
# Compute the Branches power and the slack buses power
Sf, St, If, It, Vbranch, loading, losses, Sbus = power_flow_post_process_nonlinear(
Sbus=Scalc,
V=V,
F=nc.passive_branch_data.F,
T=nc.passive_branch_data.T,
pv=pv,
vd=vd,
Ybus=Ybus,
Yf=Yf,
Yt=Yt,
Yshunt_bus=Yshunt_bus,
branch_rates=nc.passive_branch_data.rates,
Sbase=nc.Sbase)
if verbose > 1:
from matplotlib import pyplot as plt
plt.plot(error_list)
plt.yscale('log')
# plt.show()
return NumericStateEstimationResults(V=V,
Scalc=Scalc,
m=nc.active_branch_data.tap_module,
tau=nc.active_branch_data.tap_angle,
Sf=Sf,
St=St,
If=If,
It=It,
loading=loading,
losses=losses,
Pf_vsc=np.zeros(nc.nvsc, dtype=float),
St_vsc=np.zeros(nc.nvsc, dtype=complex),
If_vsc=np.zeros(nc.nvsc, dtype=float),
It_vsc=np.zeros(nc.nvsc, dtype=complex),
losses_vsc=np.zeros(nc.nvsc, dtype=float),
loading_vsc=np.zeros(nc.nvsc, dtype=float),
Sf_hvdc=np.zeros(nc.nhvdc, dtype=complex),
St_hvdc=np.zeros(nc.nhvdc, dtype=complex),
losses_hvdc=np.zeros(nc.nhvdc, dtype=complex),
loading_hvdc=np.zeros(nc.nhvdc, dtype=complex),
norm_f=norm_f,
converged=converged,
iterations=iter_,
is_observable=bool(converged),
elapsed=time.time() - start_time,
bad_data_detected=bad_data_detected)
[docs]
def solve_se_nr(nc: NumericalCircuit,
Ybus: CscMat,
Yf: CscMat,
Yt: CscMat,
Yshunt_bus: CxVec,
F: IntVec,
T: IntVec,
Cf: csc_matrix,
Ct: csc_matrix,
se_input: StateEstimationInput,
vd: IntVec,
pv: IntVec,
no_slack: IntVec,
tol=1e-9,
max_iter=100,
verbose: int = 0,
c_threshold: float = 4.0,
prefer_correct: bool = False,
fixed_slack: bool = False,
logger: Logger | None = None) -> NumericStateEstimationResults:
"""
Solve the state estimation problem using the Levenberg-Marquadt method
:param nc: instance of NumericalCircuit
:param Ybus: Admittance matrix
:param Yf: Admittance matrix of the from Branches
:param Yt: Admittance matrix of the to Branches
:param Yshunt_bus: Array of shunt admittances
:param F: array with the from bus indices of all the Branches
:param T: array with the to bus indices of all the Branches
:param Cf:
:param Ct:
:param se_input: state estimation input instance (contains the measurements)
:param vd: array of slack node indices
:param pv: array of PV node indices
:param no_slack: array of non-slack node indices
:param tol: Tolerance
:param max_iter: Maximum nuber of iterations
:param verbose: Verbosity level
:param c_threshold: Bad data detection threshold 'c' (4 as default)
:param prefer_correct: if true the measurements are corrected instead of deleted
:param fixed_slack: if true, the measurements on the slack bus are omitted
:param logger: log it out
:return: NumericPowerFlowResults instance
"""
start_time = time.time()
# confidence_value = 0.95
bad_data_detected = False
logger = logger if logger is not None else Logger()
n_no_slack = len(no_slack)
n = Ybus.shape[0]
V = nc.bus_data.Vbus.copy()
Va = np.angle(V)
Vm = np.abs(V)
Scalc = np.zeros(nc.nbus, dtype=complex)
load_per_bus = nc.load_data.get_injections_per_bus() / nc.Sbase
# pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
z, sigma, measurements = get_measurements_and_deviations(t=None, se_input=se_input, Sbase=nc.Sbase)
# compute the weights matrix using per-unit sigma
sigma2 = np.power(sigma, 2.0)
W_vec = 1.0 / sigma2
W = diags(W_vec)
iter_ = 0
converged = False
norm_f = 1e20
error_list = list()
while not converged and iter_ < max_iter:
# update system
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct, se_input, no_slack, load_per_bus, fixed_slack)
# measurements error (in per-unit)
dz = z - h
# compose the jacobian of the problem
HtW = H.T @ W
Gx = HtW @ H
# compose the residual
gx = HtW @ dz
# Solve the increment
dx = spsolve(Gx, gx) # (HtW @ dz) * (new_state) = HtW @ H
# modify the solution
if fixed_slack:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
Va[no_slack] += dVa
Vm[no_slack] += dVm # yes, this is for all the buses
else:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
Va[no_slack] += dVa
Vm += dVm # yes, this is for all the buses
V = Vm * np.exp(1j * Va)
# This is because this formulation may produce negative Vm
# and by doing this it is corrected
Vm = np.abs(V)
Va = np.angle(V)
# if norm_f < (tol * 10.0):
# r, sigma2, Pii, rN, imax, b, bad_data_detected = b_test(sigma2=sigma2, H=H, dz=dz, HtWH=Gx,
# c_threshold=c_threshold, logger=logger)
#
# if bad_data_detected:
# if prefer_correct:
# if Pii[imax] > 1e-10: # if the value is not corrected in b_test alone
# z_tilde_imax = z[imax] - (sigma[imax] ** 2 / Pii[imax]) * r[imax]
#
# logger.add_info("Measurement corrected",
# device=measurements[imax].device.name,
# device_class=measurements[imax].device_type.value,
# device_property="value",
# value=z[imax],
# expected_value=z_tilde_imax)
#
# # correct the bad data index
# z[imax] = z_tilde_imax
# else:
# # Pii is very small - this is likely a critical measurement
# logger.add_warning(f"Measurement {imax} appears critical (Pii={Pii[imax]:.2e})")
# # Don't correct critical measurements, just remove them
# # delete measurements
# mask = np.ones(len(z), dtype=int)
# mask[imax] = 0
#
# se_input = se_input.slice_with_mask(mask=mask)
#
# # pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
# z, sigma, measurements = get_measurements_and_deviations(se_input=se_input, Sbase=nc.Sbase)
#
# # compute the weights matrix using per-unit sigma
# sigma2 = np.power(sigma, 2.0)
# cov = 1.0 / sigma2
# W = diags(cov).tocsc()
# else:
#
# logger.add_info("Measurement deleted",
# device=measurements[imax].device.name,
# device_class=measurements[imax].device_type.value,
# device_property="value",
# value=z[imax],
# expected_value="")
#
# # delete measurements
# mask = np.ones(len(z), dtype=int)
# mask[imax] = 0
#
# se_input = se_input.slice_with_mask(mask=mask)
#
# # pick the measurements and uncertainties (initially in physical units: MW, MVAr, A, pu V)
# z, sigma, measurements = get_measurements_and_deviations(se_input=se_input, Sbase=nc.Sbase)
#
# # compute the weights matrix using per-unit sigma
# sigma2 = np.power(sigma, 2.0)
# cov = 1.0 / sigma2
# W = diags(cov).tocsc()
if verbose > 1:
dva = np.zeros(n)
dva[: n_no_slack] = dVa
df = pd.DataFrame(data={"dVa": dva, "dVm": dVm, "Va": Va, "Vm": Vm})
print(df)
# compute the convergence
norm_f = np.linalg.norm(gx, np.inf)
converged = norm_f < tol
error_list.append(norm_f)
if verbose > 0:
print(f"Norm_f {norm_f}")
# update loops
iter_ += 1
# Compute the Branches power and the slack buses power
Sf, St, If, It, Vbranch, loading, losses, Sbus = power_flow_post_process_nonlinear(
Sbus=Scalc,
V=V,
F=nc.passive_branch_data.F,
T=nc.passive_branch_data.T,
pv=pv,
vd=vd,
Ybus=Ybus,
Yf=Yf,
Yt=Yt,
Yshunt_bus=Yshunt_bus,
branch_rates=nc.passive_branch_data.rates,
Sbase=nc.Sbase)
if verbose > 1:
from matplotlib import pyplot as plt
plt.plot(error_list)
plt.yscale('log')
# plt.show()
return NumericStateEstimationResults(V=V,
Scalc=Scalc,
m=nc.active_branch_data.tap_module,
tau=nc.active_branch_data.tap_angle,
Sf=Sf,
St=St,
If=If,
It=It,
loading=loading,
losses=losses,
Pf_vsc=np.zeros(nc.nvsc, dtype=float),
St_vsc=np.zeros(nc.nvsc, dtype=complex),
If_vsc=np.zeros(nc.nvsc, dtype=float),
It_vsc=np.zeros(nc.nvsc, dtype=complex),
losses_vsc=np.zeros(nc.nvsc, dtype=float),
loading_vsc=np.zeros(nc.nvsc, dtype=float),
Sf_hvdc=np.zeros(nc.nhvdc, dtype=complex),
St_hvdc=np.zeros(nc.nhvdc, dtype=complex),
losses_hvdc=np.zeros(nc.nhvdc, dtype=complex),
loading_hvdc=np.zeros(nc.nhvdc, dtype=complex),
norm_f=norm_f,
converged=bool(converged),
iterations=iter_,
elapsed=time.time() - start_time,
is_observable=bool(converged),
bad_data_detected=bad_data_detected)
[docs]
def solve_se_gauss_newton(nc: NumericalCircuit,
Ybus: CscMat,
Yf: CscMat,
Yt: CscMat,
Yshunt_bus: CxVec,
F: IntVec,
T: IntVec,
Cf: csc_matrix,
Ct: csc_matrix,
se_input: StateEstimationInput,
vd: IntVec,
pv: IntVec,
no_slack: IntVec,
tol=1e-9,
max_iter=100,
verbose: int = 0,
c_threshold: float = 4.0,
prefer_correct: bool = False,
fixed_slack: bool = False,
logger: Logger | None = None) -> NumericStateEstimationResults:
"""
Linearize the non-linear measurement model around the current state estimate (Jacobian H)
Solve the linear WLS problem: Ξx = 1/(Hα΅WH)Hα΅Wdz
Update the state: x = x + Ξx
Repeat until convergence
"""
start_time = time.time()
logger = logger if logger is not None else Logger()
# Initialization
n_no_slack = len(no_slack)
V = nc.bus_data.Vbus.copy() # initial guess (can be flat start or power flow solution)
Va = np.angle(V)
Vm = np.abs(V)
load_per_bus = nc.load_data.get_injections_per_bus() / nc.Sbase
# Get measurements
z, sigma, measurements = get_measurements_and_deviations(t=None,
se_input=se_input,
Sbase=nc.Sbase,
use_current_squared_meas=True)
# Weight matrix with regularization to avoid numerical issues
sigma2 = np.power(sigma, 2.0)
W_vec = 1.0 / np.maximum(sigma2, 1e-10) # Avoid division by zero
W = diags(W_vec)
# Simple iterative method (Gauss-Newton style)
iter_ = 0
converged = False
norm_f = 1e20
error_list = []
# Step control parameters
max_step_va = 0.3 # radians
max_step_vm = 0.2 # per unit
relaxation = 1.1 # initial step relaxation
obj_val_prev = 1e30
while not converged and iter_ < max_iter:
# Compute Jacobian and measurement function at current state
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct, se_input, no_slack, load_per_bus, fixed_slack)
# Measurement residuals
dz = z - h
# Objective function value
obj_val = 0.5 * dz @ (W * dz)
# Build normal equations
HtW = H.T @ W
G = HtW @ H # Gain matrix
# Add regularization to handle ill-conditioning
reg_factor = 1e-6 * G.diagonal().max()
G_reg = G + reg_factor * sparse.eye(G.shape[0])
g = HtW @ dz # Gradient
# Solve for state update
try:
dx = spsolve(G_reg, g)
except:
# If matrix is singular, use pseudo-inverse
dx = spilu(G).solve(g)
# Update state
if fixed_slack:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
dVa = np.clip(dVa, -max_step_va, max_step_va)
dVm = np.clip(dVm, -max_step_vm, max_step_vm)
Va[no_slack] += dVa
Vm[no_slack] += dVm
else:
dVa = dx[:n_no_slack]
dVm = dx[n_no_slack:]
dVa = np.clip(dVa, -max_step_va, max_step_va)
dVm = np.clip(dVm, -max_step_vm, max_step_vm)
Va[no_slack] += dVa
Vm += dVm
V = Vm * np.exp(1j * Va)
# Adaptive step control based on objective function improvement
# if obj_val < obj_val_prev:
# # Good step - slightly increase relaxation
# relaxation = min(1.0, relaxation * 1.1)
# else:
# # Bad step - reduce step size and backtrack
# relaxation *= 0.8
# if fixed_slack:
# Va[no_slack] -= relaxation * dVa
# Vm[no_slack] -= relaxation * dVm
# else:
# Va[no_slack] -= relaxation * dVa
# Vm -= relaxation * dVm
# V = Vm * np.exp(1j * Va)
# continue # Recompute with smaller step
obj_val_prev = obj_val
Vm = np.abs(V)
Va = np.angle(V)
# Check convergence
norm_f = np.linalg.norm(dx, np.inf)
converged = norm_f < tol
error_list.append(norm_f)
if verbose > 0:
print(f"Iter {iter_}: norm_f = {norm_f:.6e}, obj_val = {obj_val:.6e}, "
f"relax = {relaxation:.3f}")
iter_ += 1
# Add bad data detection like LM does
# if norm_f < tol * 10 and iter_ > 3:
# r, sigma2, Pii, rN, imax, b, bad_data_detected = b_test(
# sigma2=sigma2, H=H, dz=dz, HtWH=G, c_threshold=c_threshold, logger=logger
# )
#
# if bad_data_detected and prefer_correct:
# # Implement correction logic similar to LM
# if Pii[imax] > 1e-10:
# z_tilde_imax = z[imax] - (sigma[imax] ** 2 / Pii[imax]) * r[imax]
# z[imax] = z_tilde_imax
# if verbose > 0:
# logger.add_info(f"Corrected measurement {imax}")
# Final processing
Sf, St, If, It, Vbranch, loading, losses, Sbus = power_flow_post_process_nonlinear(
Sbus=Scalc, V=V, F=nc.passive_branch_data.F, T=nc.passive_branch_data.T,
pv=pv, vd=vd, Ybus=Ybus, Yf=Yf, Yt=Yt, Yshunt_bus=Yshunt_bus,
branch_rates=nc.passive_branch_data.rates, Sbase=nc.Sbase
)
end_time = time.time()
logger.add_info(f"State estimation completed in {iter_} iterations, time: {end_time - start_time:.3f}s")
if verbose > 1:
from matplotlib import pyplot as plt
plt.plot(error_list)
plt.yscale('log')
# plt.show()
return NumericStateEstimationResults(V=V,
Scalc=Scalc,
m=nc.active_branch_data.tap_module,
tau=nc.active_branch_data.tap_angle,
Sf=Sf,
St=St,
If=If,
It=It,
loading=loading,
losses=losses,
Pf_vsc=np.zeros(nc.nvsc, dtype=float),
St_vsc=np.zeros(nc.nvsc, dtype=complex),
If_vsc=np.zeros(nc.nvsc, dtype=float),
It_vsc=np.zeros(nc.nvsc, dtype=complex),
losses_vsc=np.zeros(nc.nvsc, dtype=float),
loading_vsc=np.zeros(nc.nvsc, dtype=float),
Sf_hvdc=np.zeros(nc.nhvdc, dtype=complex),
St_hvdc=np.zeros(nc.nhvdc, dtype=complex),
losses_hvdc=np.zeros(nc.nhvdc, dtype=complex),
loading_hvdc=np.zeros(nc.nhvdc, dtype=complex),
norm_f=norm_f,
converged=bool(converged),
iterations=iter_,
elapsed=time.time() - start_time,
is_observable=bool(converged),
bad_data_detected=False)
[docs]
def decoupled_state_estimation(nc: NumericalCircuit,
Ybus: CscMat,
Yf: CscMat,
Yt: CscMat,
Yshunt_bus: CxVec,
F: IntVec,
T: IntVec,
Cf: csc_matrix,
Ct: csc_matrix,
se_input: StateEstimationInput,
vd: IntVec,
pv: IntVec,
no_slack: IntVec,
tol=1e-9,
max_iter=100,
verbose: int = 0,
c_threshold: float = 4.0,
prefer_correct: bool = False,
fixed_slack: bool = False,
logger: Logger | None = None) -> NumericStateEstimationResults:
"""
Fast decoupled WLS state estimator using LU decomposition.
Active power -> angles
Reactive power -> voltage magnitudes
"""
start_time = time.time()
logger if logger is not None else Logger()
load_per_bus = nc.load_data.get_injections_per_bus() / nc.Sbase
# --- Initialize voltages ---
V = nc.bus_data.Vbus.copy()
Va = np.angle(V)
Vm = np.abs(V)
# Identify non-slack buses
non_slack_buses = no_slack # Your no_slack variable
n_non_slack = len(non_slack_buses)
# --- Measurement vector and weights ---
z, sigma, measurements = get_measurements_and_deviations(t=None,
se_input=se_input,
Sbase=nc.Sbase,
use_current_squared_meas=False)
W = diags(1.0 / sigma ** 2, 0, format="csc")
# --- Create measurement type mapping based on processing order ---
# The measurements are processed in this fixed order:
# 1. p_inj, 2. q_inj, 3. pg_inj, 4. qg_inj,
# 5. pf_value, 6. pt_value, 7. qf_value, 8. qt_value,
# 9. if_value, 10. it_value, 11. vm_value, 12. va_value
# Count measurements in each category
counts = [
len(se_input.p_inj), len(se_input.q_inj),
len(se_input.pg_inj), len(se_input.qg_inj),
len(se_input.pf_value), len(se_input.pt_value),
len(se_input.qf_value), len(se_input.qt_value),
len(se_input.if_value), len(se_input.it_value),
len(se_input.vm_value), len(se_input.va_value)
]
# Create measurement type array
measurement_types = []
for i, count in enumerate(counts):
if i in [0, 2, 4, 5]: # p_inj, pg_inj, pf_value, pt_value
measurement_types.extend(['P'] * count)
elif i in [1, 3, 6, 7]: # q_inj, qg_inj, qf_value, qt_value
measurement_types.extend(['Q'] * count)
else: # current and voltage measurements (I, V, ΞΈ)
measurement_types.extend(['Other'] * count)
measurement_types = np.array(measurement_types)
# Create indices for active and reactive measurements
a_idx = np.where(measurement_types == 'P')[0] # Active power measurements
r_idx = np.where(measurement_types == 'Q')[0] # Reactive power measurements
if verbose > 0:
logger.add_info(f"Active power measurements: {len(a_idx)}")
logger.add_info(f"Reactive power measurements: {len(r_idx)}")
relax_theta = 1.0 # angle relaxation
relax_V = 0.5 # voltage relaxation
reg_eps = 1e-8 # tiny reg for Ga
reg_eps_v = 1e-6 # slightly larger reg for Gr
max_theta_step = 0.3
max_V_step = 0.2
iter_count = 0
converged = False
previous_max_update = 1e6
while not converged and iter_count < max_iter:
iter_count += 1
# --- 1) Recompute Jacobian and measurement function
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct,
se_input, non_slack_buses, load_per_bus, fixed_slack)
if H.shape[0] != z.shape[0]:
raise ValueError(f"H rows ({H.shape[0]}) != len(z) ({len(z)}) - check measurement ordering")
dz = z - h
# --- 2) Build and factorize Ga (P-subsystem)
Ha = H.tocsr()[a_idx, :n_non_slack] # dP/dΞΈ (rows = P-meas, cols = ΞΈ_non_slack)
Wa = W[a_idx, :][:, a_idx].tocsc()
Ga = Ha.T @ Wa @ Ha
if Ga.shape[0] > 0:
# Ga_reg = Ga + eps_base * np.diag(np.max(np.abs(Ga), axis=0))
Ga = Ga + reg_eps * diags(np.ones(Ga.shape[0]), 0, format='csc')
# --- 3) Compute Ta
Ta = Ha.T @ Wa @ dz[a_idx] / 1
# --- 4) Solve Ga * dtheta = Ta
try:
lu_ga = splu(Ga)
dtheta = lu_ga.solve(Ta)
except Exception:
dtheta = spilu(Ga).solve(Ta)
# safety clip + apply relaxation
if np.any(np.abs(dtheta) > max_theta_step):
dtheta = np.clip(dtheta, -max_theta_step, max_theta_step)
Va[non_slack_buses] += relax_theta * dtheta
V = Vm * np.exp(1j * Va) # recompute complex voltage after angle update
# --- 5) Recompute Jacobian/h after angle update (needed for Q-subsystem) ---
H, h, Scalc = Jacobian_SE(Ybus, Yf, Yt, V, F, T, Cf, Ct,
se_input, non_slack_buses, load_per_bus, fixed_slack)
dz = z - h
# --- 6) Build and factorize Gr (Q-subsystem)
Hr = H.tocsr()[r_idx, n_non_slack:2 * n_non_slack] # dQ/dV for non-slack V columns
Wr = W[r_idx, :][:, r_idx].tocsc()
Gr = Hr.T @ Wr @ Hr
if Gr.shape[0] > 0:
# Gr_reg = Gr + eps_base * np.diag(np.max(np.abs(Gr), axis=0))
Gr = Gr + reg_eps_v * diags(np.ones(Gr.shape[0]), 0, format='csc')
# --- 7) Compute Tr and solve Gr * dV = Tr
dz_r_scaled = dz[r_idx] / np.maximum(Vm, 0.01)
Tr = Hr.T @ Wr @ dz_r_scaled
try:
lu_gr = splu(Gr)
dV = lu_gr.solve(Tr)
except Exception:
dV = spilu(Gr).solve(Tr)
# safety clip + apply relaxation
if np.any(np.abs(dV) > max_V_step):
dV = np.clip(dV, -max_V_step, max_V_step)
Vm[non_slack_buses] += relax_V * dV
V = Vm * np.exp(1j * Va)
# --- 8) Convergence & divergence checks
max_theta_update = np.max(np.abs(dtheta)) if dtheta.size > 0 else 0.0
max_V_update = np.max(np.abs(dV)) if dV.size > 0 else 0.0
norm_f = max(max_theta_update, max_V_update)
if verbose > 0:
logger.add_info(f"Iter {iter_count}: max update = {norm_f:.6f}, max |dz| = {np.max(np.abs(dz)):.6f}")
# simple divergence guard
if norm_f > previous_max_update * 5 and iter_count > 1:
if verbose > 0:
logger.add_info(f"Divergence detected at iteration {iter_count}")
logger.add_info(f"Updates: theta={max_theta_update:.6f}, V={max_V_update:.6f}")
break
previous_max_update = norm_f
if norm_f < tol * 100: # decoupled solution checks for both active & reactive parts against tolerance,
# so its a bit lower, but it also provides same(very) result for all tests with more stricter tol
converged = True
if verbose > 0:
logger.add_info(f"Converged at iter {iter_count}")
break
Sf, St, If, It, Vbranch, loading, losses, Sbus = power_flow_post_process_nonlinear(
Sbus=Scalc, V=V, F=nc.passive_branch_data.F, T=nc.passive_branch_data.T,
pv=pv, vd=vd, Ybus=Ybus, Yf=Yf, Yt=Yt, Yshunt_bus=Yshunt_bus,
branch_rates=nc.passive_branch_data.rates, Sbase=nc.Sbase
)
return NumericStateEstimationResults(V=V,
Scalc=Scalc,
m=nc.active_branch_data.tap_module,
tau=nc.active_branch_data.tap_angle,
Sf=Sf,
St=St,
If=If,
It=It,
loading=loading,
losses=losses,
Pf_vsc=np.zeros(nc.nvsc, dtype=float),
St_vsc=np.zeros(nc.nvsc, dtype=complex),
If_vsc=np.zeros(nc.nvsc, dtype=float),
It_vsc=np.zeros(nc.nvsc, dtype=complex),
losses_vsc=np.zeros(nc.nvsc, dtype=float),
loading_vsc=np.zeros(nc.nvsc, dtype=float),
Sf_hvdc=np.zeros(nc.nhvdc, dtype=complex),
St_hvdc=np.zeros(nc.nhvdc, dtype=complex),
losses_hvdc=np.zeros(nc.nhvdc, dtype=complex),
loading_hvdc=np.zeros(nc.nhvdc, dtype=complex),
norm_f=norm_f,
converged=bool(converged),
iterations=iter_count,
elapsed=time.time() - start_time,
is_observable=bool(converged), # by default it is observable if it converges
bad_data_detected=False)