Source code for VeraGridEngine.Simulations.StateEstimation.state_stimation_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations

from typing import Union, Dict

from VeraGridEngine.Simulations.StateEstimation.observability_analysis import (
    check_for_observability_and_return_unobservable_buses, add_pseudo_measurements_for_unobservable_buses)
from VeraGridEngine.Simulations.StateEstimation.pseudo_measurements_augmentation import PseudoMeasurement
from VeraGridEngine.Simulations.StateEstimation.state_estimation_results import StateEstimationResults
from VeraGridEngine.basic_structures import ConvergenceReport
from VeraGridEngine.Simulations.StateEstimation.state_estimation import (solve_se_nr, solve_se_lm,
                                                                         solve_se_gauss_newton,
                                                                         decoupled_state_estimation)
from VeraGridEngine.Simulations.StateEstimation.state_estimation_inputs import StateEstimationInput
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.enumerations import SolverType, SimulationTypes


[docs] class StateEstimationOptions: """ StateEstimationOptions """ __slots__ = ( "solver", "tol", "max_iter", "verbose", "prefer_correct", "c_threshold", "fixed_slack", "observability_analysis", "add_pseudo_measurements", "pseudo_meas_std", "run_meas_profiling", "include_line_measurements_on_both_ends", ) def __init__(self, solver: SolverType = SolverType.NR, tol: float = 1e-8, max_iter: int = 100, verbose: int = 0, prefer_correct: bool = True, c_threshold: float = 4.0, fixed_slack: bool = False, run_observability_analyis: bool = False, add_pseudo_measurements: bool = False, run_measurement_profiling: bool = False, include_line_measurements_on_both_ends: bool = True, pseudo_meas_std: float = 1.0): """ :param solver: :param tol: Tolerance :param max_iter: Maximum number of iterations :param verbose: Verbosity level (1 light, 2 heavy) :param prefer_correct: Prefer measurement correction? otherwise measurement deletion is used :param c_threshold: confidence threshold (default 4.0) :param fixed_slack: if true, the measurements on the slack bus are omitted :param run_observability_analyis: :param add_pseudo_measurements: :param run_measurement_profiling: :param include_line_measurements_on_both_ends: :param pseudo_meas_std: """ self.solver = solver self.tol = tol self.max_iter = max_iter self.verbose = verbose self.prefer_correct = prefer_correct self.c_threshold = c_threshold self.fixed_slack = fixed_slack self.observability_analysis = run_observability_analyis self.add_pseudo_measurements = add_pseudo_measurements self.pseudo_meas_std = pseudo_meas_std self.run_meas_profiling = run_measurement_profiling self.include_line_measurements_on_both_ends = include_line_measurements_on_both_ends
[docs] class StateEstimationConvergenceReport(ConvergenceReport): __slots__ = ( "bad_data_detected", "unobservable_buses", "bus_contribution", "pseudo_measurements", "_is_observable", "measurement_profile", ) def __init__(self) -> None: """ Constructor """ super().__init__() self.bad_data_detected = False self.unobservable_buses = list() self.bus_contribution = list() self.pseudo_measurements = list() self._is_observable = True self.measurement_profile = list()
[docs] def add_se(self, method, converged: bool, error: float, elapsed: float, iterations: int, bad_data_detected: bool, is_observable: bool, bus_contribution: list, pseudo_measurements: list, unobservable_buses: list, measurement_profile: dict): """ :param method: :param converged: :param error: :param elapsed: :param iterations: :param bad_data_detected: :param is_observable: :param bus_contribution: :param pseudo_measurements: :param unobservable_buses: :param measurement_profile: :return: """ # Call parent's add method for common parameters self.add(method, converged, error, elapsed, iterations) self.bad_data_detected = bad_data_detected self._is_observable = is_observable self.add_bus_contribution(bus_contribution) self.add_pseudo_measurements(pseudo_measurements) self.add_unobservable_buses(unobservable_buses) self.measurement_profile.append(measurement_profile)
@property def is_observable(self) -> bool: """ Get info is the island was observable :return: List of is_obsevable booleans """ return self._is_observable @is_observable.setter def is_observable(self, value: bool) -> None: self._is_observable = value
[docs] def get_bad_data_detected(self) -> bool: """ Get bad data detection results :return: List of bad data detection results """ return self.bad_data_detected
[docs] def get_unobservable_buses(self) -> list: """ :return: """ return self.unobservable_buses
[docs] def get_bus_contribution(self) -> list: """ :return: """ return self.bus_contribution
[docs] def get_pseudo_measurements(self) -> list: """ :return: """ return self.pseudo_measurements
[docs] def add_unobservable_buses(self, unobservable_buses): """ :param unobservable_buses: :return: """ self.unobservable_buses.append(unobservable_buses)
[docs] def add_bus_contribution(self, bus_contribution): """ :param bus_contribution: :return: """ self.bus_contribution.append(bus_contribution)
[docs] def add_pseudo_measurements(self, se_input): """ :param se_input: :return: """ for m in se_input.p_inj: if isinstance(m, PseudoMeasurement): self.pseudo_measurements.append(m)
[docs] def add_measurement_profile(self, meas_profile): """ :param meas_profile: :return: """ self.measurement_profile.append(meas_profile)
[docs] def get_measurement_profile(self) -> list: """ :return: """ return self.measurement_profile
[docs] class StateEstimationDriver(DriverTemplate): __slots__ = ("options",) name = 'State estimation' tpe = SimulationTypes.StateEstimation_run def __init__(self, circuit: MultiCircuit, options: StateEstimationOptions | None = None): """ Constructor :param circuit: circuit object """ DriverTemplate.__init__(self, grid=circuit) self.results: Union[StateEstimationResults, None] = None self.options = StateEstimationOptions() if options is None else options
[docs] @staticmethod def collect_measurements(circuit: MultiCircuit) -> StateEstimationInput: """ Form the input from the circuit measurements :return: nothing, the input object is stored in this class """ se_input = StateEstimationInput() # bus measurements bus_dict = circuit.get_bus_idtag_index_dict() for elm in circuit.get_p_measurements(): se_input.p_idx.append(bus_dict[elm.device_idtag]) se_input.p_inj.append(elm) for elm in circuit.get_q_measurements(): se_input.q_idx.append(bus_dict[elm.device_idtag]) se_input.q_inj.append(elm) # build gen->bus index dict gen_dict: Dict[str, int] = dict() for gen in circuit.generators: gen_dict[gen.idtag] = bus_dict[gen.bus.idtag] for elm in circuit.get_pg_measurements(): se_input.pg_idx.append(gen_dict[elm.device_idtag]) se_input.pg_inj.append(elm) for elm in circuit.get_qg_measurements(): se_input.qg_idx.append(gen_dict[elm.device_idtag]) se_input.qg_inj.append(elm) for elm in circuit.get_vm_measurements(): se_input.vm_idx.append(bus_dict[elm.device_idtag]) se_input.vm_value.append(elm) for elm in circuit.get_va_measurements(): se_input.va_idx.append(bus_dict[elm.device_idtag]) se_input.va_value.append(elm) # branch measurements branch_dict = circuit.get_branches_idtag_index_dict(add_vsc=False, add_hvdc=False, add_switch=True) for elm in circuit.get_pf_measurements(): se_input.pf_idx.append(branch_dict[elm.device_idtag]) se_input.pf_value.append(elm) for elm in circuit.get_pt_measurements(): se_input.pt_idx.append(branch_dict[elm.device_idtag]) se_input.pt_value.append(elm) for elm in circuit.get_qf_measurements(): se_input.qf_idx.append(branch_dict[elm.device_idtag]) se_input.qf_value.append(elm) for elm in circuit.get_qt_measurements(): se_input.qt_idx.append(branch_dict[elm.device_idtag]) se_input.qt_value.append(elm) for elm in circuit.get_if_measurements(): se_input.if_idx.append(branch_dict[elm.device_idtag]) se_input.if_value.append(elm) for elm in circuit.get_it_measurements(): se_input.it_idx.append(branch_dict[elm.device_idtag]) se_input.it_value.append(elm) return se_input
[docs] def run(self): """ Run state estimation :return: """ self.tic() n = len(self.grid.buses) m = self.grid.get_branch_number(add_vsc=False, add_hvdc=False, add_switch=True) nc = compile_numerical_circuit_at(self.grid, logger=self.logger) self.results = StateEstimationResults(n=n, m=m, n_hvdc=nc.nhvdc, n_vsc=nc.nvsc, n_gen=nc.ngen, n_batt=nc.nbatt, n_sh=nc.nshunt, bus_names=nc.bus_data.names, branch_names=nc.passive_branch_data.names, hvdc_names=nc.hvdc_data.names, vsc_names=nc.vsc_data.names, gen_names=nc.generator_data.names, batt_names=nc.battery_data.names, sh_names=nc.shunt_data.names, bus_types=nc.bus_data.bus_types) # self.se_results.initialize(n, m) islands = nc.split_into_islands() # collect inputs of the island se_input = self.collect_measurements(circuit=self.grid) for island in islands: idx = island.get_simulation_indices() adm = island.get_admittance_matrices() se_input_island = se_input.slice(bus_idx=island.bus_data.original_idx, branch_idx=island.passive_branch_data.original_idx) bus_dict = {i: self.grid.buses[idx] for i, idx in enumerate(island.bus_data.original_idx)} conn = island.get_connectivity_matrices() report = StateEstimationConvergenceReport() is_observable = True # by default we consider it True, if SE diverges and LM solver is chosen there assert # statement explicitly mentions unobservability, if user runs observability analysis, this boolean is # assigned as per standard output solution = None # initialized so observability results can be saved without running SE unobservable_buses = [] bus_contrib = {} measurement_profile = {} # the idea is to first run observability analysis uif user wants then the normal SE if self.options.observability_analysis: # this will provide the result of unobservable branches # here we have to store Jacobian and measurements so that it is not again recalculated in # State estimation calculations # if pseudo meas is allowed it will create meas at the unobser branches so that the net is observable # in that case Jacobian needs to be called again in SE formulations(needs to be worked upon) is_observable, unobservable_buses, measurement_profile, V, bus_contrib = check_for_observability_and_return_unobservable_buses( nc=island, Ybus=adm.Ybus, Yf=adm.Yf, Yt=adm.Yt, no_slack=idx.no_slack, F=island.passive_branch_data.F, T=island.passive_branch_data.T, Cf=conn.Cf, Ct=conn.Ct, se_input=se_input_island, fixed_slack=self.options.fixed_slack, do_profiling_of_measurements=self.options.run_meas_profiling, include_line_measurements_on_both_ends=self.options.include_line_measurements_on_both_ends, logger=self.logger) if unobservable_buses and self.options.add_pseudo_measurements: se_input_island = add_pseudo_measurements_for_unobservable_buses(bus_dict=bus_dict, unobservable_buses=unobservable_buses, se_input=se_input_island, V=V, Ybus=adm.Ybus, Cf=conn.Cf, Ct=conn.Ct, sigma_pseudo_meas_value= self.options.pseudo_meas_std, Sbase=nc.Sbase, logger=self.logger) # run Solver if self.options.solver == SolverType.NR: solution = solve_se_nr(nc=island, Ybus=adm.Ybus, Yf=adm.Yf, Yt=adm.Yt, Yshunt_bus=adm.Yshunt_bus, F=island.passive_branch_data.F, T=island.passive_branch_data.T, Cf=conn.Cf, Ct=conn.Ct, se_input=se_input_island, vd=idx.vd, pv=idx.pv, no_slack=idx.no_slack, tol=self.options.tol, max_iter=self.options.max_iter, verbose=self.options.verbose, prefer_correct=self.options.prefer_correct, c_threshold=self.options.c_threshold, fixed_slack=self.options.fixed_slack, logger=self.logger) elif self.options.solver == SolverType.LM: solution = solve_se_lm(nc=island, Ybus=adm.Ybus, Yf=adm.Yf, Yt=adm.Yt, Yshunt_bus=adm.Yshunt_bus, F=island.passive_branch_data.F, T=island.passive_branch_data.T, Cf=conn.Cf, Ct=conn.Ct, se_input=se_input_island, vd=idx.vd, pv=idx.pv, no_slack=idx.no_slack, tol=self.options.tol, max_iter=self.options.max_iter, verbose=self.options.verbose, prefer_correct=self.options.prefer_correct, c_threshold=self.options.c_threshold, fixed_slack=self.options.fixed_slack, logger=self.logger) elif self.options.solver == SolverType.GN: solution = solve_se_gauss_newton(nc=island, Ybus=adm.Ybus, Yf=adm.Yf, Yt=adm.Yt, Yshunt_bus=adm.Yshunt_bus, F=island.passive_branch_data.F, T=island.passive_branch_data.T, Cf=conn.Cf, Ct=conn.Ct, se_input=se_input_island, vd=idx.vd, pv=idx.pv, no_slack=idx.no_slack, tol=self.options.tol, max_iter=self.options.max_iter, verbose=self.options.verbose, prefer_correct=self.options.prefer_correct, c_threshold=self.options.c_threshold, fixed_slack=self.options.fixed_slack, logger=self.logger) elif self.options.solver == SolverType.Decoupled_LU: solution = decoupled_state_estimation(nc=island, Ybus=adm.Ybus, Yf=adm.Yf, Yt=adm.Yt, Yshunt_bus=adm.Yshunt_bus, F=island.passive_branch_data.F, T=island.passive_branch_data.T, Cf=conn.Cf, Ct=conn.Ct, se_input=se_input_island, vd=idx.vd, pv=idx.pv, no_slack=idx.no_slack, tol=self.options.tol, max_iter=self.options.max_iter, verbose=self.options.verbose, prefer_correct=self.options.prefer_correct, c_threshold=self.options.c_threshold, fixed_slack=self.options.fixed_slack, logger=self.logger) elif SolverType.NoSolver: self.logger.add_info(f"Solver_Type is not defined explicitly to run observability analysis only") else: raise ValueError(f"State Estimation solver type not recognized: {self.options.solver.value}") # we need to switch is_observable back to True in case the pseudo measurements were added and SE converged # if not is_observable and self.options.add_pseudo_measurements and solution.converged: # is_observable=True # we leave unobservable buses and bus_contrinb as record that pseudo measurements were added report.add_se(method=self.options.solver, converged=solution.converged if solution else False, error=solution.norm_f if solution else 1e9, elapsed=solution.elapsed if solution else 0, iterations=solution.iterations if solution else 0, bad_data_detected=solution.bad_data_detected if solution else is_observable, is_observable=solution.is_observable if solution else is_observable, unobservable_buses=unobservable_buses, bus_contribution=bus_contrib, measurement_profile=measurement_profile, pseudo_measurements=se_input_island) self.results.convergence_reports.append(report) # Scale power results from per-unit to MVA before applying island_sbase = island.Sbase if solution is not None: solution.Scalc *= island_sbase self.results.apply_from_island( results=solution, b_idx=island.bus_data.original_idx, br_idx=island.passive_branch_data.original_idx, hvdc_idx=island.hvdc_data.original_idx, vsc_idx=island.vsc_data.original_idx ) # export unobservable buses, bus_contrib and/or measurement profiling to json self.toc()