Source code for VeraGridEngine.Simulations.OPF.opf_driver

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
from typing import Union
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.enumerations import SolverType, EngineType, SimulationTypes
from VeraGridEngine.Simulations.OPF.opf_options import OptimalPowerFlowOptions
from VeraGridEngine.Simulations.OPF.Formulations.linear_opf_ts import run_linear_opf_ts
from VeraGridEngine.Simulations.OPF.opf_results import OptimalPowerFlowResults
from VeraGridEngine.Simulations.OPF.ac_opf_worker import run_nonlinear_opf
from VeraGridEngine.Simulations.PowerFlow.power_flow_options import PowerFlowOptions
from VeraGridEngine.Simulations.driver_template import TimeSeriesDriverTemplate
from VeraGridEngine.Simulations.OPF.simple_dispatch_ts import GreedyDispatchInputsSnapshot, greedy_dispatch2
from VeraGridEngine.Compilers.circuit_to_newton_pa import newton_pa_linear_opf, newton_pa_nonlinear_opf
import VeraGridEngine.Compilers.circuit_to_gslv as gslv


[docs] class OptimalPowerFlowDriver(TimeSeriesDriverTemplate): __slots__ = ( "options", "all_solved", ) name = 'Optimal power flow' tpe = SimulationTypes.OPF_run def __init__(self, grid: MultiCircuit, options: Union[OptimalPowerFlowOptions, None] = None, engine: EngineType = EngineType.VeraGrid): """ PowerFlowDriver class constructor :param grid: MultiCircuit Object :param options: OPF options :param engine: Calculation engine to use (if available) """ TimeSeriesDriverTemplate.__init__(self, grid=grid, time_indices=None, clustering_results=None, engine=engine, check_time_series=False) # Options to use self.options = options if options else OptimalPowerFlowOptions() F, T = self.grid.get_branch_FT(add_vsc=False, add_hvdc=False, add_switch=True) F_hvdc, T_hvdc = self.grid.get_hvdc_FT() # OPF results self.results: OptimalPowerFlowResults = OptimalPowerFlowResults( bus_names=self.grid.get_bus_names(), branch_names=self.grid.get_branch_names(add_hvdc=False, add_vsc=False, add_switch=True), load_names=self.grid.get_load_names(), generator_names=self.grid.get_generator_names(), shunt_like_names=self.grid.get_shunt_like_devices_names(), battery_names=self.grid.get_battery_names(), hvdc_names=self.grid.get_hvdc_names(), vsc_names=self.grid.get_vsc_names(), bus_types=np.ones(self.grid.get_bus_number(), dtype=int), area_names=self.grid.get_area_names(), fluid_node_names=self.grid.get_fluid_node_names(), fluid_path_names=self.grid.get_fluid_path_names(), fluid_inj_names=self.grid.get_fluid_injection_names(), F=F, T=T, F_hvdc=F_hvdc, T_hvdc=T_hvdc, bus_area_indices=self.grid.get_bus_area_indices() ) self.all_solved = True @property def pf_options(self) -> PowerFlowOptions: """ Get the PowerFlow options provides with the OpfOptions :return: PowerFlowOptions """ return self.options.power_flow_options
[docs] def add_report(self) -> None: """ Add a report of the results (in-place) """ for gen_name, gen_shedding in zip(self.results.generator_names, self.results.generator_shedding): if gen_shedding > 0: self.logger.add_warning("Generation shedding", device=gen_name, value=gen_shedding, expected_value=0.0) for load_name, load_shedding in zip(self.results.load_names, self.results.load_shedding): if load_shedding > 0: self.logger.add_warning("Load shedding", device=load_name, value=load_shedding, expected_value=0.0) for name, val in zip(self.results.branch_names, self.results.loading): if val > 1: self.logger.add_warning("Overload", device=name, value=val * 100, expected_value=0.0) va = np.angle(self.results.voltage) for i, bus in enumerate(self.grid.buses): if va[i] > bus.angle_max: self.logger.add_warning("Overvoltage", device=bus.name, value=va[i], expected_value=bus.angle_max) elif va[i] < bus.angle_min: self.logger.add_warning("Undervoltage", device=bus.name, value=va[i], expected_value=bus.angle_min)
[docs] def opf(self, remote=False, batteries_energy_0=None) -> OptimalPowerFlowResults: """ Run a power flow for every circuit :param remote: is this function being called from the time series? :param batteries_energy_0: initial state of the batteries, if None the default values are taken :return: OptimalPowerFlowResults object """ if self.options.solver == SolverType.LINEAR_OPF: if not remote: self.report_progress(0.0) self.report_text('Formulating problem...') # DC optimal power flow opf_vars, model = run_linear_opf_ts( grid=self.grid, time_indices=None, dispatch_mode=self.options.dispatch_mode, solver_type=self.options.mip_solver, zonal_grouping=self.options.zonal_grouping, skip_generation_limits=self.options.skip_generation_limits, consider_contingencies=self.options.consider_contingencies, contingency_groups_used=self.options.contingency_groups_used, ramp_constraints=False, consider_time_up_down=False, area_spinning_reserve=False, lodf_threshold=self.options.lodf_tolerance, inter_aggregation_info=self.options.inter_aggregation_info, energy_0=None, fluid_level_0=None, use_glsk_as_cost=self.options.use_glsk_as_cost, add_losses_approximation=self.options.add_losses_approximation, logger=self.logger, verbose=self.options.verbose, robust=self.options.robust, mip_framework=self.options.mip_framework ) self.results.voltage = opf_vars.bus_vars.Vm[0, :] * np.exp(1j * opf_vars.bus_vars.Va[0, :]) self.results.bus_shadow_prices = opf_vars.bus_vars.shadow_prices[0, :] self.results.load_power = opf_vars.load_vars.p[0, :] self.results.load_shedding = opf_vars.load_vars.shedding[0, :] self.results.battery_power = opf_vars.batt_vars.p[0, :] # self.results.battery_energy = opf_vars.batt_vars.e[0, :] self.results.generator_power = opf_vars.gen_vars.p[0, :] self.results.generator_shedding = opf_vars.gen_vars.shedding[0, :] self.results.Sf = opf_vars.branch_vars.flows[0, :] self.results.St = -opf_vars.branch_vars.flows[0, :] self.results.overloads = (opf_vars.branch_vars.flow_slacks_pos[0, :] - opf_vars.branch_vars.flow_slacks_neg[0, :]) self.results.loading = opf_vars.branch_vars.loading[0, :] self.results.tap_angle = opf_vars.branch_vars.tap_angles[0, :] self.results.losses = opf_vars.branch_vars.losses[0, :] # self.results.Sbus = problem.get_power_injections()[0, :] self.results.hvdc_Pf = opf_vars.hvdc_vars.flows[0, :] self.results.hvdc_loading = opf_vars.hvdc_vars.loading[0, :] self.results.vsc_Pf = opf_vars.vsc_vars.flows[0, :] self.results.vsc_loading = opf_vars.vsc_vars.loading[0, :] self.results.converged = opf_vars.acceptable_solution self.results.fluid_node_p2x_flow = opf_vars.fluid_node_vars.p2x_flow[0, :] self.results.fluid_node_current_level = opf_vars.fluid_node_vars.current_level[0, :] self.results.fluid_node_spillage = opf_vars.fluid_node_vars.spillage[0, :] self.results.fluid_node_flow_in = opf_vars.fluid_node_vars.flow_in[0, :] self.results.fluid_node_flow_out = opf_vars.fluid_node_vars.flow_out[0, :] self.results.fluid_path_flow = opf_vars.fluid_path_vars.flow[0, :] self.results.fluid_injection_flow = opf_vars.fluid_inject_vars.flow[0, :] if self.options.report_formulation: self.results.report_text = model.model_as_string() elif self.options.solver == SolverType.GREEDY_DISPATCH_OPF: if not remote: self.report_progress(0.0) self.report_text('Greedy dispatch...') greedy_dispatch_inputs = GreedyDispatchInputsSnapshot(grid=self.grid, logger=self.logger) (gen_dispatch, batt_dispatch, batt_energy, total_cost, load_not_supplied, load_shedding, ndg_surplus_after_batt, ndg_curtailment_per_gen) = greedy_dispatch2( load_profile=greedy_dispatch_inputs.load_profile, gen_profile=greedy_dispatch_inputs.gen_profile, gen_p_max=greedy_dispatch_inputs.gen_p_max, gen_p_min=greedy_dispatch_inputs.gen_p_min, gen_dispatchable=greedy_dispatch_inputs.gen_dispatchable, gen_active=greedy_dispatch_inputs.gen_active, gen_cost=greedy_dispatch_inputs.gen_cost, batt_active=greedy_dispatch_inputs.batt_active, batt_p_max_charge=greedy_dispatch_inputs.batt_p_max_charge, batt_p_max_discharge=greedy_dispatch_inputs.batt_p_max_discharge, batt_energy_max=greedy_dispatch_inputs.batt_energy_max, batt_eff_charge=greedy_dispatch_inputs.batt_eff_charge, batt_eff_discharge=greedy_dispatch_inputs.batt_eff_discharge, batt_cost=greedy_dispatch_inputs.batt_cost, batt_soc0=greedy_dispatch_inputs.batt_soc0, batt_soc_min=greedy_dispatch_inputs.batt_soc_min, dt=greedy_dispatch_inputs.dt, force_charge_if_low=True ) # AC optimal power flow # Pl, Pg = run_simple_dispatch(grid=self.grid, # text_prog=self.report_text, # prog_func=self.report_progress) self.results.generator_power = gen_dispatch[0, :] self.results.generator_shedding = ndg_curtailment_per_gen[0, :] self.results.battery_power = batt_dispatch[0, :] self.results.battery_energy = batt_energy[0, :] self.results.load_shedding = load_shedding[0, :] self.results.converged = True elif self.options.solver == SolverType.NONLINEAR_OPF: if not remote: self.report_progress(0.0) self.report_text('Running non linear optimization...') res = run_nonlinear_opf(grid=self.grid, opf_options=self.options, t_idx=None, logger=self.logger) Sbase = self.grid.Sbase self.results.voltage = res.V self.results.Sbus = res.S * Sbase self.results.bus_shadow_prices = res.lam_p # self.results.load_shedding = npa_res.load_shedding[0, :] # self.results.battery_power = npa_res.battery_p[0, :] # self.results.battery_energy = npa_res.battery_energy[0, :] self.results.generator_power = res.Pg * Sbase self.results.generator_reactive_power = res.Qg * Sbase self.results.shunt_like_reactive_power = res.Qsh * Sbase self.results.Sf = res.Sf * Sbase self.results.St = res.St * Sbase self.results.overloads = (res.sl_sf - res.sl_st) * Sbase self.results.loading = res.loading self.results.losses = (self.results.Sf.real + self.results.St.real) self.results.tap_angle = res.tap_phase self.results.tap_module = res.tap_module self.results.hvdc_Pf = res.hvdc_Pf self.results.hvdc_loading = res.hvdc_loading self.results.vsc_Pf = res.vsc_Pf * Sbase self.results.vsc_loading = res.vsc_loading self.results.converged = res.converged self.results.error = res.error self.results.non_linear = True msg = "Interior point solver" self.logger.add_info(msg=msg, device="Error", value=res.error, expected_value=self.options.ips_tolerance) self.logger.add_info(msg=msg, device="Converged", value=res.converged) self.logger.add_info(msg=msg, device="Iterations", value=res.iterations) else: self.logger.add_error('Solver not supported in this mode', str(self.options.solver)) return self.results if not remote: self.report_progress(0.0) self.report_text('Running all in an external solver, this may take a while...') return self.results
[docs] def run(self): """ :return: """ self.tic() if self.engine == EngineType.GSLV: if not gslv.GSLV_AVAILABLE: self.engine = EngineType.VeraGrid self.logger.add_warning('GSLV not available, falling back to VeraGrid') else: if self.options.solver != SolverType.LINEAR_OPF: self.engine = EngineType.VeraGrid self.logger.add_warning('GSLV OPF snapshot only supports LINEAR_OPF, falling back to VeraGrid') else: pass if self.engine == EngineType.VeraGrid: self.opf() elif self.engine == EngineType.NewtonPA: ti = self.time_indices if self.time_indices is not None else 0 use_time_series = self.time_indices is not None if self.options.solver == SolverType.LINEAR_OPF: self.report_text('Running Linear OPF with Newton...') npa_res = newton_pa_linear_opf(circuit=self.grid, opf_options=self.options, pf_opt=PowerFlowOptions(), time_series=use_time_series, time_indices=self.time_indices) self.results.voltage = npa_res.voltage_module[0, :] * np.exp(1j * npa_res.voltage_angle[0, :]) self.results.bus_shadow_prices = npa_res.nodal_shadow_prices[0, :] self.results.load_shedding = npa_res.load_shedding[0, :] self.results.battery_power = npa_res.battery_power[0, :] # self.results.battery_energy = npa_res.battery_energy[0, :] self.results.generator_power = npa_res.generator_power[0, :] self.results.Sf = npa_res.branch_flows[0, :] self.results.St = -npa_res.branch_flows[0, :] self.results.overloads = npa_res.branch_overloads[0, :] self.results.loading = npa_res.branch_loading[0, :] self.results.tap_angle = npa_res.branch_tap_angle[0, :] # self.results.Sbus = npa_res. self.results.hvdc_Pf = npa_res.hvdc_flows[0, :] self.results.hvdc_loading = npa_res.hvdc_loading[0, :] self.results.converged = True elif self.options.solver == SolverType.NONLINEAR_OPF: self.report_text('Running Non-Linear OPF with Newton...') # pack the results npa_res = newton_pa_nonlinear_opf(circuit=self.grid, pf_opt=self.pf_options, opf_opt=self.options, time_series=use_time_series, time_indices=self.time_indices) self.results.voltage = npa_res.voltage[0, :] self.results.Sbus = npa_res.Scalc[0, :] self.results.bus_shadow_prices = npa_res.bus_shadow_prices[0, :] self.results.load_shedding = npa_res.load_shedding[0, :] self.results.battery_power = npa_res.battery_p[0, :] # self.results.battery_energy = npa_res.battery_energy[0, :] self.results.generator_power = npa_res.generator_p[0, :] self.results.Sf = npa_res.Sf[0, :] self.results.St = npa_res.St[0, :] self.results.overloads = npa_res.branch_overload[0, :] self.results.loading = npa_res.Loading[0, :] # self.results.losses = self.results.tap_angle = npa_res.tap_angle[0, :] self.results.hvdc_Pf = npa_res.hvdc_Pf[0, :] self.results.hvdc_loading = npa_res.hvdc_loading[0, :] self.results.converged = npa_res.converged else: raise Exception(f"{self.options.solver} Not implemented yet") elif self.engine == EngineType.GSLV: self.report_text('Running Linear OPF with GSLV...') gslv_res = gslv.gslv_opf(circuit=self.grid, opf_options=self.options, time_series=False, time_indices=None, logger=self.logger) self.results.voltage = gslv_res.voltage[0, :] self.results.Sbus = gslv_res.Sbus[0, :].real self.results.bus_shadow_prices = gslv_res.bus_shadow_prices[0, :] self.results.load_shedding = gslv_res.load_shedding[0, :] self.results.battery_power = gslv_res.battery_power[0, :] self.results.generator_power = gslv_res.generator_power[0, :] self.results.Sf = gslv_res.Sf[0, :].real self.results.St = gslv_res.St[0, :].real self.results.overloads = gslv_res.overloads[0, :].real self.results.loading = gslv_res.loading[0, :].real self.results.tap_angle = gslv_res.tap_angle[0, :] self.results.hvdc_Pf = gslv_res.hvdc_Pf[0, :] self.results.hvdc_loading = gslv_res.hvdc_loading[0, :] self.results.fluid_node_current_level = gslv_res.fluid_node_current_level[0, :] self.results.fluid_node_flow_in = gslv_res.fluid_node_flow_in[0, :] self.results.fluid_node_flow_out = gslv_res.fluid_node_flow_out[0, :] self.results.fluid_node_p2x_flow = gslv_res.fluid_node_p2x_flow[0, :] self.results.fluid_node_spillage = gslv_res.fluid_node_spillage[0, :] self.results.fluid_path_flow = gslv_res.fluid_path_flow[0, :] self.results.fluid_injection_flow = gslv_res.fluid_injection_flow[0, :] else: self.opf() self.toc() if self.options.generate_report: self.add_report()