Source code for VeraGridEngine.Simulations.OPF.ac_opf_worker

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import numpy as np
from typing import Union, Tuple, Sequence

from VeraGridEngine import ShuntControlMode
from VeraGridEngine.Utils.NumericalMethods.ips import interior_point_solver
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.Simulations.PowerFlow.power_flow_worker import multi_island_pf_nc
from VeraGridEngine.Simulations.PowerFlow.power_flow_options import PowerFlowOptions
from VeraGridEngine.Simulations.OPF.opf_options import OptimalPowerFlowOptions
from VeraGridEngine.Simulations.OPF.Formulations.ac_opf_problem import NonLinearOptimalPfProblem, NonlinearOPFResults
from VeraGridEngine.Simulations.OPF.NumericalMethods.newton_raphson_ips_fx import interior_point_solver
from VeraGridEngine.basic_structures import CxVec, IntVec, Logger


[docs] def remap_original_bus_indices(n_bus, original_bus_idx: Sequence[int]) -> Tuple[IntVec, IntVec]: """ Get arrays of bus mappings :param n_bus: number of buses :param original_bus_idx: array of bus indices in the multi-island scheme :return: original_indices: array of bus indices in the multi-island that apply for this island, island_indices: array of island indices that apply for this island """ original_idx = np.arange(n_bus, dtype=int) mapping = {o: i for i, o in enumerate(original_idx)} island_indices = list() original_indices = list() for a, o in enumerate(original_bus_idx): i = mapping.get(o, None) if i is not None: island_indices.append(i) original_indices.append(a) return np.array(original_indices, dtype=int), np.array(island_indices, dtype=int)
[docs] def run_nonlinear_opf(grid: MultiCircuit, opf_options: OptimalPowerFlowOptions, t_idx: Union[None, int] = None, plot_error: bool = False, optimize_nodal_capacity: bool = False, nodal_capacity_sign: float = 1.0, capacity_nodes_idx: Union[IntVec, None] = None, logger: Logger = Logger()) -> NonlinearOPFResults: """ Run optimal power flow for a MultiCircuit :param grid: MultiCircuit :param opf_options: OptimalPowerFlowOptions :param t_idx: Time index :param plot_error: Plot the error evolution :param optimize_nodal_capacity: :param nodal_capacity_sign: :param capacity_nodes_idx: :param logger: Logger object :return: NonlinearOPFResults """ # compile the system nc = compile_numerical_circuit_at(circuit=grid, t_idx=t_idx, logger=logger) if opf_options.ips_init_with_pf and opf_options.acopf_S0 is not None and opf_options.acopf_v0 is not None: # pick the passed values Sbus_pf = opf_options.acopf_S0 voltage_pf = opf_options.acopf_v0 else: # pick the default values Sbus_pf = nc.bus_data.installed_power voltage_pf = nc.bus_data.Vbus logger.add_error("Initialized with PF, but no PF values were passed") # split into islands, but considering the HVDC lines as actual links islands = nc.split_into_islands(ignore_single_node_islands=True, consider_hvdc_as_island_links=True) # create and initialize results results = NonlinearOPFResults() results.initialize(nbus=nc.nbus, nbr=nc.nbr, nsh=nc.nshunt, ng=nc.ngen, nil=len(nc.passive_branch_data.get_monitor_enabled_indices()), nhvdc=nc.nhvdc, ncap=len(capacity_nodes_idx) if capacity_nodes_idx is not None else 0, nvsc=nc.vsc_data.nelm) for i, island in enumerate(islands): if capacity_nodes_idx is not None: # get the (capacity_nodes_idx_org, capacity_nodes_idx_isl) = remap_original_bus_indices(n_bus=nc.nbus, original_bus_idx=capacity_nodes_idx) else: capacity_nodes_idx_org = None capacity_nodes_idx_isl = None problem = NonLinearOptimalPfProblem(nc=island, options=opf_options, pf_init=opf_options.ips_init_with_pf, Sbus_pf=Sbus_pf[island.bus_data.original_idx], voltage_pf=voltage_pf[island.bus_data.original_idx], optimize_nodal_capacity=optimize_nodal_capacity, nodal_capacity_sign=nodal_capacity_sign, capacity_nodes_idx=capacity_nodes_idx_isl, logger=logger ) # the solver changes the internal state of the problem ips_results = interior_point_solver(problem=problem, max_iter=opf_options.ips_iterations, tol=opf_options.ips_tolerance, pf_init=opf_options.ips_init_with_pf, trust=opf_options.ips_trust_radius, verbose=opf_options.verbose, step_control=False) # once solved, we just gather the internal state island_res = problem.get_solution(ips_results=ips_results, verbose=opf_options.verbose, plot_error=plot_error) results.merge(other=island_res, bus_idx=island.bus_data.original_idx, br_idx=island.passive_branch_data.original_idx, il_idx=island.passive_branch_data.get_monitor_enabled_indices(), gen_idx=island.generator_data.original_idx, hvdc_idx=island.hvdc_data.original_idx, ncap_idx=capacity_nodes_idx_org, contshunt_idx=np.where(island.shunt_data.control_mode_int == ShuntControlMode.Continuous.idx())[0], acopf_mode=opf_options.acopf_mode, vsc_idx=island.vsc_data.original_idx) if i > 0: results.error = max(results.error, island_res.error) results.iterations = max(results.iterations, island_res.iterations) results.converged = results.converged and island_res.converged if i > 0 else island_res.converged else: results.error = island_res.error results.iterations = island_res.iterations results.converged = island_res.converged # expand voltages if there was a bus topology reduction if nc.topology_performed: results.Va = nc.propagate_bus_result(results.Va) results.Vm = nc.propagate_bus_result(results.Vm) results.voltage = nc.propagate_bus_result(results.voltage) return results