Source code for VeraGridEngine.Simulations.OPF.Formulations.linear_opf_ts
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
"""
This file implements a DC-OPF for time series
That means that solves the OPF problem for a complete time series at once
"""
from __future__ import annotations
import os
import numpy as np
from typing import List, Union, Tuple, Callable
from scipy.sparse import csc_matrix
from VeraGridEngine.IO.file_system import opf_file_path
from VeraGridEngine.Devices.multi_circuit import MultiCircuit
from VeraGridEngine.Devices.Aggregation.inter_aggregation_info import InterAggregationInfo
from VeraGridEngine.Devices.Events.contingency_group import ContingencyGroup
from VeraGridEngine.Compilers.circuit_to_data import compile_numerical_circuit_at
from VeraGridEngine.DataStructures.numerical_circuit import NumericalCircuit
from VeraGridEngine.DataStructures.generator_data import GeneratorData
from VeraGridEngine.DataStructures.battery_data import BatteryData
from VeraGridEngine.DataStructures.load_data import LoadData
from VeraGridEngine.DataStructures.passive_branch_data import PassiveBranchData
from VeraGridEngine.DataStructures.active_branch_data import ActiveBranchData
from VeraGridEngine.DataStructures.hvdc_data import HvdcData
from VeraGridEngine.DataStructures.vsc_data import VscData
from VeraGridEngine.DataStructures.bus_data import BusData
from VeraGridEngine.DataStructures.fluid_node_data import FluidNodeData
from VeraGridEngine.DataStructures.fluid_path_data import FluidPathData
from VeraGridEngine.DataStructures.fluid_turbine_data import FluidTurbineData
from VeraGridEngine.DataStructures.fluid_pump_data import FluidPumpData
from VeraGridEngine.DataStructures.fluid_p2x_data import FluidP2XData
from VeraGridEngine.basic_structures import Logger, Vec, IntVec, DateVec, Mat
from VeraGridEngine.Utils.MIP.selected_interface import LpExp, LpVar, LpModel, lpDot, join, get_model_instance
from VeraGridEngine.enumerations import (HvdcControlType, ZonalGrouping, MIPSolvers, TapPhaseControl,
ConverterControlType, MIPFramework, OpfDispatchMode)
from VeraGridEngine.Simulations.LinearFactors.linear_analysis import (LinearAnalysis, LinearMultiContingency,
LinearMultiContingencies)
[docs]
def get_contingency_flow_with_filter(multi_contingency: LinearMultiContingency,
base_flow: Vec,
injections: Union[None, Vec],
threshold: float,
m: int) -> LpExp:
"""
Get contingency flow
:param multi_contingency: MultiContingency object
:param base_flow: Base branch flows (nbranch)
:param injections: Bus injections increments (nbus)
:param threshold: threshold to filter contingency elements
:param m: branch monitor index (int)
:return: New flows (nbranch)
"""
res = base_flow[m] + 0
if len(multi_contingency.branch_indices):
for i, c in enumerate(multi_contingency.branch_indices):
if abs(multi_contingency.mlodf_factors[m, i]) >= threshold:
res += multi_contingency.mlodf_factors[m, i] * base_flow[c]
if len(multi_contingency.bus_indices):
for i, c in enumerate(multi_contingency.bus_indices):
if abs(multi_contingency.compensated_ptdf_factors[m, i]) >= threshold:
res += multi_contingency.compensated_ptdf_factors[m, i] * multi_contingency.injections_factor[i] * \
injections[c]
return res
[docs]
class BusVars:
"""
Struct to store the bus related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BusVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.Va = np.zeros((nt, n_elm), dtype=object)
self.Vm = np.ones((nt, n_elm), dtype=object)
self.Pinj = np.zeros((nt, n_elm), dtype=object)
self.Pgen = np.zeros((nt, n_elm), dtype=object)
self.Pbalance = np.zeros((nt, n_elm), dtype=object)
self.kirchhoff = np.zeros((nt, n_elm), dtype=object)
self.shadow_prices = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "BusVars":
"""
Return an instance of this class where the array's content
is not LP vars but their value
:return: BusVars
"""
nt, n_elm = self.Va.shape
data = BusVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.Va[t, i] = model.get_value(self.Va[t, i])
data.Vm[t, i] = model.get_value(self.Vm[t, i])
data.Pinj[t, i] = model.get_value(self.Pinj[t, i])
data.Pgen[t, i] = model.get_value(self.Pgen[t, i])
data.Pbalance[t, i] = model.get_value(self.Pbalance[t, i]) * Sbase
data.shadow_prices[t, i] = model.get_dual_value(self.kirchhoff[t, i])
# format the arrays appropriately
data.Va = data.Va.astype(float, copy=False)
data.Vm = data.Vm.astype(float, copy=False)
data.Pinj = data.Pinj.astype(float, copy=False)
data.Pgen = data.Pgen.astype(float, copy=False)
data.Pbalance = data.Pbalance.astype(float, copy=False)
data.shadow_prices = data.shadow_prices.astype(float, copy=False)
return data
[docs]
class NodalCapacityVars:
"""
Struct to store the nodal capacity related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BusVars structure
:param nt: Number of time steps
:param n_elm: Number of buses to optimize the capacity for
"""
self.P = np.zeros((nt, n_elm), dtype=object) # in per-unit power
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "NodalCapacityVars":
"""
Return an instance of this class where the array's
content is not LP vars but their value
:return: BusVars
"""
nt, n_elm = self.P.shape
data = NodalCapacityVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.P[t, i] = model.get_value(self.P[t, i]) * Sbase
# format the arrays appropriately
data.P = data.P.astype(float, copy=False)
return data
[docs]
class LoadVars:
"""
Struct to store the load related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
LoadVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.shedding = np.zeros((nt, n_elm), dtype=object)
self.p = np.zeros((nt, n_elm), dtype=object) # to be filled (no vars)
self.shedding_cost = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "LoadVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: LoadVars
"""
nt, n_elm = self.shedding.shape
data = LoadVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.shedding_cost[t, i] = model.get_value(self.shedding_cost[t, i]) * Sbase
# format the arrays appropriately
data.shedding = data.shedding.astype(float, copy=False)
data.p = data.p.astype(float, copy=False)
data.shedding_cost = data.shedding_cost.astype(float, copy=False)
return data
[docs]
class GenerationVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of generators
"""
self.p = np.zeros((nt, n_elm), dtype=object)
self.dp = np.zeros((nt, n_elm), dtype=object)
self.shedding = np.zeros((nt, n_elm), dtype=object)
self.producing = np.zeros((nt, n_elm), dtype=object)
self.starting_up = np.zeros((nt, n_elm), dtype=object)
self.shutting_down = np.zeros((nt, n_elm), dtype=object)
self.cost = np.zeros((nt, n_elm), dtype=object)
self.invested = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self,
Sbase: float,
model: LpModel,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix) -> "GenerationVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param Sbase: Base power (100 MVA)
:param model: LpModel
:param gen_emissions_rates_matrix: emissins rates matrix (n_emissions, n_gen)
:param gen_fuel_rates_matrix: fuel rates matrix (n_fuels, n_gen)
:return: GenerationVars
"""
nt, n_elm = self.p.shape
data = GenerationVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.dp[t, i] = model.get_value(self.dp[t, i]) * Sbase
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.producing[t, i] = model.get_value(self.producing[t, i])
data.starting_up[t, i] = model.get_value(self.starting_up[t, i])
data.shutting_down[t, i] = model.get_value(self.shutting_down[t, i])
data.cost[t, i] = model.get_value(self.cost[t, i]) * Sbase
data.invested[t, i] = model.get_value(self.invested[t, i])
# format the arrays appropriately
data.p = data.p.astype(float, copy=False)
data.dp = data.dp.astype(float, copy=False)
data.shedding = data.shedding.astype(float, copy=False)
data.producing = data.producing.astype(bool, copy=False)
data.starting_up = data.starting_up.astype(bool, copy=False)
data.shutting_down = data.shutting_down.astype(bool, copy=False)
data.cost = data.cost.astype(float, copy=False)
data.invested = data.invested.astype(bool, copy=False)
return data
[docs]
class BatteryVars(GenerationVars):
"""
struct extending the generation vars to handle the battery vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BatteryVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
GenerationVars.__init__(self, nt=nt, n_elm=n_elm)
self.e = np.zeros((nt, n_elm), dtype=object)
[docs]
def get_values(self, Sbase: float, model: LpModel,
gen_emissions_rates_matrix: csc_matrix = None, # not needed but included for compatibiliy
gen_fuel_rates_matrix: csc_matrix = None # not needed but included for compatibiliy
) -> "BatteryVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: GenerationVars
"""
nt, n_elm = self.p.shape
data = BatteryVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p[t, i] = model.get_value(self.p[t, i]) * Sbase
data.e[t, i] = model.get_value(self.e[t, i]) * Sbase
data.shedding[t, i] = model.get_value(self.shedding[t, i]) * Sbase
data.producing[t, i] = model.get_value(self.producing[t, i])
data.starting_up[t, i] = model.get_value(self.starting_up[t, i])
data.shutting_down[t, i] = model.get_value(self.shutting_down[t, i])
data.cost[t, i] = model.get_value(self.cost[t, i])
data.invested[t, i] = model.get_value(self.invested[t, i])
# format the arrays appropriately
data.p = data.p.astype(float, copy=False)
data.e = data.e.astype(float, copy=False)
data.shedding = data.shedding.astype(float, copy=False)
data.producing = data.producing.astype(int, copy=False)
data.starting_up = data.starting_up.astype(int, copy=False)
data.shutting_down = data.shutting_down.astype(int, copy=False)
data.cost = data.cost.astype(float, copy=False)
data.invested = data.invested.astype(bool, copy=False)
return data
[docs]
class BranchVars:
"""
Struct to store the branch related vars
"""
def __init__(self, nt: int, n_elm: int):
"""
BranchVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.z_flows = np.zeros((nt, n_elm), dtype=object) # absolute used with add_losses_approximation
self.losses = np.zeros((nt, n_elm), dtype=object) # absolute used with add_losses_approximation
self.flow_slacks_pos = np.zeros((nt, n_elm), dtype=object)
self.flow_slacks_neg = np.zeros((nt, n_elm), dtype=object)
self.tap_angles = np.zeros((nt, n_elm), dtype=object)
self.flow_constraints_ub = np.zeros((nt, n_elm), dtype=object)
self.flow_constraints_lb = np.zeros((nt, n_elm), dtype=object)
self.overload_cost = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
# t, m, c, contingency, negative_slack, positive_slack
self.contingency_flow_data: List[Tuple[int, int, int, Union[float, LpVar, LpExp], LpVar, LpVar]] = list()
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "BranchVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: BranchVars
"""
nt, n_elm = self.flows.shape
data = BranchVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
data.flow_slacks_pos[t, i] = model.get_value(self.flow_slacks_pos[t, i]) * Sbase
data.flow_slacks_neg[t, i] = model.get_value(self.flow_slacks_neg[t, i]) * Sbase
data.tap_angles[t, i] = model.get_value(self.tap_angles[t, i])
data.flow_constraints_ub[t, i] = model.get_value(self.flow_constraints_ub[t, i])
data.flow_constraints_lb[t, i] = model.get_value(self.flow_constraints_lb[t, i])
data.overload_cost[t, i] = model.get_value(self.overload_cost[t, i]) * Sbase
data.losses[t, i] = model.get_value(self.losses[t, i]) * Sbase
for i in range(len(self.contingency_flow_data)):
t, m, c, var, neg_slack, pos_slack = self.contingency_flow_data[i]
self.contingency_flow_data[i] = (t, m, c,
model.get_value(var),
model.get_value(neg_slack),
model.get_value(pos_slack))
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.flow_slacks_pos = data.flow_slacks_pos.astype(float, copy=False)
data.flow_slacks_neg = data.flow_slacks_neg.astype(float, copy=False)
data.overload_cost = data.overload_cost.astype(float, copy=False)
data.tap_angles = data.tap_angles.astype(float, copy=False)
# compute loading
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
def add_contingency_flow(self, t: int, m: int, c: int,
flow_var: Union[float, LpVar, LpExp],
neg_slack: LpVar,
pos_slack: LpVar):
"""
Add contingency flow
:param t: time index
:param m: monitored index
:param c: contingency group index
:param flow_var: flow var
:param neg_slack: negative flow slack variable
:param pos_slack: positive flow slack variable
"""
self.contingency_flow_data.append((t, m, c, flow_var, neg_slack, pos_slack))
[docs]
class HvdcVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "HvdcVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: HvdcVars
"""
nt, n_elm = self.flows.shape
data = HvdcVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
class VscVars:
"""
Struct to store the generation vars
"""
def __init__(self, nt: int, n_elm: int):
"""
GenerationVars structure
:param nt: Number of time steps
:param n_elm: Number of branches
"""
self.flows = np.zeros((nt, n_elm), dtype=object)
self.rates = np.zeros((nt, n_elm), dtype=float)
self.loading = np.zeros((nt, n_elm), dtype=float)
[docs]
def get_values(self, Sbase: float, model: LpModel) -> "HvdcVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: HvdcVars
"""
nt, n_elm = self.flows.shape
data = HvdcVars(nt=nt, n_elm=n_elm)
data.rates = self.rates
for t in range(nt):
for i in range(n_elm):
data.flows[t, i] = model.get_value(self.flows[t, i]) * Sbase
# format the arrays appropriately
data.flows = data.flows.astype(float, copy=False)
data.loading = data.flows / (data.rates + 1e-20)
return data
[docs]
class FluidNodeVars:
"""
Struct to store the vars of nodes of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidNodeVars structure
:param nt: Number of time steps
:param n_elm: Number of nodes
"""
# the objects below are extracted from data
# self.min_level = np.zeros((nt, n_elm), dtype=float) # m3
# self.max_level = np.zeros((nt, n_elm), dtype=float) # m3
# self.initial_level = np.zeros((nt, n_elm), dtype=float) # m3
self.p2x_flow = np.zeros((nt, n_elm), dtype=object) # m3
self.current_level = np.zeros((nt, n_elm), dtype=object) # m3
self.spillage = np.zeros((nt, n_elm), dtype=object) # m3/s
self.flow_in = np.zeros((nt, n_elm), dtype=object) # m3/s
self.flow_out = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidNodeVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidNodeVars
"""
nt, n_elm = self.p2x_flow.shape
data = FluidNodeVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.p2x_flow[t, i] = model.get_value(self.p2x_flow[t, i])
data.current_level[t, i] = model.get_value(self.current_level[t, i])
data.spillage[t, i] = model.get_value(self.spillage[t, i])
data.flow_in[t, i] = model.get_value(self.flow_in[t, i])
data.flow_out[t, i] = model.get_value(self.flow_out[t, i])
# format the arrays appropriately
data.p2x_flow = data.p2x_flow.astype(float, copy=False)
data.current_level = data.current_level.astype(float, copy=False)
data.spillage = data.spillage.astype(float, copy=False)
data.flow_in = data.flow_in.astype(float, copy=False)
data.flow_out = data.flow_out.astype(float, copy=False)
# from the data object itself
# data.min_level = self.min_level
# data.max_level = self.max_level
# data.initial_level = self.initial_level
return data
[docs]
class FluidPathVars:
"""
Struct to store the vars of paths of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidPathVars structure
:param nt: Number of time steps
:param n_elm: Number of paths (rivers)
"""
# from the data object
# self.min_flow = np.zeros((nt, n_elm), dtype=float) # m3/s
# self.max_flow = np.zeros((nt, n_elm), dtype=float) # m3/s
self.flow = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidPathVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidPathVars
"""
nt, n_elm = self.flow.shape
data = FluidPathVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.flow[t, i] = model.get_value(self.flow[t, i])
# format the arrays appropriately
data.flow = data.flow.astype(float, copy=False)
# data.min_flow = self.min_flow
# data.max_flow = self.max_flow
return data
[docs]
class FluidInjectionVars:
"""
Struct to store the vars of injections of fluid type
"""
def __init__(self, nt: int, n_elm: int):
"""
FluidInjectionVars structure
:param nt: Number of time steps
:param n_elm: Number of elements moving fluid
"""
self.flow = np.zeros((nt, n_elm), dtype=object) # m3/s
[docs]
def get_values(self, model: LpModel) -> "FluidInjectionVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:param model: LP model from where we extract the values
:return: FluidInjectionVars
"""
nt, n_elm = self.flow.shape
data = FluidInjectionVars(nt=nt, n_elm=n_elm)
for t in range(nt):
for i in range(n_elm):
data.flow[t, i] = model.get_value(self.flow[t, i])
# format the arrays appropriately
data.flow = data.flow.astype(float, copy=False)
return data
[docs]
class SystemVars:
"""
Struct to store the system vars
"""
def __init__(self, nt: int):
"""
SystemVars structure
:param nt: Number of time steps
"""
self.system_fuel = np.zeros(nt, dtype=float)
self.system_emissions = np.zeros(nt, dtype=float)
self.system_unit_energy_cost = np.zeros(nt, dtype=float)
self.system_total_energy_cost = np.zeros(nt, dtype=float)
self.power_by_technology = np.zeros(nt, dtype=float)
[docs]
def compute(self,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix,
gen_tech_shares_matrix: csc_matrix,
batt_tech_shares_matrix: csc_matrix,
gen_p: Mat,
gen_cost: Mat,
batt_p: Mat,
shedding_cost: Mat):
"""
Compute the system values
:param gen_emissions_rates_matrix: emissions rates matrix (n_emissions, n_gen)
:param gen_fuel_rates_matrix: fuel rates matrix (n_fuels, n_gen)
:param gen_tech_shares_matrix: technology shares of the generators
:param batt_tech_shares_matrix technology shares of the batteries
:param gen_p: Generation power values (nt, ngen)
:param gen_cost: Generation cost values (nt, ngen)
:param batt_p: Battery power values (nt, nbatt)
:param shedding_cost: Shedding cost values (nt, ngen)
"""
self.system_fuel = (gen_fuel_rates_matrix * gen_p.T).T
self.system_emissions = (gen_emissions_rates_matrix * gen_p.T).T
self.power_by_technology = (gen_tech_shares_matrix * gen_p.T).T
self.power_by_technology += (batt_tech_shares_matrix * batt_p.T).T
with np.errstate(divide='ignore', invalid='ignore'): # numpy magic to ignore the zero divisions
self.system_total_energy_cost = np.nan_to_num(gen_cost).sum(axis=1)
self.system_total_energy_cost += np.nan_to_num(shedding_cost).sum(axis=1)
self.system_unit_energy_cost = self.system_total_energy_cost / np.nan_to_num(gen_p).sum(axis=1)
return self
[docs]
class OpfVars:
"""
Structure to host the opf variables
"""
def __init__(self, nt: int, nbus: int, ng: int, nb: int, nl: int, nbr: int, n_hvdc: int, n_vsc: int,
n_fluid_node: int,
n_fluid_path: int, n_fluid_inj: int, n_cap_buses: int):
"""
Constructor
:param nt: number of time steps
:param nbus: number of nodes
:param ng: number of generators
:param nb: number of batteries
:param nl: number of loads
:param nbr: number of branches
:param n_hvdc: number of HVDC
:param n_fluid_node: number of fluid nodes
:param n_fluid_path: number of fluid paths
:param n_fluid_inj: number of fluid injections
"""
self.nt = nt
self.nbus = nbus
self.ng = ng
self.nb = nb
self.nl = nl
self.nbr = nbr
self.n_hvdc = n_hvdc
self.n_vsc = n_vsc
self.n_fluid_node = n_fluid_node
self.n_fluid_path = n_fluid_path
self.n_fluid_inj = n_fluid_inj
self.n_cap_buses = n_cap_buses
self.acceptable_solution = False
self.bus_vars = BusVars(nt=nt, n_elm=nbus)
self.nodal_capacity_vars = NodalCapacityVars(nt=nt, n_elm=n_cap_buses)
self.load_vars = LoadVars(nt=nt, n_elm=nl)
self.gen_vars = GenerationVars(nt=nt, n_elm=ng)
self.batt_vars = BatteryVars(nt=nt, n_elm=nb)
self.branch_vars = BranchVars(nt=nt, n_elm=nbr)
self.hvdc_vars = HvdcVars(nt=nt, n_elm=n_hvdc)
self.vsc_vars = VscVars(nt=nt, n_elm=n_vsc)
self.fluid_node_vars = FluidNodeVars(nt=nt, n_elm=n_fluid_node)
self.fluid_path_vars = FluidPathVars(nt=nt, n_elm=n_fluid_path)
self.fluid_inject_vars = FluidInjectionVars(nt=nt, n_elm=n_fluid_inj)
self.sys_vars = SystemVars(nt=nt)
[docs]
def get_values(self, Sbase: float, model: LpModel,
gen_emissions_rates_matrix: csc_matrix,
gen_fuel_rates_matrix: csc_matrix,
gen_tech_shares_matrix: csc_matrix,
batt_tech_shares_matrix: csc_matrix) -> "OpfVars":
"""
Return an instance of this class where the arrays content are not LP vars but their value
:return: OpfVars instance
"""
data = OpfVars(nt=self.nt,
nbus=self.nbus,
ng=self.ng,
nb=self.nb,
nl=self.nl,
nbr=self.nbr,
n_hvdc=self.n_hvdc,
n_vsc=self.n_vsc,
n_fluid_node=self.n_fluid_node,
n_fluid_path=self.n_fluid_path,
n_fluid_inj=self.n_fluid_inj,
n_cap_buses=self.n_cap_buses)
data.bus_vars = self.bus_vars.get_values(Sbase, model)
data.nodal_capacity_vars = self.nodal_capacity_vars.get_values(Sbase, model)
data.load_vars = self.load_vars.get_values(Sbase, model)
data.gen_vars = self.gen_vars.get_values(Sbase=Sbase,
model=model,
gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix)
data.batt_vars = self.batt_vars.get_values(Sbase, model)
data.branch_vars = self.branch_vars.get_values(Sbase, model)
data.hvdc_vars = self.hvdc_vars.get_values(Sbase, model)
data.vsc_vars = self.vsc_vars.get_values(Sbase, model)
data.fluid_node_vars = self.fluid_node_vars.get_values(model)
data.fluid_path_vars = self.fluid_path_vars.get_values(model)
data.fluid_inject_vars = self.fluid_inject_vars.get_values(model)
data.sys_vars = self.sys_vars.compute(gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix,
gen_tech_shares_matrix=gen_tech_shares_matrix,
batt_tech_shares_matrix=batt_tech_shares_matrix,
gen_p=data.gen_vars.p,
batt_p=data.batt_vars.p,
gen_cost=data.gen_vars.cost,
shedding_cost=data.load_vars.shedding_cost)
data.acceptable_solution = self.acceptable_solution
return data
[docs]
def add_linear_simple_generation_formulation(local_t: Union[int, None],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
gen_data_t: GeneratorData,
gen_vars: GenerationVars,
prob: LpModel,
ramp_constraints: bool,
consider_time_up_down: bool,
area_spinning_reserve: bool,
skip_generation_limits: bool,
use_glsk_as_cost: bool,
logger: Logger) -> LpExp | float:
"""
Add MIP generation formulation
:param local_t: time step
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param gen_data_t: GeneratorData structure
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param ramp_constraints: formulate ramp constraints?
:param consider_time_up_down: consider time up/down?
:param area_spinning_reserve: area spinning reserve?
:param skip_generation_limits: skip the generation limits?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger object
:return objective function
"""
f_obj = 0.0
# add generation stuff
for k in range(gen_data_t.nelm):
gen_vars.cost[local_t, k] = 0.0
bus_idx = gen_data_t.bus_idx[k]
# nodal_cap_condition = gen_data_t.bus_idx[k] not in vd if nodal_capacity_active else True
if gen_data_t.active[k] and bus_idx > -1:
if gen_data_t.dispatchable[k]:
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
# NOTE: Must run is the same as this, so we skip it
if gen_data_t.must_run[k]:
logger.add_warning(
msg="Must run has no further effects since unit commitment is deactivated",
device_class="Generator",
device=gen_data_t.names[k]
)
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += ((gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) +
gen_data_t.cost_0[k])
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
# add the ramp constraints
if ramp_constraints and local_t > 0 and not skip_generation_limits:
# if the ramp is actually sufficiently restrictive...
dt = (time_array[local_t] - time_array[local_t - 1]).seconds / 3600.0 # time increment in hours
if gen_data_t.ramp_up[k] <= gen_data_t.pmax[k]:
# P(t) - P(t-1) <= ramp_up Β· dt / Sbase
prob.add_cst(
gen_vars.p[local_t, k] - gen_vars.p[local_t - 1, k] <= gen_data_t.ramp_up[k] / Sbase * dt,
name=f"ramp_up_{local_t}_{k}"
)
else:
logger.add_warning("Generator ramp up greater then Pmax",
value=gen_data_t.ramp_up[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=gen_data_t.pmax[k])
if gen_data_t.ramp_down[k] <= - gen_data_t.pmax[k]:
# P(t-1) - P(t) <= ramp_down Β· dt / Sbase
# NOTE: Ramp down must be negative
prob.add_cst(
gen_vars.p[local_t - 1, k] - gen_vars.p[local_t, k] <= gen_data_t.ramp_down[k] / Sbase * dt,
name=f"ramp_dwn_{local_t}_{k}"
)
else:
logger.add_error("Generator ramp down lower than -Pmax",
value=gen_data_t.ramp_down[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=-gen_data_t.pmax[k])
else:
# NOTE: it is NOT dispatchable
p = gen_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
elif p < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
if gen_data_t.must_run[k]:
logger.add_warning("Ignoring must run, because it is not dispatchable",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + gen_data_t.cost_0[k]
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
return f_obj
[docs]
def add_linear_nodal_capacity_generation_formulation(local_t: Union[int, None],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
gen_data_t: GeneratorData,
gen_vars: GenerationVars,
prob: LpModel,
skip_generation_limits: bool,
vd: IntVec,
nodal_capacity_active: bool,
use_glsk_as_cost: bool,
logger: Logger) -> LpExp | float:
"""
Add MIP generation formulation
:param local_t: time step
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param gen_data_t: GeneratorData structure
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param skip_generation_limits: skip the generation limits?
:param vd: slack indices
:param nodal_capacity_active: nodal capacity active?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger object
:return objective function
"""
f_obj = 0.0
if nodal_capacity_active:
# TODO: This looks like a bug
# id_gen_nonvd = [i for i in range(gen_data_t.C_bus_elm.shape[1]) if i not in vd]
id_gen_nonvd = [i for i in range(gen_data_t.nelm) if i not in vd]
else:
id_gen_nonvd = []
if time_array is not None:
if len(time_array) > 0:
year = time_array[local_t].year - time_array[0].year
else:
year = 0
else:
year = 0
# add generation stuff
for k in range(gen_data_t.nelm):
gen_vars.cost[local_t, k] = 0.0
bus_idx = gen_data_t.bus_idx[k]
# nodal_cap_condition = gen_data_t.bus_idx[k] not in vd if nodal_capacity_active else True
if gen_data_t.active[k] and k not in id_gen_nonvd and bus_idx > -1: # TODO Review and change this stuff
if gen_data_t.dispatchable[k]:
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
# NOTE: Must run is the same as this, so we skip it
if gen_data_t.must_run[k]:
logger.add_warning(
msg="Must run has no further effects since unit commitment is deactivated",
device_class="Generator",
device=gen_data_t.names[k]
)
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += ((gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) +
gen_data_t.cost_0[k])
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
else:
# NOTE: it is NOT dispatchable
p = gen_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
elif p < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
if gen_data_t.must_run[k]:
logger.add_warning("Ignoring must run, because it is not dispatchable",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + gen_data_t.cost_0[k]
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
return f_obj
[docs]
def add_linear_generation_expansion_planning_formulation(local_t: Union[int, None],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
gen_data_t: GeneratorData,
gen_vars: GenerationVars,
prob: LpModel,
ramp_constraints: bool,
skip_generation_limits: bool,
use_glsk_as_cost: bool,
logger: Logger) -> LpExp | float:
"""
Add MIP generation formulation
:param local_t: time step
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param gen_data_t: GeneratorData structure
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param ramp_constraints: formulate ramp constraints?
:param skip_generation_limits: skip the generation limits?
:param generation_expansion_planning: generation expansion plan?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger object
:return objective function
"""
f_obj = 0.0
if time_array is not None:
if len(time_array) > 0:
year = time_array[local_t].year - time_array[0].year
else:
year = 0
else:
year = 0
# add generation stuff
for k in range(gen_data_t.nelm):
gen_vars.cost[local_t, k] = 0.0
bus_idx = gen_data_t.bus_idx[k]
# nodal_cap_condition = gen_data_t.bus_idx[k] not in vd if nodal_capacity_active else True
if gen_data_t.active[k] and bus_idx > -1:
if gen_data_t.dispatchable[k]:
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
# NOTE: Must run is the same as this, so we skip it
if gen_data_t.must_run[k]:
logger.add_warning(
msg="Must run has no further effects since unit commitment is deactivated",
device_class="Generator",
device=gen_data_t.names[k]
)
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += ((gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) +
gen_data_t.cost_0[k])
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
# add the ramp constraints
if ramp_constraints and local_t > 0 and not skip_generation_limits:
# if the ramp is actually sufficiently restrictive...
dt = (time_array[local_t] - time_array[local_t - 1]).seconds / 3600.0 # time increment in hours
if gen_data_t.ramp_up[k] <= gen_data_t.pmax[k]:
# P(t) - P(t-1) <= ramp_up Β· dt / Sbase
prob.add_cst(
gen_vars.p[local_t, k] - gen_vars.p[local_t - 1, k] <= gen_data_t.ramp_up[k] / Sbase * dt,
name=f"ramp_up_{local_t}_{k}"
)
else:
logger.add_warning("Generator ramp up greater then Pmax",
value=gen_data_t.ramp_up[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=gen_data_t.pmax[k])
if gen_data_t.ramp_down[k] <= - gen_data_t.pmax[k]:
# P(t-1) - P(t) <= ramp_down Β· dt / Sbase
# NOTE: Ramp down must be negative
prob.add_cst(
gen_vars.p[local_t - 1, k] - gen_vars.p[local_t, k] <= gen_data_t.ramp_down[k] / Sbase * dt,
name=f"ramp_dwn_{local_t}_{k}"
)
else:
logger.add_error("Generator ramp down lower than -Pmax",
value=gen_data_t.ramp_down[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=-gen_data_t.pmax[k])
# Generation Expansion Planning
if gen_data_t.is_candidate[k]:
money_factor = np.power(1.0 + gen_data_t.discount_rate[k] / 100.0, year)
# declare the investment binary
gen_vars.invested[local_t, k] = prob.add_int(lb=0, ub=1, name=f"Ig_{local_t}_{k}")
# add the investment cost to the objective
f_obj += gen_vars.invested[local_t, k] * (gen_data_t.pmax[k] / Sbase) * gen_data_t.capex[
k] * money_factor
if local_t > 0:
# installation persistence
prob.add_cst(
gen_vars.invested[local_t - 1, k] <= gen_vars.invested[local_t, k],
name=f"persist_{local_t}_{k}"
)
# maximum production constraint
prob.add_cst(gen_vars.p[local_t, k] <= (gen_data_t.pmax[k] / Sbase) * gen_vars.invested[local_t, k],
name=f"max_prod_{local_t}_{k}")
else:
# is invested for already
gen_vars.invested[local_t, k] = 1
else:
# NOTE: it is NOT dispatchable
p = gen_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
elif p < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
if gen_data_t.must_run[k]:
logger.add_warning("Ignoring must run, because it is not dispatchable",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + gen_data_t.cost_0[k]
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
return f_obj
[docs]
def add_linear_generation_unit_commitment_formulation(
local_t: Union[int, None],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
gen_data_t: GeneratorData,
gen_vars: GenerationVars,
prob: LpModel,
ramp_constraints: bool,
consider_time_up_down: bool,
area_spinning_reserve: bool,
skip_generation_limits: bool,
use_glsk_as_cost: bool,
logger: Logger) -> LpExp | float:
"""
Add MIP generation formulation
:param local_t: time step
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param gen_data_t: GeneratorData structure
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param ramp_constraints: formulate ramp constraints?
:param consider_time_up_down: consider time up/down?
:param area_spinning_reserve: area spinning reserve?
:param skip_generation_limits: skip the generation limits?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger object
:return objective function
"""
f_obj = 0.0
# add generation stuff
for k in range(gen_data_t.nelm):
gen_vars.cost[local_t, k] = 0.0
bus_idx = gen_data_t.bus_idx[k]
# nodal_cap_condition = gen_data_t.bus_idx[k] not in vd if nodal_capacity_active else True
if gen_data_t.active[k] and bus_idx > -1:
if gen_data_t.dispatchable[k]:
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
if gen_data_t.must_run[k]:
# NOTE: Unit commitment is incompatible with "must run"
logger.add_warning("Unit commitment overridden by must run",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + \
gen_data_t.cost_0[k]
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
else:
# declare unit commitment vars
gen_vars.starting_up[local_t, k] = prob.add_bin(f"gen_starting_up_{local_t}_{k}")
gen_vars.producing[local_t, k] = prob.add_bin(f"gen_producing_{local_t}_{k}")
gen_vars.shutting_down[local_t, k] = prob.add_bin(f"gen_shutting_down_{local_t}_{k}")
# operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]
+ gen_data_t.cost_0[k] * gen_vars.producing[local_t, k])
# start-up cost
if gen_data_t.startup_cost[k] != 0.0:
gen_vars.cost[local_t, k] += gen_data_t.startup_cost[k] * gen_vars.starting_up[local_t, k]
# shut-down cost
if gen_data_t.shut_down_cost[k] != 0.0:
gen_vars.cost[local_t, k] += gen_data_t.shut_down_cost[k] * gen_vars.shutting_down[
local_t, k]
# power boundaries of the generator
if not skip_generation_limits:
prob.add_cst(
cst=gen_vars.p[local_t, k] >= (
gen_data_t.pmin[k] / Sbase * gen_vars.producing[local_t, k]),
name=f"gen_geq_Pmin_{local_t}_{k}"
)
prob.add_cst(
cst=gen_vars.p[local_t, k] <= (
gen_data_t.pmax[k] / Sbase * gen_vars.producing[local_t, k]),
name=f"gen_leq_Pmax_{local_t}_{k}"
)
if local_t is not None:
if local_t == 0:
# NOTE: Here, gen_data_t.active[k] represents the producing state at t-1
prob.add_cst(
cst=gen_vars.starting_up[local_t, k] - gen_vars.shutting_down[local_t, k] ==
gen_vars.producing[local_t, k] - float(gen_data_t.active[k]),
name=f"binary_alg1_{local_t}_{k}"
)
prob.add_cst(
cst=gen_vars.starting_up[local_t, k] + gen_vars.shutting_down[local_t, k] <= 1,
name=f"binary_alg2_{local_t}_{k}"
)
else:
prob.add_cst(
cst=(gen_vars.starting_up[local_t, k] - gen_vars.shutting_down[local_t, k] ==
gen_vars.producing[local_t, k] - gen_vars.producing[local_t - 1, k]),
name=f"binary_alg3_{local_t}_{k}"
)
prob.add_cst(
cst=gen_vars.starting_up[local_t, k] + gen_vars.shutting_down[local_t, k] <= 1,
name=f"binary_alg4_{local_t}_{k}"
)
# add the ramp constraints
if ramp_constraints and local_t > 0 and not skip_generation_limits:
# if the ramp is actually sufficiently restrictive...
dt = (time_array[local_t] - time_array[local_t - 1]).seconds / 3600.0 # time increment in hours
if gen_data_t.ramp_up[k] <= gen_data_t.pmax[k]:
# P(t) - P(t-1) <= ramp_up Β· dt / Sbase
prob.add_cst(
gen_vars.p[local_t, k] - gen_vars.p[local_t - 1, k] <= gen_data_t.ramp_up[k] / Sbase * dt,
name=f"ramp_up_{local_t}_{k}"
)
else:
logger.add_warning("Generator ramp up greater then Pmax",
value=gen_data_t.ramp_up[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=gen_data_t.pmax[k])
if gen_data_t.ramp_down[k] <= - gen_data_t.pmax[k]:
# P(t-1) - P(t) <= ramp_down Β· dt / Sbase
# NOTE: Ramp down must be negative
prob.add_cst(
gen_vars.p[local_t - 1, k] - gen_vars.p[local_t, k] <= gen_data_t.ramp_down[k] / Sbase * dt,
name=f"ramp_dwn_{local_t}_{k}"
)
else:
logger.add_error("Generator ramp down lower than -Pmax",
value=gen_data_t.ramp_down[k],
device=f"{k}: {gen_data_t.names[k]}",
expected_value=-gen_data_t.pmax[k])
else:
# NOTE: it is NOT dispatchable
p = gen_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
elif p < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
if gen_data_t.must_run[k]:
logger.add_warning("Ignoring must run, because it is not dispatchable",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + gen_data_t.cost_0[k]
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
return f_obj
[docs]
def add_linear_generation_redispatch_formulation(local_t: Union[int, None],
Sbase: float,
bus_vars: BusVars,
gen_data_t: GeneratorData,
gen_vars: GenerationVars,
prob: LpModel,
inter_aggregation_info: InterAggregationInfo,
skip_generation_limits: bool,
use_glsk_as_cost: bool,
logger: Logger) -> LpExp | float:
"""
Add MIP generation redispatch formulation
the main difference is that Pg = P + dP
dP >= 0 for generators in A1 (sending),
dP <= 0 for generators in A2 (receiving),
:param local_t: time step
:param Sbase: base power (100 MVA)
:param bus_vars: BusVars
:param gen_data_t: GeneratorData structure
:param gen_vars: GenerationVars structure
:param prob: LpModel
:param inter_aggregation_info:
:param skip_generation_limits: skip the generation limits?
:param use_glsk_as_cost: if true, the GLSK values are used instead of the traditional costs
:param logger: Logger object
:return objective function
"""
f_obj = 0.0
# add generation stuff
for k in range(gen_data_t.nelm):
gen_vars.cost[local_t, k] = 0.0
bus_idx = gen_data_t.bus_idx[k]
if gen_data_t.active[k] and bus_idx > -1:
if gen_data_t.dispatchable[k]:
p_fix = gen_data_t.p[k] / Sbase
if inter_aggregation_info.is_from(bus_idx):
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
gen_vars.dp[local_t, k] = prob.add_var(0, 1e20, f"gen_dp_{local_t}_{k}")
prob.add_cst(
cst=gen_vars.p[local_t, k] == p_fix + gen_vars.dp[local_t, k], # dp must be positive
name=f"gen_p_up_{local_t}_{k}"
)
# NOTE: Must run is the same as this, so we skip it
if gen_data_t.must_run[k]:
logger.add_warning(
msg="Must run has no further effects since unit commitment is deactivated",
device_class="Generator",
device=gen_data_t.names[k]
)
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += ((gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) +
gen_data_t.cost_0[k])
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
elif inter_aggregation_info.is_to(bus_idx):
# declare active power var (limits will be applied later)
gen_vars.p[local_t, k] = prob.add_var(-1e20, 1e20, f"gen_p_{local_t}_{k}")
gen_vars.dp[local_t, k] = prob.add_var(-1e20, 0, f"gen_dp_{local_t}_{k}")
prob.add_cst(
cst=gen_vars.p[local_t, k] == p_fix + gen_vars.dp[local_t, k], # dp must be negative
name=f"gen_p_down_{local_t}_{k}"
)
# NOTE: Must run is the same as this, so we skip it
if gen_data_t.must_run[k]:
logger.add_warning(
msg="Must run has no further effects since unit commitment is deactivated",
device_class="Generator",
device=gen_data_t.names[k]
)
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += ((gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) +
gen_data_t.cost_0[k])
if not skip_generation_limits:
prob.set_var_bounds(var=gen_vars.p[local_t, k],
lb=gen_data_t.pmin[k] / Sbase,
ub=gen_data_t.pmax[k] / Sbase)
else:
# same as not dispatchable, but we don't add shedding
gen_vars.p[local_t, k] = gen_data_t.p[k] / Sbase
# is invested for already
gen_vars.invested[local_t, k] = 1
else:
# NOTE: it is NOT dispatchable
p = gen_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p - gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
elif p < 0:
# the negative sign is because P is already negative here, to make it positive
gen_vars.shedding[local_t, k] = prob.add_var(
lb=0, ub=-p,
name=f"gen_shedding_{local_t}_{k}"
)
gen_vars.p[local_t, k] = p + gen_vars.shedding[local_t, k]
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.shedding[local_t, k]
else:
gen_vars.cost[local_t, k] += gen_data_t.cost_1[k] * gen_vars.shedding[local_t, k]
else:
# the generation value is exactly zero
prob.set_var_bounds(var=gen_vars.p[local_t, k], lb=0.0, ub=0.0)
gen_vars.producing[local_t, k] = 1
gen_vars.shutting_down[local_t, k] = 0
gen_vars.starting_up[local_t, k] = 0
if gen_data_t.must_run[k]:
logger.add_warning("Ignoring must run, because it is not dispatchable",
device_class="Generator",
device=gen_data_t.names[k])
# Operational cost (linear...)
if use_glsk_as_cost:
gen_vars.cost[local_t, k] += gen_data_t.shift_key[k] * gen_vars.p[local_t, k]
else:
gen_vars.cost[local_t, k] += (gen_data_t.cost_1[k] * gen_vars.p[local_t, k]) + gen_data_t.cost_0[k]
# add to the balance
bus_vars.Pbalance[local_t, bus_idx] += gen_vars.p[local_t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[local_t, bus_idx] += gen_vars.p[local_t, k]
else:
# the generator is not available at time step
gen_vars.p[local_t, k] = 0.0 # there has not been any variable assigned to p[t, k] at this point
# add to the objective function the total cost of the generator
f_obj += gen_vars.cost[local_t, k]
return f_obj
[docs]
def add_linear_battery_formulation(t: Union[int, None],
Sbase: float,
time_array: DateVec,
bus_vars: BusVars,
batt_data_t: BatteryData,
batt_vars: BatteryVars,
prob: LpModel,
unit_commitment: bool,
ramp_constraints: bool,
skip_generation_limits: bool,
generation_expansion_planning: bool,
energy_0: Vec):
"""
Add MIP generation formulation
:param t: time step, if None we assume single time step
:param Sbase: base power (100 MVA)
:param time_array: complete time array
:param bus_vars: BusVars
:param batt_data_t: BatteryData structure
:param batt_vars: BatteryVars structure
:param prob: ORTools problem
:param unit_commitment: formulate unit commitment?
:param ramp_constraints: formulate ramp constraints?
:param skip_generation_limits: skip the generation limits?
:param generation_expansion_planning: generation expansion planning?
:param energy_0: initial value of the energy stored
:return objective function
"""
f_obj = 0.0
for k in range(batt_data_t.nelm):
bus_idx = batt_data_t.bus_idx[k]
if batt_data_t.active[k] and bus_idx > -1:
# declare active power var (limits will be applied later)
p_pos = prob.add_var(0, 1e20, join("batt_ppos_", [t, k], "_"))
p_neg = prob.add_var(0, 1e20, join("batt_pneg_", [t, k], "_"))
batt_vars.p[t, k] = p_pos - p_neg
if batt_data_t.dispatchable[k]:
if unit_commitment:
# declare unit commitment vars
batt_vars.starting_up[t, k] = prob.add_int(0, 1,
join("bat_starting_up_", [t, k], "_"))
batt_vars.producing[t, k] = prob.add_int(0, 1,
join("bat_producing_", [t, k], "_"))
batt_vars.shutting_down[t, k] = prob.add_int(0, 1,
join("bat_shutting_down_", [t, k], "_"))
# operational cost (linear...)
f_obj += (batt_data_t.cost_1[k] * p_pos
+ batt_data_t.cost_0[k] * batt_vars.producing[t, k])
# start-up cost
f_obj += batt_data_t.startup_cost[k] * batt_vars.starting_up[t, k]
# power boundaries of the generator
if not skip_generation_limits:
prob.add_cst(
cst=(batt_vars.p[t, k] >= (batt_data_t.pmin[k] / Sbase * batt_vars.producing[t, k])),
name=join("batt_geq_Pmin", [t, k], "_"))
prob.add_cst(
cst=(batt_vars.p[t, k] <= (batt_data_t.pmax[k] / Sbase * batt_vars.producing[t, k])),
name=join("batt_leq_Pmax", [t, k], "_"))
if t is not None:
if t == 0:
prob.add_cst(
cst=(batt_vars.starting_up[t, k] - batt_vars.shutting_down[t, k] ==
batt_vars.producing[t, k] - float(batt_data_t.active[k])),
name=join("binary_bat_alg1_", [t, k], "_"))
prob.add_cst(
cst=batt_vars.starting_up[t, k] + batt_vars.shutting_down[t, k] <= 1,
name=join("binary_bat_alg2_", [t, k], "_"))
else:
prob.add_cst(
cst=(batt_vars.starting_up[t, k] - batt_vars.shutting_down[t, k] ==
batt_vars.producing[t, k] - batt_vars.producing[t - 1, k]),
name=join("binary_bat_alg3_", [t, k], "_"))
prob.add_cst(
cst=batt_vars.starting_up[t, k] + batt_vars.shutting_down[t, k] <= 1,
name=join("binary_bat_alg4_", [t, k], "_"))
else:
# No unit commitment
# Operational cost (linear...)
f_obj += (batt_data_t.cost_1[k] * p_pos) + batt_data_t.cost_0[k]
# power boundaries of the generator
if not skip_generation_limits:
prob.set_var_bounds(var=p_pos, lb=0, ub=+batt_data_t.pmax[k] / Sbase)
prob.set_var_bounds(var=p_neg, lb=0, ub=-batt_data_t.pmin[k] / Sbase)
# compute the time increment in hours
if len(time_array) > 1:
dt = (time_array[t] - time_array[t - 1]).seconds / 3600.0
else:
dt = 1.0
if ramp_constraints and t is not None:
if t > 0:
# add the ramp constraints
if batt_data_t.ramp_up[k] < batt_data_t.pmax[k] and \
batt_data_t.ramp_down[k] < batt_data_t.pmax[k]:
# if the ramp is actually sufficiently restrictive...
# - ramp_down Β· dt <= P(t) - P(t-1) <= ramp_up Β· dt
prob.add_cst(
cst=-batt_data_t.ramp_down[k] / Sbase * dt <= batt_vars.p[t, k] - batt_vars.p[t - 1, k])
prob.add_cst(
cst=batt_vars.p[t, k] - batt_vars.p[t - 1, k] <= batt_data_t.ramp_up[k] / Sbase * dt)
# # # set the energy value Et = E(t - 1) + dt * Pb / eff
batt_vars.e[t, k] = prob.add_var(batt_data_t.e_min[k] / Sbase,
batt_data_t.e_max[k] / Sbase,
join("batt_e_", [t, k], "_"))
if t > 0:
# energy decreases / increases with power Β· dt
prob.add_cst(cst=(batt_vars.e[t, k] == batt_vars.e[t - 1, k]
+ dt * (batt_data_t.discharge_efficiency[k] * p_pos
- batt_data_t.charge_efficiency[k] * p_neg)),
name=join("batt_energy_", [t, k], "_"))
else:
# set the initial energy value
batt_vars.e[t, k] = energy_0[k] / Sbase
else:
# it is NOT dispatchable
# Operational cost (linear...)
f_obj += (batt_data_t.cost_1[k] * p_pos) + batt_data_t.cost_0[k]
p = batt_data_t.p[k] / Sbase
# the generator is not dispatchable at time step
if p > 0:
batt_vars.shedding[t, k] = prob.add_var(0, p, join("bat_shedding_", [t, k], "_"))
prob.add_cst(
cst=batt_vars.p[t, k] == batt_data_t.p[k] / Sbase - batt_vars.shedding[t, k],
name=join("batt==PB-PBslack", [t, k], "_"))
f_obj += batt_data_t.cost_1[k] * batt_vars.shedding[t, k]
elif p < 0:
# the negative sign is because P is already negative here
batt_vars.shedding[t, k] = prob.add_var(lb=0,
ub=-p,
name=join("bat_shedding_", [t, k], "_"))
prob.add_cst(
cst=batt_vars.p[t, k] == batt_data_t.p[k] / Sbase + batt_vars.shedding[t, k],
name=join("batt==PB+PBslack", [t, k], "_"))
f_obj += batt_data_t.cost_1[k] * batt_vars.shedding[t, k]
else:
# the generation value is exactly zero, pass
pass
batt_vars.producing[t, k] = 1
batt_vars.shutting_down[t, k] = 0
batt_vars.starting_up[t, k] = 0
# add to the balance
bus_vars.Pbalance[t, bus_idx] += batt_vars.p[t, k]
# add to the generation injections in case of inter-area
bus_vars.Pgen[t, bus_idx] += batt_vars.p[t, k]
else:
# the generator is not available at a time step
batt_vars.p[t, k] = 0.0
return f_obj
[docs]
def add_nodal_capacity_formulation(t: Union[int, None],
nodal_capacity_vars: NodalCapacityVars,
nodal_capacity_sign: float,
capacity_nodes_idx: IntVec,
prob: LpModel):
"""
Add MIP generation formulation
:param t: time step, if None we assume single time step
:param nodal_capacity_vars: NodalCapacityVars structure
:param nodal_capacity_sign:
:param capacity_nodes_idx: IntVec
:param prob: ORTools problem
:return objective function
"""
f_obj = 0.0
for k, idx in enumerate(capacity_nodes_idx):
# assign load shedding variable
if nodal_capacity_sign < 0:
nodal_capacity_vars.P[t, k] = prob.add_var(lb=0.0,
ub=9999.9,
name=join("nodal_capacity_", [t, k], "_"))
else:
nodal_capacity_vars.P[t, k] = prob.add_var(lb=-9999.9,
ub=0.0,
name=join("nodal_capacity_", [t, k], "_"))
# maximize the nodal power injection
f_obj += 100 * nodal_capacity_sign * nodal_capacity_vars.P[t, k]
return f_obj
[docs]
def add_linear_load_formulation(t: Union[int, None],
Sbase: float,
bus_vars: BusVars,
load_data_t: LoadData,
load_vars: LoadVars,
prob: LpModel):
"""
Add MIP generation formulation
:param t: time step, if None we assume single time step
:param Sbase: base power (100 MVA)
:param bus_vars: BusVars
:param load_data_t: BatteryData structure
:param load_vars: BatteryVars structure
:param prob: ORTools problem
:return objective function
"""
f_obj = 0.0
for k in range(load_data_t.nelm):
bus_idx = load_data_t.bus_idx[k]
if load_data_t.active[k] and bus_idx > -1:
p_set = load_data_t.S[k].real / Sbase
if p_set > 0.0:
# assign load shedding variable
load_vars.shedding[t, k] = prob.add_var(lb=0,
ub=p_set,
name=join("load_shedding_", [t, k], "_"))
# store the load
load_vars.p[t, k] = p_set - load_vars.shedding[t, k]
load_vars.shedding_cost[t, k] = load_data_t.cost[k] * load_vars.shedding[t, k]
# minimize the load shedding
f_obj += load_vars.shedding_cost[t, k]
else:
# the load is negative, won't shed?
load_vars.shedding[t, k] = 0.0
# store the load
load_vars.p[t, k] = load_data_t.S[k].real / Sbase
# add to the balance
bus_vars.Pbalance[t, bus_idx] += -load_vars.p[t, k]
# add to the injections
# bus_vars.Pinj[t, bus_idx] += - load_vars.p[t, k]
else:
# the load is not available at time step
load_vars.shedding[t, k] = 0.0
return f_obj
[docs]
def add_linear_branches_formulation(t: int,
Sbase: float,
bus_data_t: BusData,
branch_data_t: PassiveBranchData,
ctrl_branch_data_t: ActiveBranchData,
branch_vars: BranchVars,
bus_vars: BusVars,
prob: LpModel,
inf=1e20,
add_losses_approximation: bool = False):
"""
Formulate the branches
:param t: time index
:param Sbase: base power (100 MVA)
:param bus_data_t: BusData
:param branch_data_t: BranchData
:param ctrl_branch_data_t: ControllableBranchData
:param branch_vars: BranchVars
:param bus_vars: BusVars
:param prob: OR problem
:param inf: number considered infinite
:param add_losses_approximation: If true the distribution factors losses approximation is used
:return objective function
"""
f_obj = 0.0
# for each branch
for m in range(branch_data_t.nelm):
fr = branch_data_t.F[m]
to = branch_data_t.T[m]
# copy rates
branch_vars.rates[t, m] = branch_data_t.rates[m]
if branch_data_t.active[m]:
# declare the flow LPVar
branch_vars.flows[t, m] = prob.add_var(lb=-inf,
ub=inf,
name=join("flow_", [t, m], "_"))
if branch_data_t.dc[m]:
# DC Branch
if branch_data_t.R[m] == 0.0:
bk = 1e-20
else:
bk = 1.0 / branch_data_t.R[m]
branch_vars.flows[t, m] = bk * (bus_vars.Vm[t, fr] - bus_vars.Vm[t, to])
else:
# AC branch
# compute the branch susceptance
if branch_data_t.X[m] == 0.0:
bk = 1e-20
else:
bk = 1.0 / branch_data_t.X[m]
# compute the flow
if ctrl_branch_data_t.tap_phase_control_mode[m] == TapPhaseControl.Pf.idx():
# add angle
branch_vars.tap_angles[t, m] = prob.add_var(lb=ctrl_branch_data_t.tap_angle_min[m],
ub=ctrl_branch_data_t.tap_angle_max[m],
name=join("tap_ang_", [t, m], "_"))
# is a phase shifter device (like phase shifter transformer or VSC with P control)
# flow_ctr = branch_vars.flows[t, m] == bk * (
# bus_vars.theta[t, fr] - bus_vars.theta[t, to] + branch_vars.tap_angles[t, m])
# prob.add_cst(cst=flow_ctr, name=join("Branch_flow_set_with_ps_", [t, m], "_"))
branch_vars.flows[t, m] = bk * (bus_vars.Va[t, fr] -
bus_vars.Va[t, to] +
branch_vars.tap_angles[t, m])
else: # rest of the branches
# is a phase shifter device (like phase shifter transformer or VSC with P control)
# flow_ctr = branch_vars.flows[t, m] == bk * (bus_vars.theta[t, fr] - bus_vars.theta[t, to])
# prob.add_cst(cst=flow_ctr, name=join("Branch_flow_set_", [t, m], "_"))
branch_vars.flows[t, m] = bk * (bus_vars.Va[t, fr] - bus_vars.Va[t, to])
# power injected and subtracted due to the phase shift
bus_vars.Pbalance[t, fr] += - branch_vars.flows[t, m]
bus_vars.Pbalance[t, to] += branch_vars.flows[t, m]
if add_losses_approximation:
# declare the abs flow LPVar
branch_vars.z_flows[t, m] = prob.add_var(lb=0,
ub=inf,
name=join("z_flow_", [t, m], "_"))
# zij β₯ Fij
prob.add_cst(cst=branch_vars.flows[t, m] <= branch_vars.z_flows[t, m],
name=join("z_flow_u_lim_", [t, m]))
# zij β₯ -Fij
prob.add_cst(cst=-branch_vars.flows[t, m] <= branch_vars.z_flows[t, m],
name=join("z_flow_l_lim_", [t, m]))
# add to the objective (Ξ±ij =rij * β£Fij0β£/V2)
# insetad of the rates, the literature suggests an absolute initial value of the flow
V = bus_data_t.Vnom[branch_data_t.F[m]]
if V > 0.0:
factor = branch_data_t.R[m] * branch_data_t.rates[m] / (V ** 2)
else:
factor = branch_data_t.R[m]
# store the values for later retrieval
branch_vars.losses[t, m] = branch_vars.z_flows[t, m] * factor
# add the losses to the objective
f_obj += branch_vars.losses[t, m]
# divide the losses contribution equally among the from and to buses as new loads
l_inj = 0.5 * branch_vars.losses[t, m]
bus_vars.Pbalance[t, fr] += - l_inj
bus_vars.Pbalance[t, to] += - l_inj
# add the flow constraint if monitored
if branch_data_t.monitor_loading[m]:
branch_vars.flow_slacks_pos[t, m] = prob.add_var(0, inf,
name=join("flow_slack_pos_", [t, m], "_"))
branch_vars.flow_slacks_neg[t, m] = prob.add_var(0, inf,
name=join("flow_slack_neg_", [t, m], "_"))
# add upper rate constraint
branch_vars.flow_constraints_ub[t, m] = (branch_vars.flows[t, m] +
branch_vars.flow_slacks_pos[t, m] -
branch_vars.flow_slacks_neg[t, m]
<= branch_data_t.rates[m] / Sbase)
prob.add_cst(cst=branch_vars.flow_constraints_ub[t, m],
name=join("br_flow_upper_lim_", [t, m]))
# add lower rate constraint
branch_vars.flow_constraints_lb[t, m] = (branch_vars.flows[t, m] +
branch_vars.flow_slacks_pos[t, m] -
branch_vars.flow_slacks_neg[t, m]
>= -branch_data_t.rates[m] / Sbase)
prob.add_cst(cst=branch_vars.flow_constraints_lb[t, m],
name=join("br_flow_lower_lim_", [t, m]))
branch_vars.overload_cost[t, m] = (branch_data_t.overload_cost[m] * branch_vars.flow_slacks_pos[t, m]
+ branch_data_t.overload_cost[m] * branch_vars.flow_slacks_neg[t, m])
# add to the objective function
f_obj += branch_vars.overload_cost[t, m]
return f_obj
[docs]
def add_linear_branches_contingencies_formulation(t_idx: int,
Sbase: float,
branch_data_t: PassiveBranchData,
hvdc_vars: HvdcVars,
vsc_vars: VscVars,
branch_vars: BranchVars,
bus_vars: BusVars,
prob: LpModel,
linear_multi_contingencies: LinearMultiContingencies):
"""
Formulate the branches
:param t_idx: time index
:param Sbase: base power (100 MVA)
:param branch_data_t: BranchData
:param hvdc_vars: HvdcVars
:param vsc_vars: VscVars
:param branch_vars: BranchVars
:param bus_vars: BusVars
:param prob: OR problem
:param linear_multi_contingencies: LinearMultiContingencies
:return objective function
"""
f_obj = 0.0
for c, contingency in enumerate(linear_multi_contingencies.multi_contingencies):
# compute the contingency flow (Lp expression)
contingency_flows, mask, changed_idx = contingency.get_lp_contingency_flows(
base_flow=branch_vars.flows[t_idx, :],
injections=bus_vars.Pinj[t_idx, :],
hvdc_flow=hvdc_vars.flows[t_idx, :],
vsc_flow=vsc_vars.flows[t_idx, :]
)
for m, contingency_flow in enumerate(contingency_flows):
if mask[m]:
if isinstance(contingency_flow, LpExp): # if the contingency is not 0
# declare slack variables
pos_slack = prob.add_var(0, 1e20, join("br_cst_flow_pos_sl_", [t_idx, m, c]))
neg_slack = prob.add_var(0, 1e20, join("br_cst_flow_neg_sl_", [t_idx, m, c]))
# register the contingency data to evaluate the result at the end
branch_vars.add_contingency_flow(t=t_idx, m=m, c=c,
flow_var=contingency_flow,
neg_slack=neg_slack,
pos_slack=pos_slack)
# add upper rate constraint
prob.add_cst(
cst=contingency_flow + pos_slack - neg_slack <= branch_data_t.rates[m] / Sbase,
name=join("br_cst_flow_upper_lim_", [t_idx, m, c])
)
# add lower rate constraint
prob.add_cst(
cst=contingency_flow + pos_slack - neg_slack >= -branch_data_t.rates[m] / Sbase,
name=join("br_cst_flow_lower_lim_", [t_idx, m, c])
)
f_obj += pos_slack + neg_slack
return f_obj
[docs]
def pmode3_formulation_impr(prob: LpModel,
elm_idx: int,
t_idx: int,
m: float,
rate: float,
P0: float,
droop: float,
theta_f: LpVar,
theta_t: LpVar,
base_name: str,
logger: Logger):
"""
Formulation for HVDC link with three operating regions using big-M and binary variables.
:param prob:
:param elm_idx:
:param t_idx:
:param m:
:param rate:
:param P0:
:param droop:
:param theta_f:
:param theta_t:
:param base_name:
:param logger:
:return:
"""
# Ensure droop is not zero or too small to avoid division issues
if abs(droop) < 1e-5:
logger.add_warning("droop below threshold set to 1",
device=f"HVDC line {elm_idx}",
value=droop,
expected_value="abs(droop) < 1e-5")
droop = 1.0
# Variables
flow = prob.add_var(
lb=-rate,
ub=rate,
name=f"{base_name}_flow_{t_idx}_{m}"
)
z1 = prob.add_int(lb=0, ub=1, name=f"{base_name}_z1_{t_idx}_{m}")
z2 = prob.add_int(lb=0, ub=1, name=f"{base_name}_z2_{t_idx}_{m}")
z3 = prob.add_int(lb=0, ub=1, name=f"{base_name}_z3_{t_idx}_{m}")
# Constants
delta = theta_f - theta_t
delta_low = (-rate - P0) / droop
delta_high = (rate - P0) / droop
M = 20 * rate # Big-M as per note; may need adjustment for angle constraints
# Exactly one region active
prob.add_cst(z1 + z2 + z3 >= 1, name=f"one_region_ge_{t_idx}_{m}")
prob.add_cst(z1 + z2 + z3 <= 1, name=f"one_region_le_{t_idx}_{m}")
# Region constraints
# Region 1 (z1=1): saturated at -rate, delta <= delta_low
prob.add_cst(delta <= delta_low + M * (1 - z1), name=f"region1_le_{t_idx}_{m}")
# Region 2 (z2=1): droop, delta_low <= delta <= delta_high
prob.add_cst(delta >= delta_low - M * (1 - z2), name=f"region2_ge_{t_idx}_{m}")
prob.add_cst(delta <= delta_high + M * (1 - z2), name=f"region2_le_{t_idx}_{m}")
# Region 3 (z3=1): saturated at +rate, delta >= delta_high
prob.add_cst(delta >= delta_high - M * (1 - z3), name=f"region3_ge_{t_idx}_{m}")
# Power constraints
# Region 1: flow = -rate when z1=1
prob.add_cst(flow >= -rate - M * (1 - z1), name=f"power1_ge_{t_idx}_{m}")
prob.add_cst(flow <= -rate + M * (1 - z1), name=f"power1_le_{t_idx}_{m}")
# Region 2: flow = P0 + droop * (theta_f - theta_t) when z2=1
prob.add_cst(flow - (P0 + droop * delta) >= -M * (1 - z2), name=f"power2_ge_{t_idx}_{m}")
prob.add_cst(flow - (P0 + droop * delta) <= M * (1 - z2), name=f"power2_le_{t_idx}_{m}")
# Region 3: flow = rate when z3=1
prob.add_cst(flow >= rate - M * (1 - z3), name=f"power3_ge_{t_idx}_{m}")
prob.add_cst(flow <= rate + M * (1 - z3), name=f"power3_le_{t_idx}_{m}")
return flow
[docs]
def add_linear_hvdc_formulation(t: int,
Sbase: float,
hvdc_data_t: HvdcData,
hvdc_vars: HvdcVars,
bus_vars: BusVars,
prob: LpModel,
logger: Logger):
"""
:param t:
:param Sbase:
:param hvdc_data_t:
:param hvdc_vars:
:param bus_vars:
:param prob:
:param logger:
:return:
"""
f_obj = 0.0
for m in range(hvdc_data_t.nelm):
fr = hvdc_data_t.F[m]
to = hvdc_data_t.T[m]
hvdc_vars.rates[t, m] = hvdc_data_t.rates[m]
if hvdc_data_t.active[m]:
if hvdc_data_t.control_mode_int[m] == HvdcControlType.type_0_free.idx():
# use improved pmode3 formulation with three operating regions
P0 = hvdc_data_t.Pset[m] / Sbase
# convert MW/deg to pu/rad
droop = hvdc_data_t.get_angle_droop_in_pu_rad_at(m, Sbase)
if droop == 0.0:
# avoid division by zero in pmode3_formulation_impr
droop = 1e-20
rate = hvdc_data_t.rates[m] / Sbase
hvdc_vars.flows[t, m] = pmode3_formulation_impr(
prob=prob,
elm_idx=m,
t_idx=t,
m=m,
rate=rate,
P0=P0,
droop=droop,
theta_f=bus_vars.Va[t, fr],
theta_t=bus_vars.Va[t, to],
base_name="hvdc",
logger=logger
)
# add the injections matching the flow
bus_vars.Pbalance[t, fr] += - hvdc_vars.flows[t, m]
bus_vars.Pbalance[t, to] += hvdc_vars.flows[t, m]
elif hvdc_data_t.control_mode_int[m] == HvdcControlType.type_1_Pset.idx():
if hvdc_data_t.dispatchable[m]:
# declare the flow var
hvdc_vars.flows[t, m] = prob.add_var(lb=-hvdc_data_t.rates[m] / Sbase,
ub=hvdc_data_t.rates[m] / Sbase,
name=join("hvdc_flow_", [t, m], "_"))
# add the injections matching the flow
bus_vars.Pbalance[t, fr] += - hvdc_vars.flows[t, m]
bus_vars.Pbalance[t, to] += hvdc_vars.flows[t, m]
else:
if hvdc_data_t.Pset[m] > hvdc_data_t.rates[m]:
P0 = hvdc_data_t.rates[m] / Sbase
elif hvdc_data_t.Pset[m] < -hvdc_data_t.rates[m]:
P0 = -hvdc_data_t.rates[m] / Sbase
else:
P0 = hvdc_data_t.Pset[m] / Sbase
# declare the flow var
hvdc_vars.flows[t, m] = prob.add_var(lb=P0, ub=P0,
name=join("hvdc_flow_", [t, m], "_"))
# add the injections matching the flow
bus_vars.Pbalance[t, fr] += - hvdc_vars.flows[t, m]
bus_vars.Pbalance[t, to] += hvdc_vars.flows[t, m]
else:
raise Exception('OPF: Unknown HVDC control mode {}'.format(hvdc_data_t.control_mode_int[m]))
else:
# not active, therefore the flow is exactly zero
hvdc_vars.flows[t, m] = 0.0
return f_obj
[docs]
def add_linear_vsc_formulation(t: int,
Sbase: float,
vsc_data_t: VscData,
vsc_vars: VscVars,
bus_vars: BusVars,
prob: LpModel,
logger: Logger):
"""
:param t:
:param Sbase:
:param vsc_data_t:
:param vsc_vars:
:param bus_vars:
:param prob:
:param logger:
:return:
"""
f_obj = 0.0
any_dc_slack = False
for m in range(vsc_data_t.nelm):
fr = vsc_data_t.F[m]
to = vsc_data_t.T[m]
vsc_vars.rates[t, m] = vsc_data_t.rates[m]
if vsc_data_t.active[m]:
# declare the flow var
vsc_vars.flows[t, m] = prob.add_var(lb=-vsc_data_t.rates[m] / Sbase,
ub=vsc_data_t.rates[m] / Sbase,
name=join("vsc_flow_", [t, m], "_"))
# add the injections matching the flow
bus_vars.Pbalance[t, fr] += - vsc_vars.flows[t, m]
bus_vars.Pbalance[t, to] += vsc_vars.flows[t, m]
if (vsc_data_t.control1_int[m] == ConverterControlType.Vm_dc.idx() or
vsc_data_t.control2_int[m] == ConverterControlType.Vm_dc.idx()):
# set the DC slack
bus_vars.Vm[t, fr] = 1.0
any_dc_slack = True
else:
# not active, therefore the flow is exactly zero
vsc_vars.flows[t, m] = 0.0
if not any_dc_slack and vsc_data_t.nelm > 0:
logger.add_warning("No DC Slack! set Vm_dc in any of the converters")
return f_obj
[docs]
def add_linear_node_balance(t_idx: int,
vd: IntVec,
bus_data: BusData,
bus_vars: BusVars,
nodal_capacity_vars: NodalCapacityVars,
capacity_nodes_idx: IntVec,
prob: LpModel,
logger: Logger):
"""
Add the Kirchhoff nodal equality
:param t_idx: time step
:param vd: List of slack node indices
:param bus_data: BusData
:param bus_vars: BusVars
:param nodal_capacity_vars: NodalCapacityVars
:param capacity_nodes_idx: IntVec
:param prob: LpModel
:param logger: Logger
"""
# Note: At this point, Pbalance has all the devices' power summed up inside (including branches)
if len(capacity_nodes_idx) > 0:
bus_vars.Pbalance[t_idx, capacity_nodes_idx] += nodal_capacity_vars.P[t_idx, :]
# add the equality restrictions
for k in range(bus_data.nbus):
if isinstance(bus_vars.Pbalance[t_idx, k], (int, float)):
bus_vars.kirchhoff[t_idx, k] = prob.add_cst(
cst=bus_vars.Va[t_idx, k] == 0,
name=join("island_bus_", [t_idx, k], "_")
)
logger.add_warning("bus isolated",
device=bus_data.names[k] + f'@t={t_idx}')
else:
bus_vars.kirchhoff[t_idx, k] = prob.add_cst(
cst=bus_vars.Pbalance[t_idx, k] == 0,
name=join("kirchoff_", [t_idx, k], "_")
)
Va = np.angle(bus_data.Vbus)
for i in vd:
prob.set_var_bounds(var=bus_vars.Va[t_idx, i], lb=Va[i], ub=Va[i])
[docs]
def add_copper_plate_balance(t_idx: int,
bus_vars: BusVars,
prob: LpModel, ):
"""
Add the copperplate equality
:param t_idx: time step
:param bus_vars: BusVars
:param prob: LpModel
"""
bus_vars.kirchhoff[t_idx, 0] = prob.add_cst(
cst=sum(bus_vars.Pbalance[t_idx, :]) == 0,
name=join("copper_plate_", [t_idx, 0], "_")
)
[docs]
def add_hydro_formulation(t: Union[int, None],
time_global_tidx: Union[int, None],
time_array: DateVec,
Sbase: float,
node_vars: FluidNodeVars,
path_vars: FluidPathVars,
inj_vars: FluidInjectionVars,
node_data: FluidNodeData,
path_data: FluidPathData,
turbine_data: FluidTurbineData,
pump_data: FluidPumpData,
p2x_data: FluidP2XData,
generator_data: GeneratorData,
generator_vars: GenerationVars,
fluid_level_0: Vec,
prob: LpModel,
logger: Logger):
"""
Formulate the branches
:param t: local time index
:param time_global_tidx: global time index
:param time_array: list of time indices
:param Sbase: base power of the system
:param node_vars: FluidNodeVars
:param path_vars: FluidPathVars
:param inj_vars: FluidInjectionVars
:param node_data: FluidNodeData
:param path_data: FluidPathData
:param turbine_data: FluidTurbineData
:param pump_data: FluidPumpData
:param p2x_data: FluidP2XData
:param generator_data: GeneratorData
:param generator_vars: GeneratorVars
:param fluid_level_0: Initial node level
:param prob: OR problem
:param logger: log of the LP
:return objective function
"""
f_obj = 0.0
for m in range(node_data.nelm):
node_vars.spillage[t, m] = prob.add_var(lb=0.00,
ub=1e20,
name=join("NodeSpillage_", [t, m], "_"))
f_obj += node_data.spillage_cost[m] * node_vars.spillage[t, m]
# f_obj += node_vars.spillage[t, m]
min_abs_level = node_data.max_level[m] * node_data.min_soc[m]
node_vars.current_level[t, m] = prob.add_var(lb=min_abs_level,
ub=node_data.max_level[m] * node_data.max_soc[m],
name=join("level_", [t, m], "_"))
if min_abs_level < node_data.min_level[m]:
logger.add_error(msg='Node SOC is below the allowed minimum level',
value=min_abs_level,
expected_value=node_data.min_level[m],
device_class="FluidNode",
device_property=f"Min SOC at {t}")
for m in range(path_data.nelm):
path_vars.flow[t, m] = prob.add_var(lb=path_data.min_flow[m],
ub=path_data.max_flow[m],
name=join("hflow_", [t, m], "_"))
# Constraints
for m in range(path_data.nelm):
# inflow: fluid flow entering the target node in m3/s
# outflow: fluid flow leaving the source node in m3/s
# flow: amount of fluid flowing through the river in m3/s
node_vars.flow_in[t, path_data.target_idx[m]] += path_vars.flow[t, m]
node_vars.flow_out[t, path_data.source_idx[m]] += path_vars.flow[t, m]
for m in range(turbine_data.nelm):
gen_idx = turbine_data.generator_idx[m]
plant_idx = turbine_data.plant_idx[m]
# flow [m3/s] = pgen [pu] * max_flow [m3/s] / (Pgen_max [MW] / Sbase [MW] * eff)
coeff = turbine_data.max_flow_rate[m] / (generator_data.pmax[gen_idx] / Sbase * turbine_data.efficiency[m])
turbine_flow = (generator_vars.p[t, gen_idx] * coeff)
# node_vars.flow_out[t, plant_idx] = turbine_flow # assume only 1 turbine connected
# if t > 0:
inj_vars.flow[t, m] = turbine_flow # to retrieve the value later on
prob.add_cst(cst=(node_vars.flow_out[t, plant_idx] == turbine_flow),
name=join("turbine_river_", [t, m], "_"))
if generator_data.pmin[gen_idx] < 0:
logger.add_error(msg='Turbine generator pmin < 0 is not possible',
value=generator_data.pmin[gen_idx])
# f_obj += turbine_flow
for m in range(pump_data.nelm):
gen_idx = pump_data.generator_idx[m]
plant_idx = pump_data.plant_idx[m]
# flow [m3/s] = pcons [pu] * max_flow [m3/s] * eff / (Pcons_min [MW] / Sbase [MW])
# invert the efficiency compared to a turbine
# pmin instead of pmax because the sign should be inverted (consuming instead of generating)
coeff = pump_data.max_flow_rate[m] * pump_data.efficiency[m] / (abs(generator_data.pmin[gen_idx]) / Sbase)
pump_flow = (generator_vars.p[t, gen_idx] * coeff)
# node_vars.flow_in[t, plant_idx] = pump_flow # assume only 1 pump connected
# if t > 0:
inj_vars.flow[t, m + turbine_data.nelm] = - pump_flow
prob.add_cst(cst=(node_vars.flow_in[t, plant_idx] == - pump_flow),
name=join("pump_river_", [t, m], "_"))
if generator_data.pmax[gen_idx] > 0:
logger.add_error(msg='Pump generator pmax > 0 is not possible',
value=generator_data.pmax[gen_idx])
# f_obj += - pump_flow
for m in range(p2x_data.nelm):
gen_idx = p2x_data.generator_idx[m]
# flow[m3/s] = pcons [pu] * max_flow [m3/s] * eff / (Pcons_max [MW] / Sbase [MW])
# invert the efficiency compared to a turbine
# pmin instead of pmax because the sign should be inverted (consuming instead of generating)
coeff = p2x_data.max_flow_rate[m] * p2x_data.efficiency[m] / (abs(generator_data.pmin[gen_idx]) / Sbase)
p2x_flow = (generator_vars.p[t, gen_idx] * coeff)
# if t > 0:
node_vars.p2x_flow[t, p2x_data.plant_idx[m]] += - p2x_flow
inj_vars.flow[t, m + turbine_data.nelm + pump_data.nelm] = - p2x_flow
if generator_data.pmax[gen_idx] > 0:
logger.add_error(msg='P2X generator pmax > 0 is not possible',
value=generator_data.pmax[gen_idx])
# f_obj += - p2x_flow
if time_global_tidx is not None:
# constraints for the node level
for m in range(node_data.nelm):
if t == 0:
if len(time_array) > time_global_tidx + 1:
dt = (time_array[time_global_tidx + 1] - time_array[time_global_tidx]).seconds
else:
dt = 3600
# Initialize level at fluid_level_0
prob.add_cst(cst=(node_vars.current_level[t, m] ==
fluid_level_0[m]
+ dt * node_data.inflow[m]
+ dt * node_vars.flow_in[t, m]
+ dt * node_vars.p2x_flow[t, m]
- dt * node_vars.spillage[t, m]
- dt * node_vars.flow_out[t, m]),
name=join("nodal_balance_", [t, m], "_"))
else:
# Update the level according to the in and out flows as time passes
dt = (time_array[time_global_tidx] - time_array[time_global_tidx - 1]).seconds
prob.add_cst(cst=(node_vars.current_level[t, m] ==
node_vars.current_level[t - 1, m]
+ dt * node_data.inflow[m]
+ dt * node_vars.flow_in[t, m]
+ dt * node_vars.p2x_flow[t, m]
- dt * node_vars.spillage[t, m]
- dt * node_vars.flow_out[t, m]),
name=join("nodal_balance_", [t, m], "_"))
return f_obj
[docs]
def run_linear_opf_ts(grid: MultiCircuit,
time_indices: Union[IntVec, None],
dispatch_mode: OpfDispatchMode = OpfDispatchMode.Normal,
solver_type: MIPSolvers = MIPSolvers.HIGHS,
zonal_grouping: ZonalGrouping = ZonalGrouping.NoGrouping,
skip_generation_limits: bool = False,
consider_contingencies: bool = False,
contingency_groups_used: Union[List[ContingencyGroup], None] = None,
ramp_constraints: bool = False,
consider_time_up_down: bool = False,
area_spinning_reserve: bool = False,
lodf_threshold: float = 0.001,
inter_aggregation_info: InterAggregationInfo | None = None,
energy_0: Union[Vec, None] = None,
fluid_level_0: Union[Vec, None] = None,
nodal_capacity_sign: float = 1.0,
capacity_nodes_idx: Union[IntVec, None] = None,
use_glsk_as_cost: bool = False,
add_losses_approximation: bool = False,
logger: Logger = Logger(),
progress_text: Union[None, Callable[[str], None]] = None,
progress_func: Union[None, Callable[[float], None]] = None,
verbose: int = 0,
robust: bool = False,
mip_framework: MIPFramework = MIPFramework.PuLP) -> Tuple[OpfVars, LpModel]:
"""
Formulate linear optimal power flow
:param grid: MultiCircuit instance
:param time_indices: Time indices (in the general scheme)
:param dispatch_mode: OpfDispatchMode
:param solver_type: MIP solver to use
:param zonal_grouping: Zonal grouping?
:param skip_generation_limits: Skip the generation limits?
:param consider_contingencies: Consider the contingencies?
:param contingency_groups_used: List of contingency groups to use
:param ramp_constraints: Formulate ramp constraints?
:param consider_time_up_down: Consider the time up/down?
:param area_spinning_reserve: Area spinning reserve?
:param lodf_threshold: LODF threshold value to consider contingencies
:param inter_aggregation_info: Inter rea (or country, etc) information
:param energy_0: Vector of initial energy for batteries (size: Number of batteries)
:param fluid_level_0: initial fluid level of the nodes
:param nodal_capacity_sign: if > 0 the generation is maximized, if < 0 the load is maximized
:param capacity_nodes_idx: Array of bus indices to optimize their nodal capacity for
:param use_glsk_as_cost: If true the generators use the GLSK as dispatch values
:param add_losses_approximation: If true the distribution factors losses approximation is used
:param logger: logger instance
:param progress_text: Text progress callback
:param progress_func: Numerical progress callback
:param verbose: verbosity level
:param robust: Robust optimization?
:param mip_framework: MIP framework to use
:return: OpfVars
"""
bus_dict = {bus: i for i, bus in enumerate(grid.buses)}
areas_dict = {elm: i for i, elm in enumerate(grid.areas)}
if time_indices is None:
time_indices = [None]
else:
if len(time_indices) > 0:
# time indices are ok
pass
else:
time_indices = [None]
active_nodal_capacity = True
if capacity_nodes_idx is None:
active_nodal_capacity = False
capacity_nodes_idx = np.zeros(0, dtype=int)
if contingency_groups_used is None:
contingency_groups_used = grid.get_contingency_groups()
nt = len(time_indices) if len(time_indices) > 0 else 1
n = grid.get_bus_number()
nbr = grid.get_branch_number(add_vsc=False, add_hvdc=False, add_switch=True)
ng = grid.get_generators_number()
nb = grid.get_batteries_number()
nl = grid.get_load_like_device_number()
n_hvdc = grid.get_hvdc_number()
n_vsc = grid.get_vsc_number()
n_fluid_node = grid.get_fluid_nodes_number()
n_fluid_path = grid.get_fluid_paths_number()
n_fluid_inj = grid.get_fluid_injection_number()
# gather the fuels and emission rates matrices
gen_emissions_rates_matrix = grid.get_gen_emission_rates_sparse_matrix()
gen_fuel_rates_matrix = grid.get_gen_fuel_rates_sparse_matrix()
gen_tech_shares_matrix = grid.get_gen_technology_connectivity_matrix()
batt_tech_shares_matrix = grid.get_batt_technology_connectivity_matrix()
if dispatch_mode == OpfDispatchMode.InterAreaRedispatch:
inter_area_branches = inter_aggregation_info.lst_br
inter_area_hvdc = inter_aggregation_info.lst_br_hvdc
else:
inter_area_branches = list()
inter_area_hvdc = list()
# declare structures of LP vars
mip_vars = OpfVars(nt=nt, nbus=n, ng=ng, nb=nb, nl=nl,
nbr=nbr, n_hvdc=n_hvdc, n_vsc=n_vsc,
n_fluid_node=n_fluid_node,
n_fluid_path=n_fluid_path,
n_fluid_inj=n_fluid_inj,
n_cap_buses=len(capacity_nodes_idx))
# create the MIP problem object
lp_model: LpModel = get_model_instance(tpe=mip_framework, solver_type=solver_type)
logger.add_info(f"MIP Framework", value=lp_model.name)
logger.add_info(f"MIP Solver", value=solver_type.value)
# objective function
f_obj: Union[LpExp, float] = 0.0
for local_t_idx, global_t_idx in enumerate(time_indices): # use time_indices = [None] to simulate the snapshot
# time indices:
# imagine that the complete VeraGrid DB time goes from 0 to 1000
# but, for whatever reason, time_indices is [100..200]
# local_t_idx would go from 0..100
# global_t_idx would go from 100..200
# compile the circuit at the master time index ------------------------------------------------------------
# note: There are very little chances of simplifying this step and experience shows
# it is not worth the effort, so compile every time step
nc: NumericalCircuit = compile_numerical_circuit_at(
circuit=grid,
t_idx=global_t_idx, # yes, this is not a bug
bus_dict=bus_dict,
areas_dict=areas_dict,
fill_gep=dispatch_mode == OpfDispatchMode.GenerationExpansionPlanning,
logger=logger
)
indices = nc.get_simulation_indices()
# formulate the bus angles ---------------------------------------------------------------------------------
for k in range(nc.bus_data.nbus):
# we declare Vm for DC buses and Va for AC buses
if nc.bus_data.is_dc[k]:
mip_vars.bus_vars.Vm[local_t_idx, k] = lp_model.add_var(
lb=nc.bus_data.Vmin[k],
ub=nc.bus_data.Vmax[k],
name=f"Vm_{local_t_idx}_{k}"
)
else:
mip_vars.bus_vars.Va[local_t_idx, k] = lp_model.add_var(
lb=nc.bus_data.angle_min[k],
ub=nc.bus_data.angle_max[k],
name=f"Va_{local_t_idx}_{k}"
)
# formulate loads ------------------------------------------------------------------------------------------
f_obj += add_linear_load_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
load_data_t=nc.load_data,
bus_vars=mip_vars.bus_vars,
load_vars=mip_vars.load_vars,
prob=lp_model
)
# formulate generation -------------------------------------------------------------------------------------
# formulate batteries energy
if local_t_idx == 0 and energy_0 is None:
# declare the initial energy of the batteries
energy_0 = nc.battery_data.soc_0 * nc.battery_data.enom # in MWh here
if dispatch_mode == OpfDispatchMode.Normal:
f_obj += add_linear_simple_generation_formulation(
local_t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
gen_data_t=nc.generator_data,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
ramp_constraints=ramp_constraints,
consider_time_up_down=consider_time_up_down,
area_spinning_reserve=area_spinning_reserve,
skip_generation_limits=skip_generation_limits,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
f_obj += add_linear_battery_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_data_t=nc.battery_data,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=False,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=False,
energy_0=energy_0
)
elif dispatch_mode == OpfDispatchMode.UnitCommitment:
f_obj += add_linear_generation_unit_commitment_formulation(
local_t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
gen_data_t=nc.generator_data,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
ramp_constraints=ramp_constraints,
consider_time_up_down=consider_time_up_down,
area_spinning_reserve=area_spinning_reserve,
skip_generation_limits=skip_generation_limits,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
f_obj += add_linear_battery_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_data_t=nc.battery_data,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=True,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=False,
energy_0=energy_0
)
elif dispatch_mode == OpfDispatchMode.NodalCapacity:
add_linear_nodal_capacity_generation_formulation(
local_t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
gen_data_t=nc.generator_data,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
skip_generation_limits=skip_generation_limits,
vd=indices.vd,
nodal_capacity_active=active_nodal_capacity,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
f_obj += add_linear_battery_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_data_t=nc.battery_data,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=False,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=False,
energy_0=energy_0
)
elif dispatch_mode == OpfDispatchMode.GenerationExpansionPlanning:
add_linear_generation_expansion_planning_formulation(
local_t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
gen_data_t=nc.generator_data,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
f_obj += add_linear_battery_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_data_t=nc.battery_data,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=False,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=True,
energy_0=energy_0
)
elif dispatch_mode == OpfDispatchMode.InterAreaRedispatch:
f_obj += add_linear_generation_redispatch_formulation(
local_t=local_t_idx,
Sbase=nc.Sbase,
bus_vars=mip_vars.bus_vars,
gen_data_t=nc.generator_data,
gen_vars=mip_vars.gen_vars,
prob=lp_model,
inter_aggregation_info=inter_aggregation_info,
skip_generation_limits=skip_generation_limits,
use_glsk_as_cost=use_glsk_as_cost,
logger=logger
)
f_obj += add_linear_battery_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
time_array=grid.time_profile,
bus_vars=mip_vars.bus_vars,
batt_data_t=nc.battery_data,
batt_vars=mip_vars.batt_vars,
prob=lp_model,
unit_commitment=False,
ramp_constraints=ramp_constraints,
skip_generation_limits=skip_generation_limits,
generation_expansion_planning=False,
energy_0=energy_0
)
else:
raise ValueError(f"dispatch mode not supported {dispatch_mode}")
# formulate nodal capacity -------------------------------------------------------------------------------------
if dispatch_mode == OpfDispatchMode.NodalCapacity:
f_obj += add_nodal_capacity_formulation(
t=local_t_idx,
nodal_capacity_vars=mip_vars.nodal_capacity_vars,
nodal_capacity_sign=nodal_capacity_sign,
capacity_nodes_idx=capacity_nodes_idx,
prob=lp_model
)
# add emissions ------------------------------------------------------------------------------------------------
if gen_emissions_rates_matrix.shape[0] > 0:
# amount of emissions per gas
emissions = lpDot(gen_emissions_rates_matrix, mip_vars.gen_vars.p[local_t_idx, :])
f_obj += lp_model.sum(emissions)
# add fuels ----------------------------------------------------------------------------------------------------
if gen_fuel_rates_matrix.shape[0] > 0:
# amount of fuels
fuels_amount = lpDot(gen_fuel_rates_matrix, mip_vars.gen_vars.p[local_t_idx, :])
f_obj += lp_model.sum(fuels_amount)
# --------------------------------------------------------------------------------------------------------------
# if no zonal grouping, all the grid is considered...
if zonal_grouping == ZonalGrouping.NoGrouping:
# formulate hvdc -------------------------------------------------------------------------------------------
f_obj += add_linear_hvdc_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
hvdc_data_t=nc.hvdc_data,
hvdc_vars=mip_vars.hvdc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
logger=logger
)
# formulate vsc --------------------------------------------------------------------------------------------
f_obj += add_linear_vsc_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
vsc_data_t=nc.vsc_data,
vsc_vars=mip_vars.vsc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
logger=logger
)
# formulate branches ---------------------------------------------------------------------------------------
f_obj += add_linear_branches_formulation(
t=local_t_idx,
Sbase=nc.Sbase,
bus_data_t=nc.bus_data,
branch_data_t=nc.passive_branch_data,
ctrl_branch_data_t=nc.active_branch_data,
branch_vars=mip_vars.branch_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
inf=1e20,
add_losses_approximation=add_losses_approximation
)
# formulate nodes ------------------------------------------------------------------------------------------
add_linear_node_balance(
t_idx=local_t_idx,
vd=indices.vd,
bus_data=nc.bus_data,
bus_vars=mip_vars.bus_vars,
nodal_capacity_vars=mip_vars.nodal_capacity_vars,
capacity_nodes_idx=capacity_nodes_idx,
prob=lp_model,
logger=logger
)
# add branch contingencies ---------------------------------------------------------------------------------
if consider_contingencies:
if len(contingency_groups_used) > 0:
# The contingencies' formulation uses the total nodal injection stored in bus_vars,
# hence, this step goes before the add_linear_node_balance function
# compute the PTDF and LODF
ls = LinearAnalysis(nc=nc,
distributed_slack=False,
correct_values=True)
# Compute the more generalistic contingency structures
mctg = LinearMultiContingencies(grid=grid,
contingency_groups_used=contingency_groups_used)
mctg.compute(lin=ls,
ptdf_threshold=lodf_threshold,
lodf_threshold=lodf_threshold)
# formulate the contingencies
f_obj += add_linear_branches_contingencies_formulation(
t_idx=local_t_idx,
Sbase=nc.Sbase,
branch_data_t=nc.passive_branch_data,
branch_vars=mip_vars.branch_vars,
hvdc_vars=mip_vars.hvdc_vars,
vsc_vars=mip_vars.vsc_vars,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
linear_multi_contingencies=mctg
)
else:
logger.add_warning(msg="Contingencies enabled, but no contingency groups provided")
# add inter area branch flow maximization ------------------------------------------------------------------
if dispatch_mode == OpfDispatchMode.InterAreaRedispatch:
# maximize the power at the from buses
for i in inter_aggregation_info.idx_bus_from:
f_obj += - mip_vars.bus_vars.Pgen[local_t_idx, i]
# minimize the power at the to buses
for i in inter_aggregation_info.idx_bus_to:
f_obj += mip_vars.bus_vars.Pgen[local_t_idx, i]
for k, branch, sense in inter_area_branches:
# we want to maximize, hence the minus sign
f_obj += mip_vars.branch_vars.flows[local_t_idx, k] * (- sense)
for k, branch, sense in inter_area_hvdc:
# we want to maximize, hence the minus sign
f_obj += mip_vars.hvdc_vars.flows[local_t_idx, k] * (- sense)
# add hydro side -------------------------------------------------------------------------------------------
if local_t_idx == 0 and fluid_level_0 is None:
# declare the initial level of the fluid nodes
fluid_level_0 = nc.fluid_node_data.initial_level
if n_fluid_node > 0:
f_obj += add_hydro_formulation(t=local_t_idx,
time_global_tidx=global_t_idx,
time_array=grid.time_profile,
Sbase=nc.Sbase,
node_vars=mip_vars.fluid_node_vars,
path_vars=mip_vars.fluid_path_vars,
inj_vars=mip_vars.fluid_inject_vars,
node_data=nc.fluid_node_data,
path_data=nc.fluid_path_data,
turbine_data=nc.fluid_turbine_data,
pump_data=nc.fluid_pump_data,
p2x_data=nc.fluid_p2x_data,
generator_data=nc.generator_data,
generator_vars=mip_vars.gen_vars,
fluid_level_0=fluid_level_0,
prob=lp_model,
logger=logger)
elif zonal_grouping == ZonalGrouping.All:
# this is the copper plate approach
add_copper_plate_balance(
t_idx=local_t_idx,
bus_vars=mip_vars.bus_vars,
prob=lp_model,
)
# production equals demand -------------------------------------------------------------------------------------
# NOTE: The production == demand, happens with the kirchoff equation for the grid and with the
# copper plate balance for the copper plate scenario
if progress_func is not None:
progress_func((local_t_idx + 1) / nt * 100.0)
# set the objective function
lp_model.minimize(f_obj)
# solve
if progress_text is not None:
progress_text("Solving...")
if progress_func is not None:
progress_func(0)
status = lp_model.solve(robust=robust,
show_logs=verbose > 0,
progress_text=progress_text)
# gather the results
logger.add_info(msg="Status", value=lp_model.status2string(status))
if status == lp_model.OPTIMAL:
logger.add_info("Objective function", value=lp_model.fobj_value())
mip_vars.acceptable_solution = True
else:
logger.add_error("The problem does not have an optimal solution.")
mip_vars.acceptable_solution = False
lp_file_name = os.path.join(opf_file_path(), f"{grid.name} opf debug.lp")
lp_model.save_model(file_name=lp_file_name)
logger.add_info("Debug LP model saved", value=lp_file_name)
# convert the lp vars to their values
vars_v = mip_vars.get_values(Sbase=grid.Sbase,
model=lp_model,
gen_emissions_rates_matrix=gen_emissions_rates_matrix,
gen_fuel_rates_matrix=gen_fuel_rates_matrix,
gen_tech_shares_matrix=gen_tech_shares_matrix,
batt_tech_shares_matrix=batt_tech_shares_matrix)
# add the model logger to the main logger
logger += lp_model.logger
# lp_model.save_model('nodal_opf.lp')
return vars_v, lp_model