Source code for VeraGridEngine.Simulations.OPF.Formulations.linear_opf_ts_b
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
"""
This file implements a DC-OPF for time series
That means that solves the OPF problem for a complete time series at once
"""
from __future__ import annotations
import os
import numpy as np
from typing import List, Union, Tuple, Callable, Dict
from scipy.sparse import csc_matrix
from VeraGridEngine.IO.file_system import opf_file_path
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Devices.Aggregation.inter_aggregation_info import InterAggregationInfo
from VeraGridEngine.Devices.Events.contingency_group import ContingencyGroup
from VeraGridEngine.Devices.Substation.bus import Bus
from VeraGridEngine.Devices.Branches.dc_line import DcLine
from VeraGridEngine.Devices.Fluid.fluid_node import FluidNode
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
# from VeraGridEngine.DataStructures.generator_data import GeneratorData
# from VeraGridEngine.DataStructures.battery_data import BatteryData
# from VeraGridEngine.DataStructures.load_data import LoadData
# from VeraGridEngine.DataStructures.passive_branch_data import PassiveBranchData
# from VeraGridEngine.DataStructures.active_branch_data import ActiveBranchData
# from VeraGridEngine.DataStructures.hvdc_data import HvdcData
# from VeraGridEngine.DataStructures.vsc_data import VscData
# from VeraGridEngine.DataStructures.bus_data import BusData
# from VeraGridEngine.DataStructures.fluid_node_data import FluidNodeData
# from VeraGridEngine.DataStructures.fluid_path_data import FluidPathData
# from VeraGridEngine.DataStructures.fluid_turbine_data import FluidTurbineData
# from VeraGridEngine.DataStructures.fluid_pump_data import FluidPumpData
# from VeraGridEngine.DataStructures.fluid_p2x_data import FluidP2XData
from VeraGridEngine.basic_structures import Logger, Vec, IntVec, DateVec, Mat
from VeraGridEngine.Utils.MIP.selected_interface import LpExp, LpVar, LpModel, lpDot, join, get_model_instance
from VeraGridEngine.enumerations import (HvdcControlType, ZonalGrouping, MIPSolvers, TapPhaseControl,
ConverterControlType, MIPFramework)
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import (LinearAnalysis, LinearMultiContingency,
LinearMultiContingencies)
[docs]
def get_contingency_flow_with_filter(multi_contingency: LinearMultiContingency,
base_flow: Vec,
injections: Union[None, Vec],
threshold: float,
m: int) -> LpExp:
"""
Get contingency flow
:param multi_contingency: MultiContingency object
:param base_flow: Base branch flows (nbranch)
:param injections: Bus injections increments (nbus)
:param threshold: threshold to filter contingency elements
:param m: branch monitor index (int)
:return: New flows (nbranch)
"""
res = base_flow[m] + 0
if len(multi_contingency.branch_indices):
for i, c in enumerate(multi_contingency.branch_indices):
if abs(multi_contingency.mlodf_factors[m, i]) >= threshold:
res += multi_contingency.mlodf_factors[m, i] * base_flow[c]
if len(multi_contingency.bus_indices):
for i, c in enumerate(multi_contingency.bus_indices):
if abs(multi_contingency.compensated_ptdf_factors[m, i]) >= threshold:
res += multi_contingency.compensated_ptdf_factors[m, i] * multi_contingency.injections_factor[i] * \
injections[c]
return res
[docs]
class BusVars:
"""
Struct to store the bus related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BusVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.Va = np.zeros((nt, n_elm), dtype=object)
self.Vm = np.ones((nt, n_elm), dtype=object)
self.Pinj = np.zeros((nt, n_elm), dtype=object)
self.Pgen = np.zeros((nt, n_elm), dtype=object)
self.Pbalance = np.zeros((nt, n_elm), dtype=object)
self.kirchhoff = np.zeros((nt, n_elm), dtype=object)
self.shadow_prices = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "BusVars":
"""
Return an instance of this class where the array's content
is not LP vars but their value
:return: BusVars
"""
nt, n_elm = self.Va.shape
data = BusVars(nt=nt, n_elm=n_elm)
data.shadow_prices = self.shadow_prices
for t in range(nt):
for i in range(n_elm):
data.Va[t, i] = model.get_value(self.Va[t, i])
data.Vm[t, i] = model.get_value(self.Vm[t, i])
data.Pinj[t, i] = model.get_value(self.Pinj[t, i])
data.Pgen[t, i] = model.get_value(self.Pgen[t, i])
data.Pbalance[t, i] = model.get_value(self.Pbalance[t, i]) * Sbase
data.shadow_prices[t, i] = model.get_dual_value(self.kirchhoff[t, i])
# format the arrays appropriately
data.Va = data.Va.astype(float, copy=False)
data.Vm = data.Vm.astype(float, copy=False)
data.Pinj = data.Pinj.astype(float, copy=False)
data.Pbalance = data.Pbalance.astype(float, copy=False)
return data
[docs]
class NodalCapacityVars:
"""
Struct to store the nodal capacity related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BusVars structure
:param nt: Number of time steps
:param n_elm: Number of buses to optimize the capacity for
"""
self.P = np.zeros((nt, n_elm), dtype=object) # in per-unit power
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "NodalCapacityVars":
"""
Return an instance of this class where the array's
content is not LP vars but their value
:return: BusVars
"""
nt, n_elm = self.P.shape
data = NodalCapacityVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.P[t, i] = model.get_value(self.P[t, i]) * Sbase
# format the arrays appropriately
data.P = data.P.astype(float, copy=False)
return data
[docs]
class LoadVars:
"""
Struct to store the load related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
LoadVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.shedding = np.zeros((nt, n_elm), dtype=object)
self.p = np.zeros((nt, n_elm), dtype=object) # to be filled (no vars)
self.shedding_cost = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "LoadVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: LoadVars
"""
nt, n_elm = self.shedding.shape
data = LoadVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.shedding_cost[t, i] = model.get_value(self.shedding_cost[t, i]) * Sbase
# format the arrays appropriately
data.shedding = data.shedding.astype(float, copy=False)
data.p = data.p.astype(float, copy=False)
data.shedding_cost = data.shedding_cost.astype(float, copy=False)
return data
[docs]
class GenerationVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of generators
"""
self.p = np.zeros((nt, n_elm), dtype=object)
self.shedding = np.zeros((nt, n_elm), dtype=object)
self.producing = np.zeros((nt, n_elm), dtype=object)
self.starting_up = np.zeros((nt, n_elm), dtype=object)
self.shutting_down = np.zeros((nt, n_elm), dtype=object)
self.cost = np.zeros((nt, n_elm), dtype=object)
self.invested = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self,
Sbase: float,
model: LpModel,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix) -> "GenerationVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param Sbase: Base power (100 MVA)
:param model: LpModel
:param gen_emissions_rates_matrix: emissins rates matrix (n_emissions, n_gen)
:param gen_fuel_rates_matrix: fuel rates matrix (n_fuels, n_gen)
:return: GenerationVars
"""
nt, n_elm = self.p.shape
data = GenerationVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.producing[t, i] = model.get_value(self.producing[t, i])
data.starting_up[t, i] = model.get_value(self.starting_up[t, i])
data.shutting_down[t, i] = model.get_value(self.shutting_down[t, i])
data.cost[t, i] = model.get_value(self.cost[t, i]) * Sbase
data.invested[t, i] = model.get_value(self.invested[t, i])
# format the arrays appropriately
data.p = data.p.astype(float, copy=False)
data.shedding = data.shedding.astype(float, copy=False)
data.producing = data.producing.astype(bool, copy=False)
data.starting_up = data.starting_up.astype(bool, copy=False)
data.shutting_down = data.shutting_down.astype(bool, copy=False)
data.cost = data.cost.astype(float, copy=False)
data.invested = data.invested.astype(bool, copy=False)
return data
[docs]
class BatteryVars(GenerationVars):
"""
struct extending the generation vars to handle the battery vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BatteryVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
GenerationVars.__init__(self, nt=nt, n_elm=n_elm)
self.e = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self, Sbase: float, model: LpModel,
gen_emissions_rates_matrix: csc_matrix = None, # not needed but included for compatibiliy
gen_fuel_rates_matrix: csc_matrix = None # not needed but included for compatibiliy
) -> "BatteryVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: GenerationVars
"""
nt, n_elm = self.p.shape
data = BatteryVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.e[t, i] = model.get_value(self.e[t, i]) * Sbase
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.producing[t, i] = model.get_value(self.producing[t, i])
data.starting_up[t, i] = model.get_value(self.starting_up[t, i])
data.shutting_down[t, i] = model.get_value(self.shutting_down[t, i])
data.cost[t, i] = model.get_value(self.cost[t, i])
data.invested[t, i] = model.get_value(self.invested[t, i])
# format the arrays appropriately
data.p = data.p.astype(float, copy=False)
data.e = data.e.astype(float, copy=False)
data.shedding = data.shedding.astype(float, copy=False)
data.producing = data.producing.astype(int, copy=False)
data.starting_up = data.starting_up.astype(int, copy=False)
data.shutting_down = data.shutting_down.astype(int, copy=False)
data.cost = data.cost.astype(float, copy=False)
data.invested = data.invested.astype(bool, copy=False)
return data
[docs]
class BranchVars:
"""
Struct to store the branch related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BranchVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.z_flows = np.zeros((nt, n_elm), dtype=object) # absolute used with add_losses_approximation
self.losses = np.zeros((nt, n_elm), dtype=object) # absolute used with add_losses_approximation
self.flow_slacks_pos = np.zeros((nt, n_elm), dtype=object)
self.flow_slacks_neg = np.zeros((nt, n_elm), dtype=object)
self.tap_angles = np.zeros((nt, n_elm), dtype=object)
self.flow_constraints_ub = np.zeros((nt, n_elm), dtype=object)
self.flow_constraints_lb = np.zeros((nt, n_elm), dtype=object)
self.overload_cost = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
# t, m, c, contingency, negative_slack, positive_slack
self.contingency_flow_data: List[Tuple[int, int, int, Union[float, LpVar, LpExp], LpVar, LpVar]] = list()
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "BranchVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: BranchVars
"""
nt, n_elm = self.flows.shape
data = BranchVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
data.flow_slacks_pos[t, i] = model.get_value(self.flow_slacks_pos[t, i]) * Sbase
data.flow_slacks_neg[t, i] = model.get_value(self.flow_slacks_neg[t, i]) * Sbase
data.tap_angles[t, i] = model.get_value(self.tap_angles[t, i])
data.flow_constraints_ub[t, i] = model.get_value(self.flow_constraints_ub[t, i])
data.flow_constraints_lb[t, i] = model.get_value(self.flow_constraints_lb[t, i])
data.overload_cost[t, i] = model.get_value(self.overload_cost[t, i]) * Sbase
data.losses[t, i] = model.get_value(self.losses[t, i]) * Sbase
for i in range(len(self.contingency_flow_data)):
t, m, c, var, neg_slack, pos_slack = self.contingency_flow_data[i]
self.contingency_flow_data[i] = (t, m, c,
model.get_value(var),
model.get_value(neg_slack),
model.get_value(pos_slack))
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.flow_slacks_pos = data.flow_slacks_pos.astype(float, copy=False)
data.flow_slacks_neg = data.flow_slacks_neg.astype(float, copy=False)
data.overload_cost = data.overload_cost.astype(float, copy=False)
data.tap_angles = data.tap_angles.astype(float, copy=False)
# compute loading
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
def add_contingency_flow(self, t: int, m: int, c: int,
flow_var: Union[float, LpVar, LpExp],
neg_slack: LpVar,
pos_slack: LpVar):
"""
Add contingency flow
:param t: time index
:param m: monitored index
:param c: contingency group index
:param flow_var: flow var
:param neg_slack: negative flow slack variable
:param pos_slack: positive flow slack variable
"""
self.contingency_flow_data.append((t, m, c, flow_var, neg_slack, pos_slack))
[docs]
class HvdcVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "HvdcVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: HvdcVars
"""
nt, n_elm = self.flows.shape
data = HvdcVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
class VscVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "HvdcVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: HvdcVars
"""
nt, n_elm = self.flows.shape
data = HvdcVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
class FluidNodeVars:
"""
Struct to store the vars of nodes of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidNodeVars structure
:param nt: Number of time steps
:param n_elm: Number of nodes
"""
# the objects below are extracted from data
# self.min_level = np.zeros((nt, n_elm), dtype=float) # m3
# self.max_level = np.zeros((nt, n_elm), dtype=float) # m3
# self.initial_level = np.zeros((nt, n_elm), dtype=float) # m3
self.p2x_flow = np.zeros((nt, n_elm), dtype=object) # m3
self.current_level = np.zeros((nt, n_elm), dtype=object) # m3
self.spillage = np.zeros((nt, n_elm), dtype=object) # m3/s
self.flow_in = np.zeros((nt, n_elm), dtype=object) # m3/s
self.flow_out = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidNodeVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidNodeVars
"""
nt, n_elm = self.p2x_flow.shape
data = FluidNodeVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p2x_flow[t, i] = model.get_value(self.p2x_flow[t, i])
data.current_level[t, i] = model.get_value(self.current_level[t, i])
data.spillage[t, i] = model.get_value(self.spillage[t, i])
data.flow_in[t, i] = model.get_value(self.flow_in[t, i])
data.flow_out[t, i] = model.get_value(self.flow_out[t, i])
# format the arrays appropriately
data.p2x_flow = data.p2x_flow.astype(float, copy=False)
data.current_level = data.current_level.astype(float, copy=False)
data.spillage = data.spillage.astype(float, copy=False)
data.flow_in = data.flow_in.astype(float, copy=False)
data.flow_out = data.flow_out.astype(float, copy=False)
# from the data object itself
# data.min_level = self.min_level
# data.max_level = self.max_level
# data.initial_level = self.initial_level
return data
[docs]
class FluidPathVars:
"""
Struct to store the vars of paths of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidPathVars structure
:param nt: Number of time steps
:param n_elm: Number of paths (rivers)
"""
# from the data object
# self.min_flow = np.zeros((nt, n_elm), dtype=float) # m3/s
# self.max_flow = np.zeros((nt, n_elm), dtype=float) # m3/s
self.flow = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidPathVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidPathVars
"""
nt, n_elm = self.flow.shape
data = FluidPathVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.flow[t, i] = model.get_value(self.flow[t, i])
# format the arrays appropriately
data.flow = data.flow.astype(float, copy=False)
# data.min_flow = self.min_flow
# data.max_flow = self.max_flow
return data
[docs]
class FluidInjectionVars:
"""
Struct to store the vars of injections of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidInjectionVars structure
:param nt: Number of time steps
:param n_elm: Number of elements moving fluid
"""
self.flow = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidInjectionVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidInjectionVars
"""
nt, n_elm = self.flow.shape
data = FluidInjectionVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.flow[t, i] = model.get_value(self.flow[t, i])
# format the arrays appropriately
data.flow = data.flow.astype(float, copy=False)
return data
[docs]
class SystemVars:
"""
Struct to store the system vars
"""
def __init__(self, nt: int):
"""
SystemVars structure
:param nt: Number of time steps
"""
self.system_fuel = np.zeros(nt, dtype=float)
self.system_emissions = np.zeros(nt, dtype=float)
self.system_unit_energy_cost = np.zeros(nt, dtype=float)
self.system_total_energy_cost = np.zeros(nt, dtype=float)
self.power_by_technology = np.zeros(nt, dtype=float)
[docs]
def compute(self,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix,
gen_tech_shares_matrix: csc_matrix,
batt_tech_shares_matrix: csc_matrix,
gen_p: Mat,
gen_cost: Mat,
batt_p: Mat,
shedding_cost: Mat):
"""
Compute the system values
:param gen_emissions_rates_matrix: emissions rates matrix (n_emissions, n_gen)
:param gen_fuel_rates_matrix: fuel rates matrix (n_fuels, n_gen)
:param gen_tech_shares_matrix: technology shares of the generators
:param batt_tech_shares_matrix technology shares of the batteries
:param gen_p: Generation power values (nt, ngen)
:param gen_cost: Generation cost values (nt, ngen)
:param batt_p: Battery power values (nt, nbatt)
:param shedding_cost: Shedding cost values (nt, ngen)
"""
self.system_fuel = (gen_fuel_rates_matrix * gen_p.T).T
self.system_emissions = (gen_emissions_rates_matrix * gen_p.T).T
self.power_by_technology = (gen_tech_shares_matrix * gen_p.T).T
self.power_by_technology += (batt_tech_shares_matrix * batt_p.T).T
with np.errstate(divide='ignore', invalid='ignore'): # numpy magic to ignore the zero divisions
self.system_total_energy_cost = np.nan_to_num(gen_cost).sum(axis=1)
self.system_total_energy_cost += np.nan_to_num(shedding_cost).sum(axis=1)
self.system_unit_energy_cost = self.system_total_energy_cost / np.nan_to_num(gen_p).sum(axis=1)
return self
[docs]
class OpfVars:
"""
Structure to host the opf variables
"""
def __init__(self, nt: int, nbus: int, ng: int, nb: int, nl: int, nbr: int, n_hvdc: int, n_vsc: int,
n_fluid_node: int,
n_fluid_path: int, n_fluid_inj: int, n_cap_buses: int):
"""
Constructor
:param nt: number of time steps
:param nbus: number of nodes
:param ng: number of generators
:param nb: number of batteries
:param nl: number of loads
:param nbr: number of branches
:param n_hvdc: number of HVDC
:param n_fluid_node: number of fluid nodes
:param n_fluid_path: number of fluid paths
:param n_fluid_inj: number of fluid injections
"""
self.nt = nt
self.nbus = nbus
self.ng = ng
self.nb = nb
self.nl = nl
self.nbr = nbr
self.n_hvdc = n_hvdc
self.n_vsc = n_vsc
self.n_fluid_node = n_fluid_node
self.n_fluid_path = n_fluid_path
self.n_fluid_inj = n_fluid_inj
self.n_cap_buses = n_cap_buses
self.acceptable_solution = False
self.bus_vars = BusVars(nt=nt, n_elm=nbus)
self.nodal_capacity_vars = NodalCapacityVars(nt=nt, n_elm=n_cap_buses)
self.load_vars = LoadVars(nt=nt, n_elm=nl)
self.gen_vars = GenerationVars(nt=nt, n_elm=ng)
self.batt_vars = BatteryVars(nt=nt, n_elm=nb)
self.branch_vars = BranchVars(nt=nt, n_elm=nbr)
self.hvdc_vars = HvdcVars(nt=nt, n_elm=n_hvdc)
self.vsc_vars = VscVars(nt=nt, n_elm=n_vsc)
self.fluid_node_vars = FluidNodeVars(nt=nt, n_elm=n_fluid_node)
self.fluid_path_vars = FluidPathVars(nt=nt, n_elm=n_fluid_path)
self.fluid_inject_vars = FluidInjectionVars(nt=nt, n_elm=n_fluid_inj)
self.sys_vars = SystemVars(nt=nt)
[docs]
def get_values(self, Sbase: float, model: LpModel,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix,
gen_tech_shares_matrix: csc_matrix,
batt_tech_shares_matrix: csc_matrix) -> "OpfVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: OpfVars instance
"""
data = OpfVars(nt=self.nt,
nbus=self.nbus,
ng=self.ng,
nb=self.nb,
nl=self.nl,
nbr=self.nbr,
n_hvdc=self.n_hvdc,
n_vsc=self.n_vsc,
n_fluid_node=self.n_fluid_node,
n_fluid_path=self.n_fluid_path,
n_fluid_inj=self.n_fluid_inj,
n_cap_buses=self.n_cap_buses)
data.bus_vars = self.bus_vars.get_values(Sbase, model)
data.nodal_capacity_vars = self.nodal_capacity_vars.get_values(Sbase, model)
data.load_vars = self.load_vars.get_values(Sbase, model)
data.gen_vars = self.gen_vars.get_values(Sbase=Sbase,
model=model,
gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix)
data.batt_vars = self.batt_vars.get_values(Sbase, model)
data.branch_vars = self.branch_vars.get_values(Sbase, model)
data.hvdc_vars = self.hvdc_vars.get_values(Sbase, model)
data.vsc_vars = self.vsc_vars.get_values(Sbase, model)
data.fluid_node_vars = self.fluid_node_vars.get_values(model)
data.fluid_path_vars = self.fluid_path_vars.get_values(model)
data.fluid_inject_vars = self.fluid_inject_vars.get_values(model)
data.sys_vars = self.sys_vars.compute(gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix,
gen_tech_shares_matrix=gen_tech_shares_matrix,
batt_tech_shares_matrix=batt_tech_shares_matrix,
gen_p=data.gen_vars.p,
batt_p=data.batt_vars.p,
gen_cost=data.gen_vars.cost,
shedding_cost=data.load_vars.shedding_cost)
data.acceptable_solution = self.acceptable_solution
return data
[docs]
def add_linear_generation_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
gen_vars: GenerationVars,
prob: LpModel,
unit_commitment: bool,
ramp_constraints: bool,
skip_generation_limits: bool,
all_generators_fixed: bool,
vd: IntVec,
nodal_capacity_active: bool,
generation_expansion_planning: bool,
use_glsk_as_cost: bool,
logger: Logger):
"""
Add MIP generation formulation
:param local_t: time step (possibly reduced or from an interval)
:param global_t: global time (integer or None to signal for the snapshot)
:param grid: MultiCircuit instance
:param bus_idx_dict: Bus-index dictionary
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param unit_commitment: formulate unit commitment?
:param ramp_constraints: formulate ramp constraints?
:param skip_generation_limits: skip the generation limits?
:param all_generators_fixed: All generators take their snapshot or profile values
instead of resorting to dispatchable status
:param vd: slack indices
:param nodal_capacity_active: nodal capacity active?
:param generation_expansion_planning: generation expansion plan?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger instance
:return objective function
"""
f_obj = 0.0
if nodal_capacity_active:
# TODO: This looks like a bug
# id_gen_nonvd = [i for i in range(gen_data_t.C_bus_elm.shape[1]) if i not in vd]
id_gen_nonvd = [i for i in range(grid.get_generators_number()) if i not in vd]
else:
id_gen_nonvd = []
if time_array is not None:
if len(time_array) > 0:
year = time_array[local_t].year - time_array[0].year
else:
year = 0
else:
year = 0
# add generation stuff
for k, elm in enumerate(grid.generators):
bus_idx = bus_idx_dict.get(elm.bus, None)
active = elm.get_active_at(global_t)
if active and bus_idx is not None:
pmax_pu = elm.get_Pmax_at(global_t) / Sbase
pmin_pu = elm.get_Pmin_at(global_t) / Sbase
cost0_pu = elm.get_Cost0_at(global_t) / Sbase
cost1_pu = elm.get_Cost_at(global_t) / Sbase
gen_vars.cost[local_t, k] = 0.0
# TODO Review and change the id_gen_nonvd stuff
if active and k not in id_gen_nonvd and bus_idx > -1:
if elm.get_enabled_dispatch_at(global_t) and not all_generators_fixed:
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(
lb=-1e20, ub=1e20, name=join("gen_p_", [local_t, k], "_")
)
if unit_commitment:
# declare unit commitment vars
gen_vars.starting_up[local_t, k] = prob.add_int(
0, 1, join("gen_starting_up_", [local_t, k], "_")
)
gen_vars.producing[local_t, k] = prob.add_int(
0, 1, join("gen_producing_", [local_t, k], "_")
)
gen_vars.shutting_down[local_t, k] = prob.add_int(
0, 1, join("gen_shutting_down_", [local_t, k], "_")
)
# operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += (
elm.get_shift_key_at(global_t) * gen_vars.p[local_t, k]
)
else:
gen_vars.cost[local_t, k] += (
cost1_pu * gen_vars.p[local_t, k]
+ cost0_pu * gen_vars.producing[local_t, k]
)
# start-up cost
gen_vars.cost[local_t, k] += (
elm.startup_cost * gen_vars.starting_up[local_t, k]
)
# power boundaries of the generator
if not skip_generation_limits:
prob.add_cst(
cst=gen_vars.p[local_t, k] >= pmin_pu * gen_vars.producing[local_t, k],
name=join("gen_geq_Pmin", [local_t, k], "_")
)
prob.add_cst(
cst=gen_vars.p[local_t, k] <= pmax_pu * gen_vars.producing[local_t, k],
name=join("gen_leq_Pmax", [local_t, k], "_")
)
if local_t is not None:
if local_t == 0:
prob.add_cst(
cst=gen_vars.starting_up[local_t, k] - gen_vars.shutting_down[local_t, k] ==
gen_vars.producing[local_t, k] - float(active),
name=join("binary_alg1_", [local_t, k], "_")
)
prob.add_cst(
cst=(gen_vars.starting_up[local_t, k] + gen_vars.shutting_down[local_t, k] <= 1),
name=join("binary_alg2_", [local_t, k], "_")
)
else:
prob.add_cst(
cst=(gen_vars.starting_up[local_t, k] - gen_vars.shutting_down[local_t, k] ==
gen_vars.producing[local_t, k] - gen_vars.producing[local_t - 1, k]),
name=join("binary_alg3_", [local_t, k], "_")
)
prob.add_cst(
cst=gen_vars.starting_up[local_t, k] + gen_vars.shutting_down[local_t, k] <= 1,
name=join("binary_alg4_", [local_t, k], "_")
)
else:
# No unit commitment
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += elm.get_shift_key_at(global_t) * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (cost1_pu * gen_vars.p[local_t, k]) + cost0_pu
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=pmin_pu, ub=pmax_pu)
# add the ramp constraints
if ramp_constraints and local_t is not None:
if local_t > 0:
pmax = elm.get_Pmax_at(global_t)
# TODO: review units here
if elm.ramp_up < pmax and elm.ramp_down < pmax:
# if the ramp is actually sufficiently restrictive...
dt = (time_array[local_t] -
time_array[local_t - 1]).seconds / 3600.0 # time increment in hours
# - ramp_down Β· dt <= P(t) - P(t-1) <= ramp_up Β· dt
prob.add_cst(
cst=(-elm.ramp_down / Sbase * dt <=
gen_vars.p[local_t, k] - gen_vars.p[local_t - 1, k])
)
prob.add_cst(
cst=gen_vars.p[local_t, k] - gen_vars.p[local_t - 1, k] <= elm.ramp_up / Sbase * dt
)
else:
# it is NOT dispatchable
p_pu = elm.get_P_at(global_t) / Sbase
# the generator is not dispatchable at time step
if p_pu > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
0, p_pu, join("gen_shedding_", [local_t, k], "_")
)
gen_vars.p[local_t, k] = p_pu - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += elm.get_shift_key_at(global_t) * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += cost1_pu * gen_vars.shedding[local_t, k]
elif p_pu < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
0, -p_pu, join("gen_shedding_", [local_t, k], "_")
)
gen_vars.p[local_t, k] = p_pu + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += elm.get_shift_key_at(global_t) * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += cost1_pu * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += (
elm.get_shift_key_at(global_t) * gen_vars.p[local_t, k]
)
else:
gen_vars.cost[local_t, k] += cost1_pu * gen_vars.p[local_t, k] + cost0_pu
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
else:
logger.add_error("Generator without bus", device_class="Generator",
device_property="bus", device=elm.idtag)
if generation_expansion_planning:
# formulate the investment
gen_dict = {elm.idtag: (idx, elm) for idx, elm in enumerate(grid.generators)}
capex = np.zeros(grid.get_generators_number())
discount_rate = np.zeros(grid.get_generators_number())
is_candidate = np.zeros(grid.get_generators_number(), dtype=bool)
for investment in grid.investments:
# search in generators
data = gen_dict.get(investment.device_idtag, None)
if data is not None:
idx, elm = data
if investment.CAPEX != 0.0: # overwrite the base capex
capex[idx] = investment.CAPEX
is_candidate[idx] = True
discount_rate[idx] = investment.group.discount_rate
for k, elm in enumerate(grid.generators):
bus_idx = bus_idx_dict.get(elm.bus, None)
active = elm.get_active_at(global_t)
if active and bus_idx is not None:
pmax_pu = elm.get_Pmax_at(global_t) / Sbase
gen_vars.cost[local_t, k] = 0.0
# get bus index
bus_idx = bus_idx_dict[elm.bus]
# TODO Review and change the id_gen_nonvd stuff
if elm.get_active_at(local_t) and k not in id_gen_nonvd and bus_idx > -1:
if elm.get_enabled_dispatch_at(global_t) and not all_generators_fixed:
# Generation Expansion Planning
if is_candidate[k] and generation_expansion_planning:
money_factor = np.power(1.0 + discount_rate[k] / 100.0, year)
# declare the investment binary
gen_vars.invested[local_t, k] = prob.add_int(
lb=0, ub=1, name=join("Ig_", [local_t, k])
)
# add the investment cost to the objective
f_obj += gen_vars.invested[local_t, k] * pmax_pu * capex[k] * money_factor
if local_t > 0:
# installation persistence
prob.add_cst(
cst=gen_vars.invested[local_t - 1, k] <= gen_vars.invested[local_t, k],
name=join("persist_", [local_t, k])
)
# maximum production constraint
prob.add_cst(
cst=(gen_vars.p[local_t, k] <= pmax_pu * gen_vars.invested[local_t, k]),
name=join("max_prod_", [local_t, k])
)
else:
# is invested for already
gen_vars.invested[local_t, k] = 1
return f_obj
[docs]
def add_linear_battery_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
batt_vars: BatteryVars,
prob: LpModel,
unit_commitment: bool,
ramp_constraints: bool,
skip_generation_limits: bool,
generation_expansion_planning: bool,
energy_0: Vec):
"""
Add MIP generation formulation
:param local_t: time step (possibly reduced or from an interval)
:param global_t: global time (integer or None to signal for the snapshot)
:param grid: MultiCircuit instance
:param bus_idx_dict: Bus-index dictionary
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param batt_data_t: BatteryData structure
:param batt_vars: BatteryVars structure
:param prob: ORTools problem
:param unit_commitment: formulate unit commitment?
:param ramp_constraints: formulate ramp constraints?
:param skip_generation_limits: skip the generation limits?
:param generation_expansion_planning: generation expansion planning?
:param energy_0: initial value of the energy stored
:return objective function
"""
f_obj = 0.0
for k, elm in enumerate(grid.batteries):
bus_idx = bus_idx_dict.get(elm.bus, None)
active = elm.get_active_at(global_t)
if active and bus_idx is not None:
pmax_pu = elm.get_Pmax_at(global_t) / Sbase
pmin_pu = elm.get_Pmin_at(global_t) / Sbase
cost0_pu = elm.get_Cost0_at(global_t) / Sbase
cost1_pu = elm.get_Cost_at(global_t) / Sbase
# declare active power var (limits will be applied later)
p_pos = prob.add_var(0, 1e20, join("batt_ppos_", [local_t, k], "_"))
p_neg = prob.add_var(0, 1e20, join("batt_pneg_", [local_t, k], "_"))
batt_vars.p[local_t, k] = p_pos - p_neg
if elm.get_enabled_dispatch_at(global_t):
if unit_commitment:
# declare unit commitment vars
batt_vars.starting_up[local_t, k] = prob.add_int(
0, 1, join("bat_starting_up_", [local_t, k], "_")
)
batt_vars.producing[local_t, k] = prob.add_int(
0, 1, join("bat_producing_", [local_t, k], "_")
)
batt_vars.shutting_down[local_t, k] = prob.add_int(
0, 1, join("bat_shutting_down_", [local_t, k], "_")
)
# operational cost (linear...)
f_obj += cost1_pu * p_pos + cost0_pu * batt_vars.producing[local_t, k]
# start-up cost
f_obj += elm.StartupCost * batt_vars.starting_up[local_t, k]
# power boundaries of the generator
if not skip_generation_limits:
prob.add_cst(
cst=(batt_vars.p[local_t, k] >= (pmin_pu * batt_vars.producing[local_t, k])),
name=join("batt_geq_Pmin", [local_t, k], "_"))
prob.add_cst(
cst=(batt_vars.p[local_t, k] <= (pmax_pu * batt_vars.producing[local_t, k])),
name=join("batt_leq_Pmax", [local_t, k], "_"))
if local_t is not None:
if local_t == 0:
prob.add_cst(
cst=(batt_vars.starting_up[local_t, k] - batt_vars.shutting_down[local_t, k] ==
batt_vars.producing[local_t, k] - float(active)),
name=join("binary_bat_alg1_", [local_t, k], "_"))
prob.add_cst(
cst=batt_vars.starting_up[local_t, k] + batt_vars.shutting_down[local_t, k] <= 1,
name=join("binary_bat_alg2_", [local_t, k], "_"))
else:
prob.add_cst(
cst=(batt_vars.starting_up[local_t, k] - batt_vars.shutting_down[local_t, k] ==
batt_vars.producing[local_t, k] - batt_vars.producing[local_t - 1, k]),
name=join("binary_bat_alg3_", [local_t, k], "_"))
prob.add_cst(
cst=batt_vars.starting_up[local_t, k] + batt_vars.shutting_down[local_t, k] <= 1,
name=join("binary_bat_alg4_", [local_t, k], "_"))
else:
# No unit commitment
# Operational cost (linear...)
f_obj += cost1_pu * p_pos + cost0_pu
# power boundaries of the generator
if not skip_generation_limits:
prob.set_var_bounds(var=p_pos, lb=0, ub=pmax_pu)
prob.set_var_bounds(var=p_neg, lb=0, ub=-pmin_pu)
# compute the time increment in hours
if len(time_array) > 1:
dt = (time_array[local_t] - time_array[local_t - 1]).seconds / 3600.0
else:
dt = 1.0
if ramp_constraints and local_t is not None:
if local_t > 0:
# add the ramp constraints
if elm.RampUp < elm.get_Pmax_at(global_t) and elm.RampDown < elm.get_Pmax_at(global_t):
# if the ramp is actually sufficiently restrictive...
# - ramp_down Β· dt <= P(t) - P(t-1) <= ramp_up Β· dt
prob.add_cst(
cst=-elm.RampDown / Sbase * dt <= batt_vars.p[local_t, k] - batt_vars.p[local_t - 1, k]
)
prob.add_cst(
cst=batt_vars.p[local_t, k] - batt_vars.p[local_t - 1, k] <= elm.RampUp / Sbase * dt
)
# set the energy value Et = E(t - 1) + dt * Pb / eff
emin_pu = elm.energy * elm.min_soc / Sbase
emax_pu = elm.energy * elm.max_soc / Sbase
batt_vars.e[local_t, k] = prob.add_var(
lb=emin_pu,
ub=emax_pu,
name=join("batt_e_", [local_t, k], "_")
)
if local_t > 0:
# energy decreases / increases with power Β· dt
prob.add_cst(cst=(batt_vars.e[local_t, k] == batt_vars.e[local_t - 1, k]
+ dt * (elm.discharge_efficiency * p_pos - elm.charge_efficiency * p_neg)),
name=join("batt_energy_", [local_t, k], "_"))
else:
# set the initial energy value
batt_vars.e[local_t, k] = energy_0[k] / Sbase
else:
# it is NOT dispatchable
# Operational cost (linear...)
f_obj += (cost1_pu * p_pos) + cost0_pu
p_pu = elm.get_P_at(global_t) / Sbase
# the generator is not dispatchable at time step
if p_pu > 0:
batt_vars.shedding[local_t, k] = prob.add_var(
0, p_pu, join("bat_shedding_", [local_t, k], "_")
)
prob.add_cst(
cst=batt_vars.p[local_t, k] == p_pu - batt_vars.shedding[local_t, k],
name=join("batt==PB-PBslack", [local_t, k], "_"))
f_obj += cost1_pu * batt_vars.shedding[local_t, k]
elif p_pu < 0:
# the negative sign is because P is already negative here
batt_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p_pu, name=join("bat_shedding_", [local_t, k], "_")
)
prob.add_cst(
cst=batt_vars.p[local_t, k] == p_pu + batt_vars.shedding[local_t, k],
name=join("batt==PB+PBslack", [local_t, k], "_"))
f_obj += cost1_pu * batt_vars.shedding[local_t, k]
else:
# the generation value is exactly zero, pass
pass
batt_vars.producing[local_t, k] = 1
batt_vars.shutting_down[local_t, k] = 0
batt_vars.starting_up[local_t, k] = 0
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += batt_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += batt_vars.p[local_t, k]
else:
# the generator is not available at a time step
batt_vars.p[local_t, k] = 0.0
return f_obj
[docs]
def add_nodal_capacity_formulation(t: Union[int, None],
nodal_capacity_vars: NodalCapacityVars,
nodal_capacity_sign: float,
capacity_nodes_idx: IntVec,
prob: LpModel):
"""
Add MIP generation formulation
:param t: time step, if None we assume single time step
:param nodal_capacity_vars: NodalCapacityVars structure
:param nodal_capacity_sign:
:param capacity_nodes_idx: IntVec
:param prob: ORTools problem
:return objective function
"""
f_obj = 0.0
for k, idx in enumerate(capacity_nodes_idx):
# assign load shedding variable
if nodal_capacity_sign < 0:
nodal_capacity_vars.P[t, k] = prob.add_var(lb=0.0,
ub=9999.9,
name=join("nodal_capacity_", [t, k], "_"))
else:
nodal_capacity_vars.P[t, k] = prob.add_var(lb=-9999.9,
ub=0.0,
name=join("nodal_capacity_", [t, k], "_"))
# maximize the nodal power injection
f_obj += 100 * nodal_capacity_sign * nodal_capacity_vars.P[t, k]
return f_obj
[docs]
def add_linear_load_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
bus_vars: BusVars,
load_vars: LoadVars,
prob: LpModel):
"""
Add MIP generation formulation
:param local_t: time step (possibly reduced or from an interval)
:param global_t: global time (integer or None to signal for the snapshot)
:param grid: MultiCircuit instance
:param bus_idx_dict: Bus-index dictionary
:param Sbase: base power (100 MVA)
:param bus_vars: BusVars
:param load_data_t: BatteryData structure
:param load_vars: BatteryVars structure
:param prob: ORTools problem
:return objective function
"""
f_obj = 0.0
for k, elm in enumerate(grid.get_load_like_devices_iter()):
bus_idx = bus_idx_dict.get(elm.bus, None)
active = elm.get_active_at(global_t)
if active and bus_idx is not None:
cost1_pu = elm.get_Cost_at(global_t) / Sbase
p_set_pu = elm.get_P_at(global_t) / Sbase
if p_set_pu > 0.0:
# assign load shedding variable
load_vars.shedding[local_t, k] = prob.add_var(
lb=0,
ub=p_set_pu,
name=join("load_shedding_", [local_t, k], "_")
)
# store the load
load_vars.p[local_t, k] = p_set_pu - load_vars.shedding[local_t, k]
load_vars.shedding_cost[local_t, k] = cost1_pu * load_vars.shedding[local_t, k]
# minimize the load shedding
f_obj += load_vars.shedding_cost[local_t, k]
else:
# the load is negative, won't shed?
load_vars.shedding[local_t, k] = 0.0
# store the load
load_vars.p[local_t, k] = p_set_pu
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += -load_vars.p[local_t, k]
# add to the injections
# bus_vars.Pinj[t, bus_idx] += - load_vars.p[t, k]
else:
# the load is not available at time step
load_vars.shedding[local_t, k] = 0.0
return f_obj
[docs]
def add_linear_branches_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
branch_vars: BranchVars,
bus_vars: BusVars,
prob: LpModel,
inf=1e20,
add_losses_approximation: bool = False):
"""
Formulate the branches
:param local_t: time step (possibly reduced or from an interval)
:param global_t: global time (integer or None to signal for the snapshot)
:param grid: MultiCircuit instance
:param bus_idx_dict: Bus-index dictionary
:param Sbase: base power (100 MVA)
:param branch_vars: BranchVars
:param bus_vars: BusVars
:param prob: OR problem
:param inf: number considered infinite
:param add_losses_approximation: If true the distribution factors losses approximation is used
:return objective function
"""
f_obj = 0.0
# for each branch
for m, elm in enumerate(grid.get_branches_iter(add_vsc=False, add_hvdc=False, add_switch=True)):
fr = bus_idx_dict.get(elm.bus_from, None)
to = bus_idx_dict.get(elm.bus_to, None)
# copy rates
branch_vars.rates[local_t, m] = elm.get_rate_at(global_t)
if elm.get_active_at(global_t):
rate_pu = branch_vars.rates[local_t, m] / Sbase
overload_cost_pu = elm.get_Cost_at(global_t) / Sbase
# declare the flow LPVar
branch_vars.flows[local_t, m] = prob.add_var(lb=-inf,
ub=inf,
name=join("flow_", [local_t, m], "_"))
if isinstance(elm, DcLine):
# DC Branch
if elm.R == 0.0:
bk = 1e-20
else:
bk = 1.0 / elm.R
branch_vars.flows[local_t, m] = bk * (bus_vars.Vm[local_t, fr] - bus_vars.Vm[local_t, to])
else:
# AC branch
# compute the branch susceptance
if elm.X == 0.0:
bk = 1e-20
else:
bk = 1.0 / elm.X
# compute the flow
if elm.get_tap_phase_control_mode_at(global_t) == TapPhaseControl.Pf.idx():
# add angle
branch_vars.tap_angles[local_t, m] = prob.add_var(
lb=elm.tap_phase_min,
ub=elm.tap_phase_max,
name=join("tap_ang_", [local_t, m], "_")
)
# is a phase shifter device (like phase shifter transformer or VSC with P control)
# flow_ctr = branch_vars.flows[t, m] == bk * (
# bus_vars.theta[t, fr] - bus_vars.theta[t, to] + branch_vars.tap_angles[t, m])
# prob.add_cst(cst=flow_ctr, name=join("Branch_flow_set_with_ps_", [t, m], "_"))
branch_vars.flows[local_t, m] = bk * (bus_vars.Va[local_t, fr] -
bus_vars.Va[local_t, to] +
branch_vars.tap_angles[local_t, m])
else: # rest of the branches
# is a phase shifter device (like phase shifter transformer or VSC with P control)
# flow_ctr = branch_vars.flows[t, m] == bk * (bus_vars.theta[t, fr] - bus_vars.theta[t, to])
# prob.add_cst(cst=flow_ctr, name=join("Branch_flow_set_", [t, m], "_"))
branch_vars.flows[local_t, m] = bk * (bus_vars.Va[local_t, fr] - bus_vars.Va[local_t, to])
# power injected and subtracted due to the phase shift
bus_vars.Pbalance[local_t, fr] += - branch_vars.flows[local_t, m]
bus_vars.Pbalance[local_t, to] += branch_vars.flows[local_t, m]
if add_losses_approximation:
# declare the abs flow LPVar
branch_vars.z_flows[local_t, m] = prob.add_var(lb=0,
ub=inf,
name=join("z_flow_", [local_t, m], "_"))
# zij β₯ Fij
prob.add_cst(cst=branch_vars.flows[local_t, m] <= branch_vars.z_flows[local_t, m],
name=join("z_flow_u_lim_", [local_t, m]))
# zij β₯ -Fij
prob.add_cst(cst=-branch_vars.flows[local_t, m] <= branch_vars.z_flows[local_t, m],
name=join("z_flow_l_lim_", [local_t, m]))
# add to the objective (Ξ±ij =rij * β£Fij0β£/V2)
# instead of the rates, the literature suggests an absolute initial value of the flow
V = grid.buses[fr].Vnom
if V > 0.0:
factor = elm.R * rate_pu * Sbase / (V ** 2)
else:
factor = elm.R
# store the values for later retrieval
branch_vars.losses[local_t, m] = branch_vars.z_flows[local_t, m] * factor
# add the losses to the objective
f_obj += branch_vars.losses[local_t, m]
# divide the losses contribution equally among the from and to buses as new loads
l_inj = 0.5 * branch_vars.losses[local_t, m]
bus_vars.Pbalance[local_t, fr] += - l_inj
bus_vars.Pbalance[local_t, to] += - l_inj
# add the flow constraint if monitored
if elm.monitor_loading:
branch_vars.flow_slacks_pos[local_t, m] = prob.add_var(
0, inf, name=join("flow_slack_pos_", [local_t, m], "_")
)
branch_vars.flow_slacks_neg[local_t, m] = prob.add_var(
0, inf, name=join("flow_slack_neg_", [local_t, m], "_")
)
# add upper rate constraint
branch_vars.flow_constraints_ub[local_t, m] = (branch_vars.flows[local_t, m] +
branch_vars.flow_slacks_pos[local_t, m] -
branch_vars.flow_slacks_neg[local_t, m]
<= rate_pu)
prob.add_cst(
cst=branch_vars.flow_constraints_ub[local_t, m],
name=join("br_flow_upper_lim_", [local_t, m])
)
# add lower rate constraint
branch_vars.flow_constraints_lb[local_t, m] = (branch_vars.flows[local_t, m] +
branch_vars.flow_slacks_pos[local_t, m] -
branch_vars.flow_slacks_neg[local_t, m]
>= -rate_pu)
prob.add_cst(
cst=branch_vars.flow_constraints_lb[local_t, m],
name=join("br_flow_lower_lim_", [local_t, m])
)
branch_vars.overload_cost[local_t, m] = (
overload_cost_pu * branch_vars.flow_slacks_pos[local_t, m]
+ overload_cost_pu * branch_vars.flow_slacks_neg[local_t, m])
# add to the objective function
f_obj += branch_vars.overload_cost[local_t, m]
return f_obj
[docs]
def add_linear_branches_contingencies_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
Sbase: float,
hvdc_vars: HvdcVars,
vsc_vars: VscVars,
branch_vars: BranchVars,
bus_vars: BusVars,
prob: LpModel,
linear_multi_contingencies: LinearMultiContingencies):
"""
Formulate the branches
:param local_t: time step (possibly reduced or from an interval)
:param global_t: global time (integer or None to signal for the snapshot)
:param grid: MultiCircuit instance
:param Sbase: base power (100 MVA)
:param hvdc_vars: HvdcVars
:param vsc_vars: VscVars
:param branch_vars: BranchVars
:param bus_vars: BusVars
:param prob: OR problem
:param linear_multi_contingencies: LinearMultiContingencies
:return objective function
"""
# We need all the branch objects for this one
all_branches = grid.get_branches(add_vsc=False, add_hvdc=False, add_switch=True)
f_obj = 0.0
for c, contingency in enumerate(linear_multi_contingencies.multi_contingencies):
# compute the contingency flow (Lp expression)
contingency_flows, mask, changed_idx = contingency.get_lp_contingency_flows(
base_flow=branch_vars.flows[local_t, :],
injections=bus_vars.Pinj[local_t, :],
hvdc_flow=hvdc_vars.flows[local_t, :],
vsc_flow=vsc_vars.flows[local_t, :]
)
for m in range(len(contingency_flows)):
contingency_flow = contingency_flows[m]
if mask[m]:
if isinstance(contingency_flow, LpExp): # if the contingency is not 0
rate_pu = all_branches[m].get_rate_at(global_t) / Sbase
# declare slack variables
pos_slack = prob.add_var(0, 1e20, join("br_cst_flow_pos_sl_", [local_t, m, c]))
neg_slack = prob.add_var(0, 1e20, join("br_cst_flow_neg_sl_", [local_t, m, c]))
# register the contingency data to evaluate the result at the end
branch_vars.add_contingency_flow(t=local_t, m=m, c=c,
flow_var=contingency_flow,
neg_slack=neg_slack,
pos_slack=pos_slack)
# add upper rate constraint
prob.add_cst(
cst=contingency_flow + pos_slack - neg_slack <= rate_pu,
name=join("br_cst_flow_upper_lim_", [local_t, m, c])
)
# add lower rate constraint
prob.add_cst(
cst=contingency_flow + pos_slack - neg_slack >= -rate_pu,
name=join("br_cst_flow_lower_lim_", [local_t, m, c])
)
f_obj += pos_slack + neg_slack
return f_obj
[docs]
def add_linear_hvdc_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
hvdc_vars: HvdcVars,
bus_vars: BusVars,
prob: LpModel):
"""
:param local_t:
:param global_t:
:param grid:
:param bus_idx_dict:
:param Sbase:
:param hvdc_vars:
:param bus_vars:
:param prob:
:return:
"""
f_obj = 0.0
for m, elm in enumerate(grid.hvdc_lines):
fr = bus_idx_dict.get(elm.bus_from, None)
to = bus_idx_dict.get(elm.bus_to, None)
# copy rates
hvdc_vars.rates[local_t, m] = elm.get_rate_at(global_t)
if elm.get_active_at(global_t):
Pset_pu = elm.get_Pset_at(global_t) / Sbase
rate_pu = hvdc_vars.rates[local_t, m] / Sbase
overload_cost_pu = elm.get_Cost_at(global_t) / Sbase
if elm.control_mode == HvdcControlType.type_0_free.idx():
# set the flow based on the angular difference
P0 = Pset_pu
k = elm.angle_droop
hvdc_vars.flows[local_t, m] = (P0 + k * (bus_vars.Va[local_t, fr] - bus_vars.Va[local_t, to]))
# add the injections matching the flow
bus_vars.Pbalance[local_t, fr] += - hvdc_vars.flows[local_t, m]
bus_vars.Pbalance[local_t, to] += hvdc_vars.flows[local_t, m]
elif elm.control_mode == HvdcControlType.type_1_Pset.idx():
if elm.dispatchable[m]:
# declare the flow var
hvdc_vars.flows[local_t, m] = prob.add_var(
lb=-rate_pu, ub=rate_pu, name=join("hvdc_flow_", [local_t, m], "_")
)
# add the injections matching the flow
bus_vars.Pbalance[local_t, fr] += - hvdc_vars.flows[local_t, m]
bus_vars.Pbalance[local_t, to] += hvdc_vars.flows[local_t, m]
else:
if Pset_pu > rate_pu:
P0 = rate_pu
elif Pset_pu < -rate_pu:
P0 = -rate_pu
else:
P0 = Pset_pu
# declare the flow var
hvdc_vars.flows[local_t, m] = prob.add_var(
lb=P0, ub=P0, name=join("hvdc_flow_", [local_t, m], "_")
)
# add the injections matching the flow
bus_vars.Pbalance[local_t, fr] += - hvdc_vars.flows[local_t, m]
bus_vars.Pbalance[local_t, to] += hvdc_vars.flows[local_t, m]
else:
raise Exception('OPF: Unknown HVDC control mode {}'.format(elm.control_mode.value))
else:
# not active, therefore the flow is exactly zero
hvdc_vars.flows[local_t, m] = 0.0
return f_obj
[docs]
def add_linear_vsc_formulation(local_t: int,
global_t: int | None,
grid: MultiCircuit,
bus_idx_dict: Dict[Bus, int],
Sbase: float,
vsc_vars: VscVars,
bus_vars: BusVars,
prob: LpModel,
logger: Logger):
"""
:param local_t:
:param global_t:
:param grid:
:param bus_idx_dict:
:param Sbase:
:param vsc_vars:
:param bus_vars:
:param prob:
:param logger:
:return:
"""
f_obj = 0.0
any_dc_slack = False
for m, elm in enumerate(grid.vsc_devices):
fr = bus_idx_dict.get(elm.bus_from, None)
to = bus_idx_dict.get(elm.bus_to, None)
# copy rates
vsc_vars.rates[local_t, m] = elm.get_rate_at(global_t)
if elm.get_active_at(global_t):
rate_pu = vsc_vars.rates[local_t, m] / Sbase
# declare the flow var
vsc_vars.flows[local_t, m] = prob.add_var(
lb=-rate_pu,
ub=rate_pu,
name=join("vsc_flow_", [local_t, m], "_")
)
# add the injections matching the flow
bus_vars.Pbalance[local_t, fr] += - vsc_vars.flows[local_t, m]
bus_vars.Pbalance[local_t, to] += vsc_vars.flows[local_t, m]
if (elm.control1 == ConverterControlType.Vm_dc.idx()
or elm.control2 == ConverterControlType.Vm_dc.idx()):
# set the DC slack
bus_vars.Vm[local_t, fr] = 1.0
any_dc_slack = True
else:
# not active, therefore the flow is exactly zero
vsc_vars.flows[local_t, m] = 0.0
if not any_dc_slack and grid.get_vsc_number() > 0:
logger.add_warning("No DC Slack! set Vm_dc in any of the converters")
return f_obj
[docs]
def add_linear_node_balance(local_t: int,
grid: MultiCircuit,
vd: IntVec,
bus_vars: BusVars,
nodal_capacity_vars: NodalCapacityVars,
capacity_nodes_idx: IntVec,
prob: LpModel,
logger: Logger):
"""
Add the Kirchhoff nodal equality
:param local_t: time step
:param grid: MultiCircuit
:param vd: List of slack node indices
:param bus_vars: BusVars
:param nodal_capacity_vars: NodalCapacityVars
:param capacity_nodes_idx: IntVec
:param prob: LpModel
:param logger: Logger
"""
# Note: At this point, Pbalance has all the devices' power summed up inside (including branches)
if len(capacity_nodes_idx) > 0:
bus_vars.Pbalance[local_t, capacity_nodes_idx] += nodal_capacity_vars.P[local_t, :]
# add the equality restrictions
for k in range(grid.get_bus_number()):
if isinstance(bus_vars.Pbalance[local_t, k], (int, float)):
bus_vars.kirchhoff[local_t, k] = prob.add_cst(
cst=bus_vars.Va[local_t, k] == 0,
name=join("island_bus_", [local_t, k], "_")
)
logger.add_warning("bus isolated",
device=f'{grid.buses[k].name}@t={local_t}')
else:
bus_vars.kirchhoff[local_t, k] = prob.add_cst(
cst=bus_vars.Pbalance[local_t, k] == 0,
name=join("kirchoff_", [local_t, k], "_")
)
# set all slack angles to 0
for i in vd:
prob.set_var_bounds(var=bus_vars.Va[local_t, i], lb=0.0, ub=0.0)
[docs]
def add_copper_plate_balance(t_idx: int,
bus_vars: BusVars,
prob: LpModel, ):
"""
Add the copperplate equality
:param t_idx: time step
:param bus_vars: BusVars
:param prob: LpModel
"""
bus_vars.kirchhoff[t_idx, 0] = prob.add_cst(
cst=sum(bus_vars.Pbalance[t_idx, :]) == 0,
name=join("copper_plate_", [t_idx, 0], "_")
)
[docs]
def add_hydro_formulation(local_t: Union[int, None],
global_t: int | None,
grid: MultiCircuit,
time_array: DateVec,
Sbase: float,
node_vars: FluidNodeVars,
path_vars: FluidPathVars,
inj_vars: FluidInjectionVars,
generator_vars: GenerationVars,
fluid_level_0: Vec,
prob: LpModel,
logger: Logger):
"""
Formulate the branches
:param local_t: local time index
:param global_t: global time index
:param grid: MultiCircuit
:param time_array: list of time indices
:param Sbase: base power of the system
:param node_vars: FluidNodeVars
:param path_vars: FluidPathVars
:param inj_vars: FluidInjectionVars
:param generator_vars: GeneratorVars
:param fluid_level_0: Initial node level
:param prob: OR problem
:param logger: log of the LP
:return objective function
"""
f_obj = 0.0
fluid_node_dict: Dict[FluidNode, int] = dict()
for m, elm in enumerate(grid.fluid_nodes):
node_vars.spillage[local_t, m] = prob.add_var(lb=0.00,
ub=1e20,
name=join("NodeSpillage_", [local_t, m], "_"))
f_obj += elm.get_spillage_cost_at(global_t) * node_vars.spillage[local_t, m]
min_abs_level = elm.max_level * elm.min_soc
node_vars.current_level[local_t, m] = prob.add_var(lb=min_abs_level,
ub=elm.max_level * elm.max_soc,
name=join("level_", [local_t, m], "_"))
if min_abs_level < elm.min_level:
logger.add_error(msg='Node SOC is below the allowed minimum level',
value=min_abs_level,
expected_value=elm.min_level,
device_class="FluidNode",
device_property=f"Min SOC at {local_t}")
# initialize fluid path's vars
for m, elm in enumerate(grid.fluid_paths):
path_vars.flow[local_t, m] = prob.add_var(lb=elm.min_flow,
ub=elm.max_flow,
name=join("hflow_", [local_t, m], "_"))
# Constraints
for m, elm in enumerate(grid.fluid_paths):
# inflow: fluid flow entering the target node in m3/s
# outflow: fluid flow leaving the source node in m3/s
# flow: amount of fluid flowing through the river in m3/s
source_idx = fluid_node_dict.get(elm.source, None)
target_idx = fluid_node_dict.get(elm.target, None)
if source_idx is not None and target_idx is not None:
node_vars.flow_in[local_t, target_idx] += path_vars.flow[local_t, m]
node_vars.flow_out[local_t, source_idx] += path_vars.flow[local_t, m]
gen_dict = {elm: i for i, elm in enumerate(grid.generators)}
for m, elm in enumerate(grid.turbines):
gen_idx = gen_dict.get(elm.generator, None)
plant_idx = fluid_node_dict.get(elm.plant, None)
if gen_idx is not None and plant_idx is not None:
# flow [m3/s] = pgen [pu] * max_flow [m3/s] / (Pgen_max [MW] / Sbase [MW] * eff)
coeff = elm.max_flow_rate / (elm.generator.get_Pmax_at(global_t) / Sbase * elm.efficiency)
turbine_flow = (generator_vars.p[local_t, gen_idx] * coeff)
# node_vars.flow_out[t, plant_idx] = turbine_flow # assume only 1 turbine connected
# if t > 0:
inj_vars.flow[local_t, m] = turbine_flow # to retrieve the value later on
prob.add_cst(cst=(node_vars.flow_out[local_t, plant_idx] == turbine_flow),
name=join("turbine_river_", [local_t, m], "_"))
if elm.generator.get_Pmin_at(global_t) < 0:
logger.add_error(msg='Turbine generator pmin < 0 is not possible',
value=elm.generator.get_Pmin_at(global_t))
# f_obj += turbine_flow
n_turbines = grid.get_fluid_turbines_number()
for m, elm in enumerate(grid.pumps):
gen_idx = gen_dict.get(elm.generator, None)
plant_idx = fluid_node_dict.get(elm.plant, None)
if gen_idx is not None and plant_idx is not None:
# flow [m3/s] = pcons [pu] * max_flow [m3/s] * eff / (Pcons_min [MW] / Sbase [MW])
# invert the efficiency compared to a turbine
# pmin instead of pmax because the sign should be inverted (consuming instead of generating)
coeff = elm.max_flow_rate * elm.efficiency / (abs(elm.generator.get_Pmin_at(global_t)) / Sbase)
pump_flow = (generator_vars.p[local_t, gen_idx] * coeff)
# node_vars.flow_in[t, plant_idx] = pump_flow # assume only 1 pump connected
# if t > 0:
inj_vars.flow[local_t, m + n_turbines] = - pump_flow
prob.add_cst(cst=(node_vars.flow_in[local_t, plant_idx] == - pump_flow),
name=join("pump_river_", [local_t, m], "_"))
if elm.generator.get_Pmax_at(global_t) > 0:
logger.add_error(msg='Pump generator pmax > 0 is not possible',
value=elm.generator.get_Pmax_at(global_t))
# f_obj += - pump_flow
n_pumps = grid.get_fluid_pumps_number()
for m, elm in enumerate(grid.p2xs):
gen_idx = gen_dict.get(elm.generator, None)
plant_idx = fluid_node_dict.get(elm.plant, None)
if gen_idx is not None and plant_idx is not None:
# flow[m3/s] = pcons [pu] * max_flow [m3/s] * eff / (Pcons_max [MW] / Sbase [MW])
# invert the efficiency compared to a turbine
# pmin instead of pmax because the sign should be inverted (consuming instead of generating)
coeff = elm.max_flow_rate * elm.efficiency / (abs(elm.generator.get_Pmin_at(global_t)) / Sbase)
p2x_flow = (generator_vars.p[local_t, gen_idx] * coeff)
# if t > 0:
node_vars.p2x_flow[local_t, plant_idx] += - p2x_flow
inj_vars.flow[local_t, m + n_turbines + n_pumps] = - p2x_flow
if elm.generator.get_Pmax_at(global_t) > 0:
logger.add_error(msg='P2X generator pmax > 0 is not possible',
value=elm.generator.get_Pmax_at(global_t))
# f_obj += - p2x_flow
if global_t is not None:
# constraints for the node level
for m, elm in enumerate(grid.fluid_nodes):
if local_t == 0:
if len(time_array) > global_t + 1:
dt = (time_array[global_t + 1] - time_array[global_t]).seconds
else:
dt = 3600.0
# Initialize level at fluid_level_0
prob.add_cst(cst=(node_vars.current_level[local_t, m] ==
fluid_level_0[m]
+ dt * elm.get_inflow_at(global_t)
+ dt * node_vars.flow_in[local_t, m]
+ dt * node_vars.p2x_flow[local_t, m]
- dt * node_vars.spillage[local_t, m]
- dt * node_vars.flow_out[local_t, m]),
name=join("nodal_balance_", [local_t, m], "_"))
else:
# Update the level according to the in and out flows as time passes
dt = (time_array[global_t] - time_array[global_t - 1]).seconds
prob.add_cst(cst=(node_vars.current_level[local_t, m] ==
node_vars.current_level[local_t - 1, m]
+ dt * elm.get_inflow_at(global_t)
+ dt * node_vars.flow_in[local_t, m]
+ dt * node_vars.p2x_flow[local_t, m]
- dt * node_vars.spillage[local_t, m]
- dt * node_vars.flow_out[local_t, m]),
name=join("nodal_balance_", [local_t, m], "_"))
return f_obj
[docs]
def run_linear_opf_ts(grid: MultiCircuit,
time_indices: Union[IntVec, None],
solver_type: MIPSolvers = MIPSolvers.HIGHS,
zonal_grouping: ZonalGrouping = ZonalGrouping.NoGrouping,
skip_generation_limits: bool = False,
consider_contingencies: bool = False,
contingency_groups_used: Union[List[ContingencyGroup], None] = None,
unit_commitment: bool = False,
ramp_constraints: bool = False,
generation_expansion_planning: bool = False,
all_generators_fixed: bool = False,
lodf_threshold: float = 0.001,
maximize_inter_area_flow: bool = False,
inter_aggregation_info: InterAggregationInfo | None = None,
energy_0: Union[Vec, None] = None,
fluid_level_0: Union[Vec, None] = None,
optimize_nodal_capacity: bool = False,
nodal_capacity_sign: float = 1.0,
capacity_nodes_idx: Union[IntVec, None] = None,
use_glsk_as_cost: bool = False,
add_losses_approximation: bool = False,
logger: Logger = Logger(),
progress_text: Union[None, Callable[[str], None]] = None,
progress_func: Union[None, Callable[[float], None]] = None,
verbose: int = 0,
robust: bool = False,
mip_framework: MIPFramework = MIPFramework.PuLP) -> OpfVars:
"""
Run linear optimal power flow
:param grid: MultiCircuit instance
:param time_indices: Time indices (in the general scheme)
:param solver_type: MIP solver to use
:param zonal_grouping: Zonal grouping?
:param skip_generation_limits: Skip the generation limits?
:param consider_contingencies: Consider the contingencies?
:param contingency_groups_used: List of contingency groups to use
:param unit_commitment: Formulate unit commitment?
:param ramp_constraints: Formulate ramp constraints?
:param generation_expansion_planning: Generation expansion planning?
:param all_generators_fixed: All generators take their snapshot or profile values
instead of resorting to dispatchable status
:param lodf_threshold: LODF threshold value to consider contingencies
:param maximize_inter_area_flow: Maximize the inter-area flow?
:param inter_aggregation_info: Inter rea (or country, etc) information
:param energy_0: Vector of initial energy for batteries (size: Number of batteries)
:param fluid_level_0: initial fluid level of the nodes
:param optimize_nodal_capacity: Optimize the nodal capacity? (optional)
:param nodal_capacity_sign: if > 0 the generation is maximized, if < 0 the load is maximized
:param capacity_nodes_idx: Array of bus indices to optimize their nodal capacity for
:param use_glsk_as_cost: If true the generators use the GLSK as dispatch values
:param add_losses_approximation: If true the distribution factors losses approximation is used
:param logger: logger instance
:param progress_text: Text progress callback
:param progress_func: Numerical progress callback
:param verbose: verbosity level
:param robust: Robust optimization?
:return: OpfVars
"""
bus_dict = {bus: i for i, bus in enumerate(grid.buses)}
areas_dict = {elm: i for i, elm in enumerate(grid.areas)}
if time_indices is None:
time_indices = [None]
else:
if len(time_indices) > 0:
# time indices are ok
pass
else:
time_indices = [None]
active_nodal_capacity = True
if capacity_nodes_idx is None:
active_nodal_capacity = False
capacity_nodes_idx = np.zeros(0, dtype=int)
if contingency_groups_used is None:
contingency_groups_used = grid.get_contingency_groups()
nt = len(time_indices) if len(time_indices) > 0 else 1
n = grid.get_bus_number()
nbr = grid.get_branch_number(add_vsc=False, add_hvdc=False, add_switch=True)
ng = grid.get_generators_number()
nb = grid.get_batteries_number()
nl = grid.get_load_like_device_number()
n_hvdc = grid.get_hvdc_number()
n_vsc = grid.get_vsc_number()
n_fluid_node = grid.get_fluid_nodes_number()
n_fluid_path = grid.get_fluid_paths_number()
n_fluid_inj = grid.get_fluid_injection_number()
# gather the fuels and emission rates matrices
gen_emissions_rates_matrix = grid.get_gen_emission_rates_sparse_matrix()
gen_fuel_rates_matrix = grid.get_gen_fuel_rates_sparse_matrix()
gen_tech_shares_matrix = grid.get_gen_technology_connectivity_matrix()
batt_tech_shares_matrix = grid.get_batt_technology_connectivity_matrix()
if maximize_inter_area_flow:
inter_area_branches = inter_aggregation_info.lst_br
inter_area_hvdc = inter_aggregation_info.lst_br_hvdc
else:
inter_area_branches = list()
inter_area_hvdc = list()
# dictionary of bus indices
bus_idx_dict: Dict[Bus, int] = grid.get_bus_index_dict()
# declare structures of LP vars
mip_vars = OpfVars(nt=nt, nbus=n, ng=ng, nb=nb, nl=nl,
nbr=nbr, n_hvdc=n_hvdc, n_vsc=n_vsc,
n_fluid_node=n_fluid_node,
n_fluid_path=n_fluid_path,
n_fluid_inj=n_fluid_inj,
n_cap_buses=len(capacity_nodes_idx))
# create the MIP problem object
lp_model: LpModel = get_model_instance(tpe=mip_framework, solver_type=solver_type)
logger.add_info(f"MIP Framework", value=lp_model.name)
logger.add_info(f"MIP Solver", value=solver_type.value)
# objective function
f_obj: Union[LpExp, float] = 0.0
# compile the circuit at the master time index ------------------------------------------------------------
# note: There are very little chances of simplifying this step and experience shows
# it is not worth the effort, so compile every time step
nc: NumericalCircuit = compile_numerical_circuit_at(
circuit=grid,
t_idx=None, # yes, this is not a bug
bus_dict=bus_dict,
areas_dict=areas_dict,
fill_gep=generation_expansion_planning,
logger=logger
)
indices = nc.get_simulation_indices()
for local_t_idx in range(len(time_indices)): # use time_indices = [None] to simulate the snapshot
global_t_idx = time_indices[local_t_idx]
# time indices:
# imagine that the complete VeraGrid DB time goes from 0 to 1000
# but, for whatever reason, time_indices is [100..200]
# local_t_idx would go from 0..100
# global_t_idx would go from 100..200
# formulate the bus angles ---------------------------------------------------------------------------------
for k in range(nc.bus_data.nbus):
# we declare Vm for DC buses and Va for AC buses
if nc.bus_data.is_dc[k]:
mip_vars.bus_vars.Vm[local_t_idx, k] = lp_model.add_var(
lb=nc.bus_data.Vmin[k],
ub=nc.bus_data.Vmax[k],
name=join("Vm_", [local_t_idx, k], "_")
)
else:
mip_vars.bus_vars.Va[local_t_idx, k] = lp_model.add_var(
lb=nc.bus_data.angle_min[k],
ub=nc.bus_data.angle_max[k],
name=join("Va_", [local_t_idx, k], "_")
)
# formulate loads ------------------------------------------------------------------------------------------
f_obj += add_linear_load_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
bus_vars=mip_vars.bus_vars,
load_vars=mip_vars.load_vars,
prob=lp_model
)
# formulate generation -------------------------------------------------------------------------------------
f_obj += add_linear_generation_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
unit_commitment=unit_commitment,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
all_generators_fixed=all_generators_fixed,
vd=indices.vd,
nodal_capacity_active=active_nodal_capacity,
generation_expansion_planning=generation_expansion_planning,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
# formulate batteries --------------------------------------------------------------------------------------
if local_t_idx == 0 and energy_0 is None:
# declare the initial energy of the batteries
energy_0 = nc.battery_data.soc_0 * nc.battery_data.enom # in MWh here
f_obj += add_linear_battery_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=unit_commitment,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=generation_expansion_planning,
energy_0=energy_0
)
# formulate batteries --------------------------------------------------------------------------------------
if optimize_nodal_capacity:
f_obj += add_nodal_capacity_formulation(
t=local_t_idx,
nodal_capacity_vars=mip_vars.nodal_capacity_vars,
nodal_capacity_sign=nodal_capacity_sign,
capacity_nodes_idx=capacity_nodes_idx,
prob=lp_model
)
# add emissions ------------------------------------------------------------------------------------------------
if gen_emissions_rates_matrix.shape[0] > 0:
# amount of emissions per gas
emissions = lpDot(gen_emissions_rates_matrix, mip_vars.gen_vars.p[local_t_idx, :])
f_obj += lp_model.sum(emissions)
# add fuels ----------------------------------------------------------------------------------------------------
if gen_fuel_rates_matrix.shape[0] > 0:
# amount of fuels
fuels_amount = lpDot(gen_fuel_rates_matrix, mip_vars.gen_vars.p[local_t_idx, :])
f_obj += lp_model.sum(fuels_amount)
# --------------------------------------------------------------------------------------------------------------
# if no zonal grouping, all the grid is considered...
if zonal_grouping == ZonalGrouping.NoGrouping:
# formulate hvdc -------------------------------------------------------------------------------------------
f_obj += add_linear_hvdc_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
hvdc_vars=mip_vars.hvdc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model
)
# formulate vsc --------------------------------------------------------------------------------------------
f_obj += add_linear_vsc_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
vsc_vars=mip_vars.vsc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
logger=logger
)
# formulate branches ---------------------------------------------------------------------------------------
f_obj += add_linear_branches_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
bus_idx_dict=bus_idx_dict,
Sbase=nc.Sbase,
branch_vars=mip_vars.branch_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
inf=1e20,
add_losses_approximation=add_losses_approximation
)
# formulate nodes ------------------------------------------------------------------------------------------
add_linear_node_balance(
local_t=local_t_idx,
grid=grid,
vd=indices.vd,
bus_vars=mip_vars.bus_vars,
nodal_capacity_vars=mip_vars.nodal_capacity_vars,
capacity_nodes_idx=capacity_nodes_idx,
prob=lp_model,
logger=logger
)
# add branch contingencies ---------------------------------------------------------------------------------
if consider_contingencies:
if len(contingency_groups_used) > 0:
# The contingencies' formulation uses the total nodal injection stored in bus_vars,
# hence, this step goes before the add_linear_node_balance function
# compute the PTDF and LODF
ls = LinearAnalysis(nc=nc,
distributed_slack=False,
correct_values=True)
# Compute the more generalist contingency structures
mctg = LinearMultiContingencies(grid=grid,
contingency_groups_used=contingency_groups_used)
mctg.compute(lin=ls,
ptdf_threshold=lodf_threshold,
lodf_threshold=lodf_threshold)
# formulate the contingencies
f_obj += add_linear_branches_contingencies_formulation(
local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
Sbase=nc.Sbase,
branch_vars=mip_vars.branch_vars,
hvdc_vars=mip_vars.hvdc_vars,
vsc_vars=mip_vars.vsc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
linear_multi_contingencies=mctg
)
else:
logger.add_warning(msg="Contingencies enabled, but no contingency groups provided")
# add inter area branch flow maximization ------------------------------------------------------------------
if maximize_inter_area_flow:
# maximize the power at the from buses
for i in inter_aggregation_info.idx_bus_from:
f_obj += - mip_vars.bus_vars.Pgen[local_t_idx, i]
# minimize the power at the to buses
for i in inter_aggregation_info.idx_bus_to:
f_obj += mip_vars.bus_vars.Pgen[local_t_idx, i]
for k, branch, sense in inter_area_branches:
# we want to maximize, hence the minus sign
f_obj += mip_vars.branch_vars.flows[local_t_idx, k] * (- sense)
for k, branch, sense in inter_area_hvdc:
# we want to maximize, hence the minus sign
f_obj += mip_vars.hvdc_vars.flows[local_t_idx, k] * (- sense)
# add hydro side -------------------------------------------------------------------------------------------
if local_t_idx == 0 and fluid_level_0 is None:
# declare the initial level of the fluid nodes
fluid_level_0 = nc.fluid_node_data.initial_level
if n_fluid_node > 0:
f_obj += add_hydro_formulation(local_t=local_t_idx,
global_t=global_t_idx,
grid=grid,
time_array=grid.time_profile,
Sbase=nc.Sbase,
node_vars=mip_vars.fluid_node_vars,
path_vars=mip_vars.fluid_path_vars,
inj_vars=mip_vars.fluid_inject_vars,
generator_vars=mip_vars.gen_vars,
fluid_level_0=fluid_level_0,
prob=lp_model,
logger=logger)
elif zonal_grouping == ZonalGrouping.All:
# this is the copper plate approach
add_copper_plate_balance(
t_idx=local_t_idx,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
)
# production equals demand -------------------------------------------------------------------------------------
# NOTE: The production == demand, happens with the kirchoff equation for the grid and with the
# copper plate balance for the copper plate scenario
if progress_func is not None:
progress_func((local_t_idx + 1) / nt * 100.0)
# set the objective function
lp_model.minimize(f_obj)
# solve
if progress_text is not None:
progress_text("Solving...")
if progress_func is not None:
progress_func(0)
status = lp_model.solve(robust=robust, show_logs=verbose > 0, progress_text=progress_text)
# gather the results
logger.add_info(msg="Status", value=lp_model.status2string(status))
if status == lp_model.OPTIMAL:
logger.add_info("Objective function", value=lp_model.fobj_value())
mip_vars.acceptable_solution = True
else:
logger.add_error("The problem does not have an optimal solution.")
mip_vars.acceptable_solution = False
lp_file_name = os.path.join(opf_file_path(), f"{grid.name} opf debug.lp")
lp_model.save_model(file_name=lp_file_name)
logger.add_info("Debug LP model saved", value=lp_file_name)
# convert the lp vars to their values
vars_v = mip_vars.get_values(Sbase=grid.Sbase,
model=lp_model,
gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix,
gen_tech_shares_matrix=gen_tech_shares_matrix,
batt_tech_shares_matrix=batt_tech_shares_matrix)
# add the model logger to the main logger
logger += lp_model.logger
# lp_model.save_model('nodal_opf.lp')
return vars_v