Source code for VeraGridEngine.Simulations.ATC.available_transfer_capacity_ts_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.  
# SPDX-License-Identifier: MPL-2.0

from __future__ import annotations

import numpy as np
from typing import Union, List

from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.LinearFactors.linear_analysis_options import LinearAnalysisOptions
from VeraGridEngine.Simulations.LinearFactors.linear_analysis_ts_driver import LinearAnalysisTimeSeriesDriver
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import LinearAnalysis
from VeraGridEngine.Simulations.ATC.available_transfer_capacity_driver import compute_atc_list, compute_alpha, \
    compute_dP
from VeraGridEngine.Simulations.ATC.available_transfer_capacity_options import AvailableTransferCapacityOptions
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.Simulations.results_template import ResultsTemplate
from VeraGridEngine.Simulations.driver_template import TimeSeriesDriverTemplate
from VeraGridEngine.Simulations.Clustering.clustering_results import ClusteringResults
from VeraGridEngine.basic_structures import Vec, Mat, IntVec, StrVec, DateVec
from VeraGridEngine.enumerations import StudyResultsType, AvailableTransferMode, ResultTypes, DeviceType, \
    SimulationTypes


[docs] class AvailableTransferCapacityTimeSeriesResults(ResultsTemplate): """ AvailableTransferCapacityTimeSeriesResults """ LOCAL_RESULTS_DECLARATIONS = tuple() __slots__ = ( "branch_names", "bus_names", "rates", "contingency_rates", "base_exchange", "raw_report", "report", "report_headers", "report_indices", ) def __init__(self, br_names: StrVec, bus_names: StrVec, rates: Mat, contingency_rates: Mat, time_array: DateVec, clustering_results: Union[ClusteringResults, None] = None): """ :param br_names: :param bus_names: :param rates: :param contingency_rates: :param time_array: """ ResultsTemplate.__init__( self, name='ATC Results', available_results=[ ResultTypes.AvailableTransferCapacityReport ], time_array=time_array, clustering_results=clustering_results, study_results_type=StudyResultsType.AvailableTransferCapacity ) # self.time_array = time_array self.branch_names = np.array(br_names, dtype=object) self.bus_names = bus_names self.rates = rates self.contingency_rates = contingency_rates self.base_exchange = 0 self.raw_report = None self.report = None self.report_headers: StrVec = None self.report_indices: IntVec = None
[docs] def clear(self): """ Crear the results :return: """ self.base_exchange = 0 self.raw_report = None self.report = None self.report_headers: StrVec = None self.report_indices: IntVec = None
[docs] def make_report(self, threshold: float = 0.0): """ :return: """ self.report_headers = [ 'Time', 'Branch', 'Base flow', 'Rate', 'Alpha', 'ATC normal', 'Limiting contingency branch', 'Limiting contingency flow', 'Contingency rate', 'Beta', 'Contingency ATC', 'ATC', 'Base exchange flow', 'NTC' ] self.report = np.empty((len(self.raw_report), len(self.report_headers)), dtype=object) rep = np.array(self.raw_report) # sort by ATC if len(self.raw_report): t = rep[:, 0].astype(int) m = rep[:, 1].astype(int) c = rep[:, 2].astype(int) time_dict = {ti: t for ti, t in zip(self.time_indices, self.time_array)} for i in range(self.report.shape[0]): self.report[i, 0] = time_dict[t[i]].strftime('%d/%m/%Y %H:%M') self.report[i, 3] = self.rates[t[i], m[i]] # 'Rate', (time, branch) self.report[i, 8] = self.contingency_rates[t[i], m[i]] # Branch name self.report[:, 1] = self.branch_names[m] # Base flow' self.report[:, 2] = rep[:, 10] # alpha self.report[:, 4] = rep[:, 3] # 'ATC normal' self.report[:, 5] = rep[:, 6] # contingency info ----- # 'Limiting contingency branch' self.report[:, 6] = self.branch_names[c] # 'Limiting contingency flow' self.report[:, 7] = rep[:, 11] # 'Beta' self.report[:, 9] = rep[:, 4] # 'Contingency ATC' self.report[:, 10] = rep[:, 7] # ATC self.report[:, 11] = rep[:, 8] # Base exchange flow self.report[:, 12] = rep[:, 14] # NTC self.report[:, 13] = rep[:, 9] # trim by abs alpha > threshold and loading <= 1 # loading = np.abs(self.report[:, 2] / (self.report[:, 3] + 1e-20)) idx = list() for i in range(self.report.shape[0]): if (abs(self.report[i, 4]) > threshold and abs(self.report[i, 2] / (self.report[i, 3] + 1e-20)) <= 1.0): idx.append(i) idx = np.array(idx, dtype=int) self.report = self.report[idx, :] self.report_indices = np.arange(0, self.report.shape[0]) else: print('Empty raw report :/')
[docs] def get_dict(self): """ Returns a dictionary with the results sorted in a dictionary :return: dictionary of 2D numpy arrays (probably of complex numbers) """ data = super().get_dict() data['report'] = self.report.tolist() return data
[docs] def mdl(self, result_type: ResultTypes) -> ResultsTable: """ Plot the results :param result_type: :return: """ if result_type == ResultTypes.AvailableTransferCapacityReport: return ResultsTable( data=np.array(self.report), index=self.report_indices, columns=self.report_headers, title=result_type.value, ylabel="", cols_device_type=DeviceType.NoDevice, idx_device_type=DeviceType.NoDevice ) else: raise Exception('Result type not understood:' + str(result_type))
[docs] class AvailableTransferCapacityTimeSeriesDriver(TimeSeriesDriverTemplate): __slots__ = ("options",) tpe = SimulationTypes.NetTransferCapacityTS_run name = tpe.value def __init__(self, grid: MultiCircuit, options: AvailableTransferCapacityOptions | None, time_indices: IntVec, clustering_results: Union[ClusteringResults, None] = None): """ Power Transfer Distribution Factors class constructor :param grid: MultiCircuit Object :param options: OPF options :param time_indices: array of time indices to simulate :param clustering_results: ClusteringResults (optional) """ TimeSeriesDriverTemplate.__init__( self, grid=grid, time_indices=time_indices, clustering_results=clustering_results ) # Options to use self.options = options # OPF results self.results = AvailableTransferCapacityTimeSeriesResults( br_names=self.grid.get_branch_names(add_hvdc=False, add_vsc=False, add_switch=True), bus_names=self.grid.get_bus_names(), rates=self.grid.get_branch_rates_prof(), contingency_rates=self.grid.get_branch_contingency_rates_prof(), time_array=self.grid.time_profile[self.time_indices], clustering_results=clustering_results )
[docs] def get_steps(self) -> List[str]: """ Get time steps list of strings """ # No time steps because so far the only output is a report return []
[docs] def run(self) -> None: """ Run thread """ self.tic() mode_2_int = {AvailableTransferMode.Generation: 0, AvailableTransferMode.InstalledPower: 1, AvailableTransferMode.Load: 2, AvailableTransferMode.GenerationAndLoad: 3} # declare the linear analysis self.report_text("Analyzing...") self.report_progress(0.0) la_options = LinearAnalysisOptions( distribute_slack=self.options.distributed_slack, correct_values=self.options.correct_values, ) la_driver = LinearAnalysisTimeSeriesDriver( grid=self.grid, options=la_options, time_indices=self.time_indices ) la_driver.copy_signals(other=self) la_driver.run() # get the branch indices to analyze nc = compile_numerical_circuit_at(self.grid, logger=self.logger) br_idx = nc.passive_branch_data.get_monitor_enabled_indices() con_br_idx = nc.passive_branch_data.get_contingency_enabled_indices() # declare the results self.results.clear() for it, t in enumerate(self.time_indices): self.report_text('Available transfer capacity at ' + str(self.grid.time_profile[t])) nc = compile_numerical_circuit_at(circuit=self.grid, t_idx=t) linear_analysis = LinearAnalysis( nc=nc, distributed_slack=True, correct_values=False, ) P: Vec = nc.get_power_injections().real # get flow if self.options.use_provided_flows: flows_t = self.options.Pf[t, :] if self.options.Pf is None: msg = 'The option to use the provided flows is enabled, but no flows are available' self.logger.add_error(msg) raise Exception(msg) else: flows_t: Vec = linear_analysis.get_flows(Sbus=P, P_hvdc=nc.hvdc_data.Pset) # compute the branch exchange sensitivity (alpha) dP = compute_dP( P0=P, P_installed=nc.bus_data.installed_power, Pgen=nc.generator_data.get_injections_per_bus().real, Pload=nc.load_data.get_injections_per_bus().real, bus_a1_idx=self.options.bus_idx_from, bus_a2_idx=self.options.bus_idx_to, mode=mode_2_int[self.options.mode], dT=1.0 ) alpha = compute_alpha( ptdf=linear_analysis.PTDF, dP=dP, dT=1.0 ) # base exchange base_exchange = (self.options.inter_area_branch_sense * flows_t[self.options.inter_area_branch_idx]).sum() # consider the HVDC transfer if self.options.Pf_hvdc is not None: if len(self.options.idx_hvdc_br): base_exchange += (self.options.inter_area_hvdc_branch_sense * self.options.Pf_hvdc[t, self.options.idx_hvdc_br]).sum() # compute ATC report = compute_atc_list(br_idx=br_idx, contingency_br_idx=con_br_idx, lodf=linear_analysis.LODF, alpha=alpha, flows=flows_t, rates=self.results.rates[t, :], contingency_rates=self.results.contingency_rates[t, :], base_exchange=base_exchange, time_idx=t, threshold=self.options.threshold) report = np.array(report, dtype=object) # sort by NTC report = report[report[:, 9].argsort()] # curtail report if self.options.max_report_elements > 0: report = report[:self.options.max_report_elements, :] # post-process and store the results if self.results.raw_report is None: self.results.raw_report = report else: self.results.raw_report = np.r_[self.results.raw_report, report] self.report_progress2(it, len(self.time_indices)) if self.__cancel__: break self.report_text('Building the report...') self.results.make_report() self.toc()
[docs] def cancel(self): self.__cancel__ = True