# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
from __future__ import annotations
import numpy as np
from typing import Union, List
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.LinearFactors.linear_analysis_options import LinearAnalysisOptions
from VeraGridEngine.Simulations.LinearFactors.linear_analysis_ts_driver import LinearAnalysisTimeSeriesDriver
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import LinearAnalysis
from VeraGridEngine.Simulations.ATC.available_transfer_capacity_driver import compute_atc_list, compute_alpha, \
compute_dP
from VeraGridEngine.Simulations.ATC.available_transfer_capacity_options import AvailableTransferCapacityOptions
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.Simulations.results_template import ResultsTemplate
from VeraGridEngine.Simulations.driver_template import TimeSeriesDriverTemplate
from VeraGridEngine.Simulations.Clustering.clustering_results import ClusteringResults
from VeraGridEngine.basic_structures import Vec, Mat, IntVec, StrVec, DateVec
from VeraGridEngine.enumerations import StudyResultsType, AvailableTransferMode, ResultTypes, DeviceType, \
SimulationTypes
[docs]
class AvailableTransferCapacityTimeSeriesResults(ResultsTemplate):
"""
AvailableTransferCapacityTimeSeriesResults
"""
LOCAL_RESULTS_DECLARATIONS = tuple()
__slots__ = (
"branch_names",
"bus_names",
"rates",
"contingency_rates",
"base_exchange",
"raw_report",
"report",
"report_headers",
"report_indices",
)
def __init__(self, br_names: StrVec, bus_names: StrVec, rates: Mat, contingency_rates: Mat, time_array: DateVec,
clustering_results: Union[ClusteringResults, None] = None):
"""
:param br_names:
:param bus_names:
:param rates:
:param contingency_rates:
:param time_array:
"""
ResultsTemplate.__init__(
self,
name='ATC Results',
available_results=[
ResultTypes.AvailableTransferCapacityReport
],
time_array=time_array,
clustering_results=clustering_results,
study_results_type=StudyResultsType.AvailableTransferCapacity
)
# self.time_array = time_array
self.branch_names = np.array(br_names, dtype=object)
self.bus_names = bus_names
self.rates = rates
self.contingency_rates = contingency_rates
self.base_exchange = 0
self.raw_report = None
self.report = None
self.report_headers: StrVec = None
self.report_indices: IntVec = None
[docs]
def clear(self):
"""
Crear the results
:return:
"""
self.base_exchange = 0
self.raw_report = None
self.report = None
self.report_headers: StrVec = None
self.report_indices: IntVec = None
[docs]
def make_report(self, threshold: float = 0.0):
"""
:return:
"""
self.report_headers = [
'Time',
'Branch',
'Base flow',
'Rate',
'Alpha',
'ATC normal',
'Limiting contingency branch',
'Limiting contingency flow',
'Contingency rate',
'Beta',
'Contingency ATC',
'ATC',
'Base exchange flow',
'NTC'
]
self.report = np.empty((len(self.raw_report), len(self.report_headers)), dtype=object)
rep = np.array(self.raw_report)
# sort by ATC
if len(self.raw_report):
t = rep[:, 0].astype(int)
m = rep[:, 1].astype(int)
c = rep[:, 2].astype(int)
time_dict = {ti: t for ti, t in zip(self.time_indices, self.time_array)}
for i in range(self.report.shape[0]):
self.report[i, 0] = time_dict[t[i]].strftime('%d/%m/%Y %H:%M')
self.report[i, 3] = self.rates[t[i], m[i]] # 'Rate', (time, branch)
self.report[i, 8] = self.contingency_rates[t[i], m[i]]
# Branch name
self.report[:, 1] = self.branch_names[m]
# Base flow'
self.report[:, 2] = rep[:, 10]
# alpha
self.report[:, 4] = rep[:, 3]
# 'ATC normal'
self.report[:, 5] = rep[:, 6]
# contingency info -----
# 'Limiting contingency branch'
self.report[:, 6] = self.branch_names[c]
# 'Limiting contingency flow'
self.report[:, 7] = rep[:, 11]
# 'Beta'
self.report[:, 9] = rep[:, 4]
# 'Contingency ATC'
self.report[:, 10] = rep[:, 7]
# ATC
self.report[:, 11] = rep[:, 8]
# Base exchange flow
self.report[:, 12] = rep[:, 14]
# NTC
self.report[:, 13] = rep[:, 9]
# trim by abs alpha > threshold and loading <= 1
# loading = np.abs(self.report[:, 2] / (self.report[:, 3] + 1e-20))
idx = list()
for i in range(self.report.shape[0]):
if (abs(self.report[i, 4]) > threshold
and abs(self.report[i, 2] / (self.report[i, 3] + 1e-20)) <= 1.0):
idx.append(i)
idx = np.array(idx, dtype=int)
self.report = self.report[idx, :]
self.report_indices = np.arange(0, self.report.shape[0])
else:
print('Empty raw report :/')
[docs]
def get_dict(self):
"""
Returns a dictionary with the results sorted in a dictionary
:return: dictionary of 2D numpy arrays (probably of complex numbers)
"""
data = super().get_dict()
data['report'] = self.report.tolist()
return data
[docs]
def mdl(self, result_type: ResultTypes) -> ResultsTable:
"""
Plot the results
:param result_type:
:return:
"""
if result_type == ResultTypes.AvailableTransferCapacityReport:
return ResultsTable(
data=np.array(self.report),
index=self.report_indices,
columns=self.report_headers,
title=result_type.value,
ylabel="",
cols_device_type=DeviceType.NoDevice,
idx_device_type=DeviceType.NoDevice
)
else:
raise Exception('Result type not understood:' + str(result_type))
[docs]
class AvailableTransferCapacityTimeSeriesDriver(TimeSeriesDriverTemplate):
__slots__ = ("options",)
tpe = SimulationTypes.NetTransferCapacityTS_run
name = tpe.value
def __init__(self,
grid: MultiCircuit,
options: AvailableTransferCapacityOptions | None,
time_indices: IntVec,
clustering_results: Union[ClusteringResults, None] = None):
"""
Power Transfer Distribution Factors class constructor
:param grid: MultiCircuit Object
:param options: OPF options
:param time_indices: array of time indices to simulate
:param clustering_results: ClusteringResults (optional)
"""
TimeSeriesDriverTemplate.__init__(
self,
grid=grid,
time_indices=time_indices,
clustering_results=clustering_results
)
# Options to use
self.options = options
# OPF results
self.results = AvailableTransferCapacityTimeSeriesResults(
br_names=self.grid.get_branch_names(add_hvdc=False, add_vsc=False, add_switch=True),
bus_names=self.grid.get_bus_names(),
rates=self.grid.get_branch_rates_prof(),
contingency_rates=self.grid.get_branch_contingency_rates_prof(),
time_array=self.grid.time_profile[self.time_indices],
clustering_results=clustering_results
)
[docs]
def get_steps(self) -> List[str]:
"""
Get time steps list of strings
"""
# No time steps because so far the only output is a report
return []
[docs]
def run(self) -> None:
"""
Run thread
"""
self.tic()
mode_2_int = {AvailableTransferMode.Generation: 0,
AvailableTransferMode.InstalledPower: 1,
AvailableTransferMode.Load: 2,
AvailableTransferMode.GenerationAndLoad: 3}
# declare the linear analysis
self.report_text("Analyzing...")
self.report_progress(0.0)
la_options = LinearAnalysisOptions(
distribute_slack=self.options.distributed_slack,
correct_values=self.options.correct_values,
)
la_driver = LinearAnalysisTimeSeriesDriver(
grid=self.grid,
options=la_options,
time_indices=self.time_indices
)
la_driver.copy_signals(other=self)
la_driver.run()
# get the branch indices to analyze
nc = compile_numerical_circuit_at(self.grid, logger=self.logger)
br_idx = nc.passive_branch_data.get_monitor_enabled_indices()
con_br_idx = nc.passive_branch_data.get_contingency_enabled_indices()
# declare the results
self.results.clear()
for it, t in enumerate(self.time_indices):
self.report_text('Available transfer capacity at ' + str(self.grid.time_profile[t]))
nc = compile_numerical_circuit_at(circuit=self.grid, t_idx=t)
linear_analysis = LinearAnalysis(
nc=nc,
distributed_slack=True,
correct_values=False,
)
P: Vec = nc.get_power_injections().real
# get flow
if self.options.use_provided_flows:
flows_t = self.options.Pf[t, :]
if self.options.Pf is None:
msg = 'The option to use the provided flows is enabled, but no flows are available'
self.logger.add_error(msg)
raise Exception(msg)
else:
flows_t: Vec = linear_analysis.get_flows(Sbus=P, P_hvdc=nc.hvdc_data.Pset)
# compute the branch exchange sensitivity (alpha)
dP = compute_dP(
P0=P,
P_installed=nc.bus_data.installed_power,
Pgen=nc.generator_data.get_injections_per_bus().real,
Pload=nc.load_data.get_injections_per_bus().real,
bus_a1_idx=self.options.bus_idx_from,
bus_a2_idx=self.options.bus_idx_to,
mode=mode_2_int[self.options.mode],
dT=1.0
)
alpha = compute_alpha(
ptdf=linear_analysis.PTDF,
dP=dP,
dT=1.0
)
# base exchange
base_exchange = (self.options.inter_area_branch_sense * flows_t[self.options.inter_area_branch_idx]).sum()
# consider the HVDC transfer
if self.options.Pf_hvdc is not None:
if len(self.options.idx_hvdc_br):
base_exchange += (self.options.inter_area_hvdc_branch_sense
* self.options.Pf_hvdc[t, self.options.idx_hvdc_br]).sum()
# compute ATC
report = compute_atc_list(br_idx=br_idx,
contingency_br_idx=con_br_idx,
lodf=linear_analysis.LODF,
alpha=alpha,
flows=flows_t,
rates=self.results.rates[t, :],
contingency_rates=self.results.contingency_rates[t, :],
base_exchange=base_exchange,
time_idx=t,
threshold=self.options.threshold)
report = np.array(report, dtype=object)
# sort by NTC
report = report[report[:, 9].argsort()]
# curtail report
if self.options.max_report_elements > 0:
report = report[:self.options.max_report_elements, :]
# post-process and store the results
if self.results.raw_report is None:
self.results.raw_report = report
else:
self.results.raw_report = np.r_[self.results.raw_report, report]
self.report_progress2(it, len(self.time_indices))
if self.__cancel__:
break
self.report_text('Building the report...')
self.results.make_report()
self.toc()
[docs]
def cancel(self):
self.__cancel__ = True