Source code for VeraGridEngine.Simulations.InvestmentsEvaluation.investments_evaluation_results

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import textwrap
import numpy as np
from matplotlib import pyplot as plt
import matplotlib.colors as plt_colors

from VeraGridEngine.Simulations.results_template import ResultsTemplate, ResultsProperty
from VeraGridEngine.Simulations.results_table import ResultsTable
from VeraGridEngine.basic_structures import IntVec, Vec, StrVec, Mat
from VeraGridEngine.enumerations import StudyResultsType, ResultTypes, DeviceType
from VeraGridEngine.Utils.NumericalMethods.MVRSM_mo_pareto import non_dominated_sorting


[docs] class InvestmentsEvaluationResults(ResultsTemplate): LOCAL_RESULTS_DECLARATIONS = ( ResultsProperty(name='max_eval', tpe=int, old_names=list(), expandable=False), ResultsProperty(name='f_names', tpe=StrVec, old_names=list(), expandable=False), ResultsProperty(name='x_names', tpe=StrVec, old_names=list(), expandable=False), ResultsProperty(name='plot_x_idx', tpe=int, old_names=list(), expandable=False), ResultsProperty(name='plot_y_idx', tpe=int, old_names=list(), expandable=False), ResultsProperty(name='x', tpe=Mat, old_names=list(), expandable=False), ResultsProperty(name='f', tpe=Mat, old_names=list(), expandable=False), ResultsProperty(name='f_best', tpe=Vec, old_names=list(), expandable=False), ResultsProperty(name='sorting_indices', tpe=IntVec, old_names=list(), expandable=False), ) __slots__ = ( "_max_eval", "f_names", "x_names", "plot_x_idx", "plot_y_idx", "_x", "_f", "_f_best", "_sorting_indices", "_InvestmentsEvaluationResults__eval_index", ) tpe = 'Investments Evaluation Results' def __init__(self, f_names: StrVec, x_names: StrVec, max_eval: int, plot_x_idx: int, plot_y_idx: int): """ Constructor :param f_names: Names of the objectives :param x_names: Names of the decision vars :param max_eval: Maximum number of evaluations :param plot_x_idx: index of f to use as x when plotting :param plot_y_idx: index of f to use as y when plotting """ available_results = { ResultTypes.ReportsResults: [ResultTypes.InvestmentsReportResults, ResultTypes.InvestmentsCombinationsResults, ResultTypes.InvestmentsObjectivesResults, ResultTypes.InvestmentsFrequencyResults], ResultTypes.ParetoResults: [ResultTypes.InvestmentsParetoReportResults, ResultTypes.InvestmentsParetoCombinationsResults, ResultTypes.InvestmentsParetoObjectivesResults ], ResultTypes.SpecialPlots: [ResultTypes.InvestmentsParetoPlot, ResultTypes.InvestmentsIterationsPlot, ResultTypes.InvestmentsWhenToMakePlot], } ResultsTemplate.__init__(self, name='Investments Evaluation', available_results=available_results, time_array=None, clustering_results=None, study_results_type=StudyResultsType.InvestmentEvaluations) n_f = len(f_names) n_x = len(x_names) self._max_eval = max_eval self.f_names: StrVec = f_names self.x_names: StrVec = x_names self.plot_x_idx = plot_x_idx self.plot_y_idx = plot_y_idx self._x: IntVec = np.zeros((max_eval, n_x), dtype=float) self._f: IntVec = np.zeros((max_eval, n_f), dtype=float) self._f_best = np.zeros(n_f, dtype=float) self._sorting_indices = np.zeros(max_eval, dtype=int) self.__eval_index: int = 0 @property def max_eval(self) -> int: return self._max_eval @max_eval.setter def max_eval(self, val: int): self._max_eval = val @property def x(self) -> Mat: return self._x @x.setter def x(self, val: Vec): if isinstance(val, np.ndarray): self._x = val else: raise ValueError("X must be a numpy array") @property def f(self) -> Mat: return self._f @f.setter def f(self, val: Vec): if isinstance(val, np.ndarray): self._f = val else: raise ValueError("f must be a numpy array") @property def f_best(self) -> IntVec: return self._f_best @f_best.setter def f_best(self, val: Vec): if isinstance(val, np.ndarray): self._f_best = val else: raise ValueError("f_best must be a numpy array") @property def current_evaluation(self) -> int: return self.__eval_index @property def sorting_indices(self) -> IntVec: return self._sorting_indices @sorting_indices.setter def sorting_indices(self, val: IntVec): if isinstance(val, np.ndarray): self._sorting_indices = val.astype(int) else: raise ValueError("sorting indices must be an array of integer")
[docs] def get_index(self) -> StrVec: return np.array([f"Eval {i + 1}" for i in range(self.x.shape[0])])
[docs] def set_at(self, i: int, x_vec: Vec, f_vec: Vec): """ :param i: :param x_vec: :param f_vec: :return: """ self._x[i, :] = x_vec self._f[i, :] = f_vec
[docs] def add(self, x_vec: Vec, f_vec: Vec) -> None: """ :param x_vec: :param f_vec: :return: """ if self.__eval_index < self.max_eval: self.set_at(i=self.__eval_index, x_vec=x_vec, f_vec=f_vec) self.__eval_index += 1 else: print('Evaluation index out of range')
[docs] def finalize(self): """ Finalize the results after simulation """ # crop the data to the latest call index if self.__eval_index > 0: self._f = self._f[:self.__eval_index, :] self._x = self._x[:self.__eval_index, :] # Dedup x rows before Pareto sorting: equal-x rows don't dominate each # other and would all land in the front. Raw _x/_f stay intact so the # iteration, frequency, and reports views still see every evaluation. _, unique_idx = np.unique(self._x, axis=0, return_index=True) unique_idx = np.sort(unique_idx) # compute the pareto sorting indices on the deduplicated slice, then map # the resulting positions back to indices into the full _x/_f arrays _, _, pareto_local = non_dominated_sorting( y_values=self._f[unique_idx, :], x_values=self._x[unique_idx, :], ) self._sorting_indices = unique_idx[pareto_local] # we curtail this one too self.max_eval = self.__eval_index
[docs] def set_best_combination(self, combination: IntVec) -> None: """ Set the best combination of investment groups :param combination: Vector of integers (0/1) """ self._f_best = combination
[docs] def mdl(self, result_type) -> ResultsTable: """ Plot the results :param result_type: type of results (string) :return: DataFrame of the results (or None if the result was not understood) """ n = self.x.shape[0] index = self.get_index() if result_type in (ResultTypes.InvestmentsReportResults, ResultTypes.InvestmentsParetoReportResults): columns = np.r_[np.array(self.f_names), np.array(self.x_names)] data = np.c_[self.f, self.x] if result_type == ResultTypes.InvestmentsParetoReportResults: # slice results according to the pareto indices index = index[self.sorting_indices] data = data[self.sorting_indices, :] return ResultsTable(data=data, index=index, idx_device_type=DeviceType.NoDevice, columns=columns, cols_device_type=DeviceType.NoDevice.NoDevice, title=str(result_type.value), ylabel="", xlabel='', units="") elif result_type == ResultTypes.InvestmentsFrequencyResults: freq = np.sum(self._x, axis=0) freq_rel = freq / freq.sum() data = np.c_[freq, freq_rel] return ResultsTable(data=data, index=np.array(self.x_names), idx_device_type=DeviceType.NoDevice, columns=np.array(["Frequency", "Relative frequency"]), cols_device_type=DeviceType.NoDevice.NoDevice, title=str(result_type.value), ylabel="", xlabel="", units="") elif result_type in (ResultTypes.InvestmentsCombinationsResults, ResultTypes.InvestmentsParetoCombinationsResults): if result_type == ResultTypes.InvestmentsParetoCombinationsResults: # slice results according to the pareto indices data = self._x[self.sorting_indices, :] index = index[self.sorting_indices] else: data = self._x return ResultsTable(data=data, index=index, idx_device_type=DeviceType.NoDevice, columns=self.x_names, cols_device_type=DeviceType.NoDevice.NoDevice, title=str(result_type.value), ylabel="", xlabel="", units="") elif result_type in (ResultTypes.InvestmentsObjectivesResults, ResultTypes.InvestmentsParetoObjectivesResults): data = self.f if result_type == ResultTypes.InvestmentsParetoObjectivesResults: # slice results according to the pareto indices data = data[self.sorting_indices, :] index = index[self.sorting_indices] return ResultsTable(data=data, index=index, idx_device_type=DeviceType.NoDevice, columns=self.f_names, cols_device_type=DeviceType.NoDevice.NoDevice, title=str(result_type.value), ylabel="", xlabel="", units="") elif result_type == ResultTypes.InvestmentsParetoPlot: x_vals = self.f[:, self.plot_x_idx] y_vals = self.f[:, self.plot_y_idx] plt.ion() color_norm = plt_colors.Normalize() fig, ax3 = plt.subplots(1, 1, figsize=(16, 12)) # Match magnitude of technical score with investment score color_score = y_vals * 10 ** -2 # Plot 1: Technical vs investment scatter_plot = ax3.scatter(x_vals, y_vals, c=color_score) ax3.set_xlabel(self.f_names[self.plot_x_idx], fontsize=10) ax3.set_ylabel(self.f_names[self.plot_y_idx], fontsize=10) ax3.set_title('Pareto plot', fontsize=12) ax3.tick_params(axis='both', which='major', labelsize=8) cbar1 = plt.colorbar(scatter_plot, ax=ax3, fraction=0.05) cbar1.set_label('Objective function', fontsize=10) cbar1.ax.tick_params(labelsize=8) fig.suptitle(result_type.value) plt.tight_layout() annot = ax3.annotate("", xy=(0, 0), xytext=(20, 20), textcoords="offset points", bbox=dict(boxstyle="round", fc="w", pad=0.3), arrowprops=dict(arrowstyle="->"), fontsize=8, zorder=10) # Set z-order to a higher value to ensure it's in front annot.set_visible(False) def update_annotation(ind): """ :param ind: :return: """ i = ind["ind"][0] annot.xy = scatter_plot.get_offsets()[i] text = f"Solution:\n{index[i]}" wrapped_text = textwrap.fill(text, width=30) annot.set_text(wrapped_text) annot.get_bbox_patch().set_alpha(0.8) def hover(event): """ :param event: :return: """ if event.inaxes == ax3: cont, ind = scatter_plot.contains(event) if cont: update_annotation(ind) annot.set_visible(True) fig.canvas.draw_idle() else: if annot.get_visible(): annot.set_visible(False) fig.canvas.draw_idle() def click_solution(event): """ :param event: :return: """ if event.inaxes is not None: click_x, click_y = event.xdata, event.ydata tolerance = 0.1 * (ax3.get_xlim()[1] - ax3.get_xlim()[0]) offsets = scatter_plot.get_offsets() scatter_x = offsets[:, 0] scatter_y = offsets[:, 1] distances = np.hypot(scatter_x - click_x, scatter_y - click_y) min_idx = distances.argmin() if distances[min_idx] < tolerance: investment_names = index[min_idx] print("Investments made:") for i, x_val in enumerate(self.x[min_idx, :]): if x_val != 0.0: print(self.x_names[i]) fig.canvas.mpl_connect("motion_notify_event", hover) fig.canvas.mpl_connect('button_press_event', click_solution) plt.show() plt.show() return ResultsTable(data=np.c_[x_vals, y_vals], index=np.array(index), idx_device_type=DeviceType.NoDevice, columns=np.array([self.f_names[self.plot_x_idx], self.f_names[self.plot_y_idx]]), cols_device_type=DeviceType.NoDevice.NoDevice, title="Pareto plot", ylabel=self.f_names[self.plot_y_idx], xlabel=self.f_names[self.plot_x_idx], units="") elif result_type == ResultTypes.InvestmentsIterationsPlot: columns = ["Iteration", "Objectives summation"] x = np.arange(self.max_eval) y = self.f.sum(axis=1) data = np.c_[x, y] y_label = '' title = '' plt.ion() fig = plt.figure(figsize=(8, 6)) ax3 = plt.subplot(1, 1, 1) ax3.plot(x, y, '.') ax3.set_xlabel('Iteration') ax3.set_ylabel('Objective') fig.suptitle(str(result_type.value)) plt.grid() plt.show() return ResultsTable(data=data, index=np.array(index), idx_device_type=DeviceType.NoDevice, columns=np.array(columns), cols_device_type=DeviceType.NoDevice.NoDevice, title=title, ylabel=y_label, xlabel='', units=y_label) elif result_type == ResultTypes.InvestmentsWhenToMakePlot: # Create subplots fig, ax = plt.subplots(1, 1, figsize=(16, 6), sharey=False) # _x is (max_eval, n_investments) # X is (pareto solutions, n_investments) X = self._x[self.sorting_indices, :].astype(int) max_years = np.max(X) mat = np.zeros((X.shape[1], max_years)) for i in range(X.shape[0]): # evaluation index for j in range(X.shape[1]): # investment index year = X[i, j] if year > 0: mat[j, year - 1] += 1 ax.imshow(mat) ax.set_title("When to make the investments", fontsize=14) ax.set_xlabel("", fontsize=12) ax.set_xticks(np.arange(max_years), np.arange(1, max_years + 1)) ax.set_yticks(np.arange(len(self.x_names))) ax.set_yticklabels(self.x_names) ax.tick_params(axis='x', labelsize=11) ax.tick_params(axis='y', labelsize=11) ax.set_xlabel("Year", fontsize=12) fig.tight_layout() plt.show() return ResultsTable(data=mat, index=self.x_names, idx_device_type=DeviceType.NoDevice, columns=np.array([str(y) for y in range(max_years)]), cols_device_type=DeviceType.NoDevice.NoDevice, title=str(result_type.value), ylabel="", xlabel="", units="") else: raise Exception('Result type not understood:' + str(result_type))