# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from VeraGridEngine.basic_structures import CDF, IntVec, Vec, StrVec, CxMat, Mat
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.Simulations.results_template import ResultsTemplate, ResultsProperty
from VeraGridEngine.enumerations import StudyResultsType, ResultTypes, DeviceType
[docs]
class StochasticPowerFlowResults(ResultsTemplate):
LOCAL_RESULTS_DECLARATIONS = (
ResultsProperty(name='points_number', tpe=int, old_names=list(), expandable=False),
ResultsProperty(name='bus_names', tpe=StrVec, old_names=list(), expandable=False),
ResultsProperty(name='branch_names', tpe=StrVec, old_names=list(), expandable=False),
ResultsProperty(name='bus_types', tpe=IntVec, old_names=list(), expandable=False),
ResultsProperty(name='S_points', tpe=CxMat, old_names=list(), expandable=False),
ResultsProperty(name='V_points', tpe=CxMat, old_names=list(), expandable=False),
ResultsProperty(name='Sbr_points', tpe=CxMat, old_names=list(), expandable=False),
ResultsProperty(name='loading_points', tpe=CxMat, old_names=list(), expandable=False),
ResultsProperty(name='losses_points', tpe=CxMat, old_names=list(), expandable=False),
ResultsProperty(name='error_series', tpe=list, old_names=list(), expandable=False),
ResultsProperty(name='voltage', tpe=Vec, old_names=list(), expandable=False),
ResultsProperty(name='loading', tpe=Vec, old_names=list(), expandable=False),
ResultsProperty(name='sbranch', tpe=Vec, old_names=list(), expandable=False),
ResultsProperty(name='losses', tpe=Vec, old_names=list(), expandable=False),
ResultsProperty(name='v_std_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='s_std_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='l_std_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='loss_std_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='v_avg_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='s_avg_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='l_avg_conv', tpe=Mat, old_names=list(), expandable=False),
ResultsProperty(name='loss_avg_conv', tpe=Mat, old_names=list(), expandable=False),
)
__slots__ = (
"points_number",
"bus_names",
"branch_names",
"bus_types",
"S_points",
"V_points",
"Sbr_points",
"loading_points",
"losses_points",
"error_series",
"voltage",
"loading",
"sbranch",
"losses",
"v_std_conv",
"s_std_conv",
"l_std_conv",
"loss_std_conv",
"v_avg_conv",
"s_avg_conv",
"l_avg_conv",
"loss_avg_conv",
)
def __init__(self, n, m, p, bus_names, branch_names, bus_types):
"""
Constructor
@param n: number of nodes
@param m: number of Branches
@param p: number of points (rows)
"""
ResultsTemplate.__init__(self,
name='Stochastic Power Flow',
available_results={
ResultTypes.BusResults: [ResultTypes.BusVoltageAverage,
ResultTypes.BusVoltageStd,
ResultTypes.BusVoltageCDF,
ResultTypes.BusPowerCDF],
ResultTypes.BranchResults: [ResultTypes.BranchPowerAverage,
ResultTypes.BranchPowerStd,
ResultTypes.BranchPowerCDF,
ResultTypes.BranchLoadingAverage,
ResultTypes.BranchLoadingStd,
ResultTypes.BranchLoadingCDF,
ResultTypes.BranchLossesAverage,
ResultTypes.BranchLossesStd,
ResultTypes.BranchLossesCDF]
},
time_array=None,
clustering_results=None,
study_results_type=StudyResultsType.StochasticPowerFlow)
self.points_number = p
self.bus_names = bus_names
self.branch_names = branch_names
self.bus_types = bus_types
self.S_points = np.zeros((p, n), dtype=complex)
self.V_points = np.zeros((p, n), dtype=complex)
self.Sbr_points = np.zeros((p, m), dtype=complex)
self.loading_points = np.zeros((p, m), dtype=complex)
self.losses_points = np.zeros((p, m), dtype=complex)
self.error_series = list()
self.voltage = np.zeros(n)
self.loading = np.zeros(m)
self.sbranch = np.zeros(m)
self.losses = np.zeros(m)
# magnitudes standard deviation convergence
self.v_std_conv = None
self.s_std_conv = None
self.l_std_conv = None
self.loss_std_conv = None
# magnitudes average convergence
self.v_avg_conv = None
self.s_avg_conv = None
self.l_avg_conv = None
self.loss_avg_conv = None
# TODO: Register results
[docs]
def append_batch(self, mcres):
"""
Append a batch (a StochasticPowerFlowResults object) to this object
@param mcres: StochasticPowerFlowResults object
@return:
"""
self.S_points = np.vstack((self.S_points, mcres.S_points))
self.V_points = np.vstack((self.V_points, mcres.V_points))
self.Sbr_points = np.vstack((self.Sbr_points, mcres.Sbr_points))
self.loading_points = np.vstack((self.loading_points, mcres.loading_points))
self.losses_points = np.vstack((self.losses_points, mcres.loading_points))
[docs]
def get_voltage_sum(self):
"""
Return the voltage summation
@return:
"""
return self.V_points.sum(axis=0)
[docs]
def compile(self):
"""
Compiles the final Monte Carlo values by running an online mean and
@return:
"""
p, n = self.V_points.shape
ni, m = self.Sbr_points.shape
step = 1
nn = int(np.floor(p / step) + 1)
self.v_std_conv = np.zeros((nn, n))
self.s_std_conv = np.zeros((nn, m))
self.l_std_conv = np.zeros((nn, m))
self.loss_std_conv = np.zeros((nn, m))
self.v_avg_conv = np.zeros((nn, n))
self.s_avg_conv = np.zeros((nn, m))
self.l_avg_conv = np.zeros((nn, m))
self.loss_avg_conv = np.zeros((nn, m))
v_mean = np.zeros(n)
c_mean = np.zeros(m)
l_mean = np.zeros(m)
loss_mean = np.zeros(m)
v_std = np.zeros(n)
c_std = np.zeros(m)
l_std = np.zeros(m)
loss_std = np.zeros(m)
for t in range(1, p, step):
v_mean_prev = v_mean.copy()
c_mean_prev = c_mean.copy()
l_mean_prev = l_mean.copy()
loss_mean_prev = loss_mean.copy()
v = np.abs(self.V_points[t, :])
c = self.Sbr_points[t, :].real
l = self.loading_points[t, :].real
loss = self.losses_points[t, :].real
v_mean += (v - v_mean) / t
v_std += (v - v_mean) * (v - v_mean_prev)
self.v_avg_conv[t] = v_mean
self.v_std_conv[t] = v_std / t
c_mean += (c - c_mean) / t
c_std += (c - c_mean) * (c - c_mean_prev)
self.s_std_conv[t] = c_std / t
self.s_avg_conv[t] = c_mean
l_mean += (l - l_mean) / t
l_std += (l - l_mean) * (l - l_mean_prev)
self.l_std_conv[t] = l_std / t
self.l_avg_conv[t] = l_mean
loss_mean += (loss - loss_mean) / t
loss_std += (loss - loss_mean) * (loss - loss_mean_prev)
self.loss_std_conv[t] = loss_std / t
self.loss_avg_conv[t] = loss_mean
self.voltage = self.v_avg_conv[-2]
self.sbranch = self.s_avg_conv[-2]
self.loading = self.l_avg_conv[-2]
self.losses = self.loss_avg_conv[-2]
# def get_dict(self):
# """
# Returns a dictionary with the results sorted in a dictionary
# :return: dictionary of 2D numpy arrays (probably of complex numbers)
# """
# data = {'P': self.S_points.real.tolist(),
# 'Q': self.S_points.imag.tolist(),
# 'Vm': np.abs(self.V_points).tolist(),
# 'Va': np.angle(self.V_points).tolist(),
# 'Ibr_real': self.Sbr_points.real.tolist(),
# 'Ibr_imag': self.Sbr_points.imag.tolist(),
# 'Sbr_real': self.Sbr_points.real.tolist(),
# 'Sbr_imag': self.Sbr_points.imag.tolist(),
# 'loading': np.abs(self.loading_points).tolist(),
# 'losses': np.abs(self.losses_points).tolist()}
# return data
[docs]
def query_voltage(self, power_array):
"""
Fantastic function that allows to query the voltage from the sampled points without having to run power Sf
Args:
power_array: power Injections vector
Returns: Interpolated voltages vector
"""
x_train = np.hstack((self.S_points.real, self.S_points.imag))
y_train = np.hstack((self.V_points.real, self.V_points.imag))
x_test = np.hstack((power_array.real, power_array.imag))
n, d = x_train.shape
# # declare PCA reductor
# red = PCA()
#
# # Train PCA
# red.fit(x_train, y_train)
#
# # Reduce power dimensions
# x_train = red.transform(x_train)
# model = MLPRegressor(hidden_layer_sizes=(10*n, n, n, n), activation='relu', solver_type='adam', alpha=0.0001,
# batch_size=2, learning_rate='constant', learning_rate_init=0.01, power_t=0.5,
# max_iter=3, shuffle=True, random_state=None, tol=0.0001, verbose=True,
# warm_start=False, momentum=0.9, nesterovs_momentum=True, early_stopping=False,
# validation_fraction=0.1, beta_1=0.9, beta_2=0.999, epsilon=1e-08)
# algorithm : {βautoβ, βball_treeβ, βkd_treeβ, βbruteβ},
# model = KNeighborsRegressor(n_neighbors=4, algorithm='brute', leaf_size=16)
model = RandomForestRegressor(10)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
return y_pred[:, :int(d / 2)] + 1j * y_pred[:, int(d / 2):d]
[docs]
def get_index_loading_cdf(self, max_val=1.0):
"""
Find the elements where the CDF is greater or equal to a value
:param max_val: value to compare
:return: indices, associated probability
"""
# turn the loading real values into CDF
cdf = CDF(np.abs(self.Sbr_points.real))
n = cdf.arr.shape[1]
idx = list()
val = list()
prob = list()
for i in range(n):
# Find the indices that surpass max_val
many_idx = np.where(cdf.arr[:, i] > max_val)[0]
# if there are indices, pick the first; store it and its associated probability
if len(many_idx) > 0:
idx.append(i)
val.append(cdf.arr[many_idx[0], i])
prob.append(1 - cdf.prob[many_idx[
0]]) # the CDF stores the chance of beign leq than the value, hence the overload is the complementary
return idx, val, prob, cdf.arr[-1, :]
[docs]
def mdl(self, result_type: ResultTypes) -> ResultsTable:
"""
Plot the results
:param result_type:
:return:
"""
if result_type == ResultTypes.BusVoltageAverage:
labels = self.bus_names
y = self.v_avg_conv[1:-1, :]
y_label = '(p.u.)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BusDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchPowerAverage:
labels = self.branch_names
y = self.s_avg_conv[1:-1, :]
y_label = '(MW)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLoadingAverage:
labels = self.branch_names
y = self.l_avg_conv[1:-1, :] * 100.0
y_label = '(%)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLossesAverage:
labels = self.branch_names
y = self.loss_avg_conv[1:-1, :]
y_label = '(MVA)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BusVoltageStd:
labels = self.bus_names
y = self.v_std_conv[1:-1, :]
y_label = '(p.u.)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BusDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchPowerStd:
labels = self.branch_names
y = self.s_std_conv[1:-1, :]
y_label = '(MW)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLoadingStd:
labels = self.branch_names
y = self.l_std_conv[1:-1, :] * 100.0
y_label = '(%)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLossesStd:
labels = self.branch_names
y = self.loss_std_conv[1:-1, :]
y_label = '(MVA)'
x_label = 'Sampling points'
return ResultsTable(data=y,
index=np.arange(0, y.shape[0], 1),
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BusVoltageCDF:
labels = self.bus_names
cdf = CDF(np.abs(self.V_points))
y_label = '(p.u.)'
x_label = r"Probability $P(X \leq x)$"
return ResultsTable(data=cdf.arr,
index=cdf.prob,
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BusDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLoadingCDF:
labels = self.branch_names
cdf = CDF(np.abs(self.loading_points.real * 100.0))
y_label = '(%)'
x_label = r'Probability $P(X \leq x)$'
return ResultsTable(data=cdf.arr,
index=cdf.prob,
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchLossesCDF:
labels = self.branch_names
cdf = CDF(np.abs(self.losses_points))
y_label = '(MVA)'
x_label = r'Probability $P(X \leq x)$'
return ResultsTable(data=cdf.arr,
index=cdf.prob,
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BranchPowerCDF:
labels = self.branch_names
cdf = CDF(self.Sbr_points.real)
y_label = '(MW)'
x_label = r'Probability $P(X \leq x)$'
return ResultsTable(data=cdf.arr,
index=cdf.prob,
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BranchDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
elif result_type == ResultTypes.BusPowerCDF:
labels = self.bus_names
cdf = CDF(self.S_points.real)
y_label = '(p.u.)'
x_label = r'Probability $P(X \leq x)$'
return ResultsTable(data=cdf.arr,
index=cdf.prob,
idx_device_type=DeviceType.NoDevice,
columns=labels,
cols_device_type=DeviceType.BusDevice,
title=result_type.value,
ylabel=y_label,
xlabel=x_label,
units=y_label)
else:
raise Exception('Unsupported result type: ' + str(result_type))