Source code for VeraGridEngine.Simulations.StateEstimation.state_estimation_inputs
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
from typing import List, Dict
from VeraGridEngine.basic_structures import IntVec, BoolVec
from VeraGridEngine.Devices.measurement import (PfMeasurement, QfMeasurement,
PtMeasurement, QtMeasurement,
PiMeasurement, QiMeasurement,
PgMeasurement, QgMeasurement,
VmMeasurement, VaMeasurement,
IfMeasurement, ItMeasurement,
MeasurementTemplate)
[docs]
def slice_pair(obj_measurements: List[MeasurementTemplate],
obj_indices: List[int],
index_map: Dict[int, int]):
"""
Slice obj_measurements and obj_indices using an index->island index map
:param obj_measurements: list of measurements
:param obj_indices: list of device indices where the measurement applies
:param index_map: main index -> island index mapping
:return: new_obj_measurement, new_obj_index
"""
new_obj_measurement = list()
new_obj_indices = list()
for main_index, measurement in zip(obj_indices, obj_measurements):
island_index = index_map.get(main_index, None)
if island_index is not None:
new_obj_indices.append(island_index)
new_obj_measurement.append(measurement)
return new_obj_measurement, new_obj_indices
[docs]
class StateEstimationInput:
"""
StateEstimationInput
"""
def __init__(self) -> None:
"""
State estimation inputs constructor
"""
self.p_inj: List[PiMeasurement] = list() # Node active power measurements vector of pointers
self.p_idx: List[int] = list() # nodes with power injection measurements
self.q_inj: List[QiMeasurement] = list() # Node reactive power measurements vector of pointers
self.q_idx: List[int] = list() # nodes with reactive power injection measurements
self.pg_inj: List[PgMeasurement] = list() # Node active power measurements vector of pointers
self.pg_idx: List[int] = list() # nodes with power injection measurements
self.qg_inj: List[QgMeasurement] = list() # Node reactive power measurements vector of pointers
self.qg_idx: List[int] = list() # nodes with reactive power injection measurements
self.pf_value: List[PfMeasurement] = list() # Branch active power measurements vector of pointers
self.pf_idx: List[int] = list() # Branches with power measurements
self.pt_value: List[PtMeasurement] = list() # Branch active power measurements vector of pointers
self.pt_idx: List[int] = list() # Branches with power measurements
self.qf_value: List[QfMeasurement] = list() # Branch reactive power measurements vector of pointers
self.qf_idx: List[int] = list() # Branches with reactive power measurements
self.qt_value: List[QtMeasurement] = list() # Branch reactive power measurements vector of pointers
self.qt_idx: List[int] = list() # Branches with reactive power measurements
self.if_value: List[IfMeasurement] = list() # Branch current module measurements vector of pointers
self.if_idx: List[int] = list() # Branches with current measurements
self.it_value: List[ItMeasurement] = list() # Branch current module measurements vector of pointers
self.it_idx: List[int] = list() # Branches with current measurements
self.vm_value: List[VmMeasurement] = list() # Node voltage module measurements vector of pointers
self.vm_idx: List[int] = list() # nodes with voltage module measurements
self.va_value: List[VaMeasurement] = list() # Node voltage angle measurements vector of pointers
self.va_idx: List[int] = list() # nodes with voltage angle measurements
nz = self.size()
self.flags = np.ones(nz, dtype=int)
[docs]
def size(self):
return (len(self.p_inj)
+ len(self.q_inj)
+ len(self.pg_inj)
+ len(self.qg_inj)
+ len(self.pf_value)
+ len(self.pt_value)
+ len(self.qf_value)
+ len(self.qt_value)
+ len(self.if_value)
+ len(self.it_value)
+ len(self.vm_value)
+ len(self.va_value))
[docs]
def slice(self, bus_idx: IntVec, branch_idx: IntVec) -> "StateEstimationInput":
"""
Slice this object given the island branch and bus indices
:param bus_idx: array of bus indices of an island
:param branch_idx: array of branch indices of an island
:return: new sliced StateEstimationInput
"""
se = StateEstimationInput()
# Map old indices β new indices
bus_index_map = {main_idx: island_idx for island_idx, main_idx in enumerate(bus_idx)}
branch_index_map = {main_idx: island_idx for island_idx, main_idx in enumerate(branch_idx)}
se.p_inj, se.p_idx = slice_pair(self.p_inj, self.p_idx, bus_index_map)
se.q_inj, se.q_idx = slice_pair(self.q_inj, self.q_idx, bus_index_map)
se.pg_inj, se.pg_idx = slice_pair(self.pg_inj, self.pg_idx, bus_index_map)
se.qg_inj, se.qg_idx = slice_pair(self.qg_inj, self.qg_idx, bus_index_map)
se.vm_value, se.vm_idx = slice_pair(self.vm_value, self.vm_idx, bus_index_map)
se.va_value, se.va_idx = slice_pair(self.va_value, self.va_idx, bus_index_map)
se.pf_value, se.pf_idx = slice_pair(self.pf_value, self.pf_idx, branch_index_map)
se.qf_value, se.qf_idx = slice_pair(self.qf_value, self.qf_idx, branch_index_map)
se.if_value, se.if_idx = slice_pair(self.if_value, self.if_idx, branch_index_map)
se.pt_value, se.pt_idx = slice_pair(self.pt_value, self.pt_idx, branch_index_map)
se.qt_value, se.qt_idx = slice_pair(self.qt_value, self.qt_idx, branch_index_map)
se.it_value, se.it_idx = slice_pair(self.it_value, self.it_idx, branch_index_map)
return se
[docs]
def slice_with_mask(self, mask: BoolVec) -> "StateEstimationInput":
"""
Get a new StateEstimationInput without the measurements that fall in the mask marked with a 0
:param mask: mask of measurements to keep
:return: StateEstimationInput
"""
mask = np.asarray(mask, dtype=bool)
new_obj = StateEstimationInput()
k = 0
for m_here, m_there, idx_here, idx_there in [(self.p_inj, new_obj.p_inj, self.p_idx, new_obj.p_idx),
(self.q_inj, new_obj.q_inj, self.q_idx, new_obj.q_idx),
(self.pg_inj, new_obj.pg_inj, self.pg_idx, new_obj.pg_idx),
(self.qg_inj, new_obj.qg_inj, self.qg_idx, new_obj.qg_idx),
(self.pf_value, new_obj.pf_value, self.pf_idx, new_obj.pf_idx),
(self.pt_value, new_obj.pt_value, self.pt_idx, new_obj.pt_idx),
(self.qf_value, new_obj.qf_value, self.qf_idx, new_obj.qf_idx),
(self.qt_value, new_obj.qt_value, self.qt_idx, new_obj.qt_idx),
(self.if_value, new_obj.if_value, self.if_idx, new_obj.if_idx),
(self.it_value, new_obj.it_value, self.it_idx, new_obj.it_idx),
(self.vm_value, new_obj.vm_value, self.vm_idx, new_obj.vm_idx),
(self.va_value, new_obj.va_value, self.va_idx, new_obj.va_idx)]:
for m, i in zip(m_here, idx_here):
if mask[k]:
m_there.append(m)
idx_there.append(i)
k += 1
return new_obj