# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import numpy as np
from typing import Union, List, TYPE_CHECKING
from VeraGridEngine.Compilers.circuit_to_gslv import gslv_contingencies_snapshot
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.enumerations import EngineType, ContingencyMethod, SimulationTypes
from VeraGridEngine.Simulations.ContingencyAnalysis.contingency_analysis_results import ContingencyAnalysisResults
from VeraGridEngine.Simulations.driver_template import DriverTemplate
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import LinearMultiContingencies, LinearAnalysis
from VeraGridEngine.Simulations.ContingencyAnalysis.contingency_analysis_options import ContingencyAnalysisOptions
from VeraGridEngine.Simulations.ContingencyAnalysis.Methods.nonlinear_contingency_analysis import (
nonlinear_contingency_analysis)
from VeraGridEngine.Simulations.ContingencyAnalysis.Methods.linear_contingency_analysis import (
linear_contingency_analysis)
from VeraGridEngine.Simulations.ContingencyAnalysis.Methods.helm_contingency_analysis import helm_contingency_analysis
from VeraGridEngine.Compilers.circuit_to_bentayga import BENTAYGA_AVAILABLE
from VeraGridEngine.Compilers.circuit_to_newton_pa import (NEWTON_PA_AVAILABLE, newton_pa_contingencies,
translate_newton_pa_contingencies)
from VeraGridEngine.Compilers.circuit_to_pgm import PGM_AVAILABLE
if TYPE_CHECKING: # Only imports the below statements during type checking
from VeraGridEngine.Simulations.OPF.opf_results import OptimalPowerFlowResults
class ContingencyAnalysisDriver(DriverTemplate):
    """
    Contingency analysis driver

    Runs a contingency study of a MultiCircuit with the selected calculation engine and
    contingency method, and keeps the outcome in ``self.results``.
    """
    __slots__ = (
        "options",
        "opf_results",
        "linear_multiple_contingencies",
    )

    name = 'Contingency Analysis'
    tpe = SimulationTypes.ContingencyAnalysis_run

    def __init__(self,
                 grid: MultiCircuit,
                 options: ContingencyAnalysisOptions | None,
                 linear_multiple_contingencies: Union[LinearMultiContingencies, None] = None,
                 opf_results: Union[OptimalPowerFlowResults, None] = None,
                 engine: EngineType = EngineType.VeraGrid):
        """
        ContingencyAnalysisDriver constructor
        :param grid: MultiCircuit Object
        :param options: N-k options. None is accepted here, but the VeraGrid path of `run_at`
                        reads `options.contingency_method`, so options are needed to run with it
        :param linear_multiple_contingencies: LinearMultiContingencies instance (required for linear contingencies).
                                              If None, one is created from the contingency groups in use
        :param opf_results: OptimalPowerFlowResults forwarded to the numerical circuit compilation (optional)
        :param engine: Calculation engine to use
        """
        DriverTemplate.__init__(self, grid=grid, engine=engine)

        # Options to use
        self.options = options

        self.opf_results: Union[OptimalPowerFlowResults, None] = opf_results

        # Set or create the LinearMultiContingencies
        if linear_multiple_contingencies is None:
            self.linear_multiple_contingencies = LinearMultiContingencies(
                grid=self.grid,
                contingency_groups_used=self._get_contingency_groups_used()
            )
            self.logger.add_info("Created LinearMultiContingencies because they were not provided")
        else:
            self.linear_multiple_contingencies: LinearMultiContingencies = linear_multiple_contingencies

        # N-K results (placeholder sized from the grid until a run replaces it)
        self.results = self._make_empty_results()

    def _get_contingency_groups_used(self):
        """
        Get the contingency groups to analyse
        :return: the groups declared in the options, or all the groups of the grid
                 when there are no options or the options declare no groups
        """
        if self.options is None or self.options.contingency_groups is None:
            return self.grid.get_contingency_groups()

        return self.options.contingency_groups

    def _make_empty_results(self) -> ContingencyAnalysisResults:
        """
        Create a ContingencyAnalysisResults object dimensioned from the grid
        :return: ContingencyAnalysisResults
        """
        nbus = self.grid.get_bus_number()

        return ContingencyAnalysisResults(
            ncon=self.grid.get_contingency_groups_number(),
            nbus=nbus,
            nbr=self.grid.get_branch_number(add_hvdc=False, add_vsc=False, add_switch=True),
            bus_names=self.grid.get_bus_names(),
            branch_names=self.grid.get_branch_names(add_hvdc=False, add_vsc=False, add_switch=True),
            bus_types=np.ones(nbus, dtype=int),
            con_names=np.array(self.grid.get_contingency_group_names())
        )

    def get_steps(self) -> List[str]:
        """
        Get variations list of strings
        :return: the branch names of the results prefixed with '#', or an empty list if there are no results
        """
        if self.results is None:
            return list()

        return ['#' + v for v in self.results.branch_names]

    def _resolve_engine(self) -> None:
        """
        Make sure that `self.engine` is an engine this driver can actually run,
        switching to VeraGrid (with a logged warning) otherwise
        """
        if self.engine == EngineType.NewtonPA and not NEWTON_PA_AVAILABLE:
            self.engine = EngineType.VeraGrid
            self.logger.add_warning('Tried to use Newton, but failed back to VeraGrid')

        if self.engine == EngineType.Bentayga and not BENTAYGA_AVAILABLE:
            self.engine = EngineType.VeraGrid
            self.logger.add_warning('Tried to use Bentayga, but failed back to VeraGrid')

        if self.engine == EngineType.PGM and not PGM_AVAILABLE:
            self.engine = EngineType.VeraGrid
            self.logger.add_warning('Tried to use PGM, but failed back to VeraGrid')

        # Only these engines have a contingency implementation in this driver. Any other engine
        # used to fall through every branch and silently return the previous (placeholder) results.
        implemented = (EngineType.VeraGrid, EngineType.NewtonPA, EngineType.GSLV)
        if self.engine not in implemented:
            self.logger.add_warning(f'Contingency analysis is not implemented for the engine {self.engine}, '
                                    f'using VeraGrid instead')
            self.engine = EngineType.VeraGrid

    def run_at(self, t_idx: int | None = None, t_prob: float = 1.0) -> ContingencyAnalysisResults:
        """
        Run the contingency at a time point
        :param t_idx: index for any time series index, None for the snapshot
                      (only forwarded by the VeraGrid engine path)
        :param t_prob: probability of the time (only forwarded by the VeraGrid engine path)
        :return: ContingencyAnalysisResults (also stored in `self.results`)
        """
        self._resolve_engine()

        if self.engine == EngineType.NewtonPA:
            self.results = self._run_newton_pa()

        elif self.engine == EngineType.GSLV:
            self.results = self._run_gslv()

        else:
            # _resolve_engine guarantees that this is EngineType.VeraGrid
            self.results = self._run_veragrid(t_idx=t_idx, t_prob=t_prob)

        return self.results

    def _run_veragrid(self, t_idx: int | None, t_prob: float) -> ContingencyAnalysisResults:
        """
        Run the contingency analysis with the native engine, using the method declared in the options
        :param t_idx: index for any time series index, None for the snapshot
        :param t_prob: probability of the time
        :return: ContingencyAnalysisResults
        :raises Exception: if the contingency method of the options is not handled
        """
        method = self.options.contingency_method

        if method == ContingencyMethod.HELM:
            return helm_contingency_analysis(
                grid=self.grid,
                options=self.options,
                calling_class=self,
                opf_results=self.opf_results,
                t=t_idx,
                t_prob=t_prob
            )

        # TODO: ContingencyMethod.OptimalPowerFlow (optimal_linear_contingency_analysis) is not wired in yet

        is_linear = method == ContingencyMethod.Linear or method == ContingencyMethod.PTDF_scan

        if method != ContingencyMethod.PowerFlow and not is_linear:
            raise Exception(f'Unknown contingency method {method}')

        # Data shared by the power flow and the linear methods (the HVDC indices are not used here)
        area_names, bus_area_indices, F, T, _, _ = self.grid.get_branch_areas_info()

        # set the numerical circuit
        nc = compile_numerical_circuit_at(self.grid,
                                          opf_results=self.opf_results,
                                          t_idx=t_idx)

        if method == ContingencyMethod.PowerFlow:
            return nonlinear_contingency_analysis(
                nc=nc,
                options=self.options,
                linear_multiple_contingencies=self.linear_multiple_contingencies,
                area_names=area_names,
                bus_area_indices=np.array(bus_area_indices, dtype=int),
                F=F,
                T=T,
                report_text=self.report_text,
                report_progress2=self.report_progress2,
                is_cancel=self.is_cancel,
                t_idx=t_idx,
                t_prob=t_prob,
                logger=self.logger
            )

        # Linear analysis (PTDF & LODF)
        lin = LinearAnalysis(
            nc=nc,
            distributed_slack=self.options.lin_options.distribute_slack,
            correct_values=self.options.lin_options.correct_values,
            logger=self.logger
        )

        # "smart" Contingency factors: a fresh object is built and computed on every run
        linear_multiple_contingencies = LinearMultiContingencies(
            grid=self.grid,
            contingency_groups_used=self._get_contingency_groups_used()
        )

        linear_multiple_contingencies.compute(
            lin=lin,
            ptdf_threshold=self.options.lin_options.ptdf_threshold,
            lodf_threshold=self.options.lin_options.lodf_threshold
        )

        return linear_contingency_analysis(
            nc=nc,
            options=self.options,
            linear_analysis=lin,
            linear_multiple_contingencies=linear_multiple_contingencies,
            area_names=area_names,
            bus_area_indices=np.array(bus_area_indices, dtype=int),
            F=F,
            T=T,
            report_text=self.report_text,
            report_progress2=self.report_progress2,
            is_cancel=self.is_cancel,
            t=t_idx,
            t_prob=t_prob,
            logger=self.logger
        )

    def _run_newton_pa(self) -> ContingencyAnalysisResults:
        """
        Run the contingency analysis with Newton and translate its results
        :return: ContingencyAnalysisResults
        """
        self.report_text("Running contingencies in newton...")

        # the call is made without time series: no time index is forwarded
        con_res = newton_pa_contingencies(circuit=self.grid,
                                          con_opt=self.options,
                                          time_series=False,
                                          time_indices=None)

        return translate_newton_pa_contingencies(grid=self.grid,
                                                 con_res=con_res)

    def _run_gslv(self) -> ContingencyAnalysisResults:
        """
        Run the snapshot contingency analysis with GSLV and copy its output into a results object
        :return: ContingencyAnalysisResults
        """
        self.report_text("Running contingencies in gslv...")

        con_res = gslv_contingencies_snapshot(circuit=self.grid,
                                              con_opt=self.options,
                                              opf_results=self.opf_results)

        results = self._make_empty_results()

        results.Sf = con_res.Sf
        results.loading = con_res.loading
        results.Sbus = con_res.Sbus
        results.voltage = con_res.voltage

        # copy the report entry by entry, keeping only the real part of the flows
        for entry in con_res.report.entries:
            results.report.add(
                time_index=entry.time_index,
                t_prob=entry.t_prob,
                mon_idx=entry.mon_idx,
                con_group_idx=entry.con_group_idx,
                area_from=entry.area_from,
                area_to=entry.area_to,
                base_name=entry.base_name,
                contingency_name=entry.contingency_name,
                base_rating=entry.base_rating,
                contingency_rating=entry.contingency_rating,
                srap_rating=entry.srap_rating,
                base_flow=entry.base_flow.real,
                post_contingency_flow=entry.post_contingency_flow.real,
                post_srap_flow=entry.post_srap_flow.real,
                base_loading=entry.base_loading,
                post_contingency_loading=entry.post_contingency_loading,
                post_srap_loading=entry.post_srap_loading,
                msg_ov=entry.msg_ov,
                msg_srap=entry.msg_srap,
                srap_power=entry.srap_power,
                solved_by_srap=entry.solved_by_srap
            )

        return results

    def run(self) -> None:
        """
        Run the snapshot contingency analysis between the tic() and toc() calls of the driver
        :return: None, the results are stored in `self.results`
        """
        self.tic()
        self.run_at(t_idx=None)
        self.toc()