Source code for VeraGridEngine.Simulations.StateEstimation.pseudo_measurements_augmentation



import numpy as np

from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Devices.measurement import MeasurementTemplate
from VeraGridEngine.enumerations import DeviceType

[docs] class PseudoMeasurement(MeasurementTemplate): def __init__(self, value, sigma, api_obj: Bus, name="", idtag = None): # If this has to be injected in DB it has to get a object instance of class Device. In order # to persist it in DB it should be saved in Asset class ! """ Parameters ---------- value : float Per-unit value of pseudo measurement sigma : float Per-unit standard deviation (large β†’ low weight) bus : int Bus index this pseudo measurement belongs to mtype : str Measurement type, e.g. "p_inj", "q_inj" """ MeasurementTemplate.__init__(self, value=value, uncertainty=sigma, api_obj=api_obj, name=name, idtag=idtag, device_type=DeviceType.NoDevice) self.value = value self.sigma = sigma self.bus = api_obj
[docs] def get_value_pu(self, Sbase: float): return self.value / Sbase
[docs] def get_standard_deviation_pu(self, Sbase: float): return self.sigma / Sbase
[docs] def build_neighbors(Cf, Ct): """ Build neighbor list per bus from connectivity matrices. Cf, Ct: sparse branch-to-bus incidence matrices """ n_buses = Cf.shape[1] neighbors = [[] for _ in range(n_buses)] # convert sparse matrices to COO for fast iteration Cf_coo, Ct_coo = Cf.tocoo(), Ct.tocoo() for br in range(Cf_coo.shape[0]): i = Cf_coo.col[br] j = Ct_coo.col[br] if i != j: neighbors[i].append(j) neighbors[j].append(i) return neighbors
[docs] def compute_power_injection(bus, V, Ybus, neighbors): """ Compute AC active and reactive power injection for a bus using neighbors. neighbors: prebuilt list of lists, neighbors[i] = list of neighbor bus indices """ Vi = abs(V[bus]) thetai = np.angle(V[bus]) Pi, Qi = 0.0, 0.0 for nb in neighbors[bus]: Vj = abs(V[nb]) thetaj = np.angle(V[nb]) Gij = Ybus[bus, nb].real Bij = Ybus[bus, nb].imag Pi += Vi * Vj * (Gij * np.cos(thetai - thetaj) + Bij * np.sin(thetai - thetaj)) Qi += Vi * Vj * (Gij * np.sin(thetai - thetaj) - Bij * np.cos(thetai - thetaj)) # self-admittance part Yii = Ybus[bus, bus] Pi += Vi ** 2 * Yii.real Qi -= Vi ** 2 * Yii.imag return Pi, Qi
[docs] def add_pseudo_measurements(se_input, unobservable_buses, V, Ybus, neighbors,bus_dict, sigma_pseudo=1.0,Sbase=100,logger=None, ): """ Extend se_input with pseudo-measurements for unobservable buses. neighbors: prebuilt neighbor list per bus """ for bus_idx in unobservable_buses: Pi, Qi = compute_power_injection(bus_idx, V, Ybus, neighbors) # Fallback for zero pseudo-measurements if abs(Pi) < 1e-6: # Use average of neighboring line flows (approximation) if neighbors[bus_idx]: Pi = sum(abs(Ybus[bus_idx, nb]) * abs(V[bus_idx]) * abs(V[nb]) for nb in neighbors[bus_idx]) / len(neighbors[bus_idx]) else: Pi = 0.1 # small default non-zero value if abs(Qi) < 1e-6: Qi = 0.0 # often reactive load is unknown; can keep 0 or small value # Get the Bus object for this bus_idx bus_obj = bus_dict[bus_idx] pm_p = PseudoMeasurement(Pi*Sbase, sigma_pseudo, bus_obj,"pseudo") pm_q = PseudoMeasurement(Qi*Sbase, sigma_pseudo, bus_obj, "pseudo",)# converted later to pu in get_measurements se_input.p_idx.append(bus_idx) # or appropriate index mapping se_input.p_inj.append(pm_p) se_input.q_idx.append(bus_idx) se_input.q_inj.append(pm_q) if logger: logger.add_info( f"Pseudo-measurement added at bus {bus_obj}", device="pseudo", device_class="virtual", device_property="P, Q", value=(Pi, Qi) ) return se_input