# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
import warnings
from typing import List, Union, Tuple, Iterable, Callable
import numpy as np
from uuid import uuid4
from scipy.sparse import csc_matrix
from VeraGridEngine.Utils.MIP.SimpleMip.lpobjects import LpExp, LpCst, LpVar
from VeraGridEngine.Utils.MIP.SimpleMip.highs import HIGHS_AVAILABLE, solve_with_highs
from VeraGridEngine.basic_structures import Vec, Logger
from VeraGridEngine.enumerations import MIPSolvers
[docs]
def get_available_mip_solvers() -> List[str]:
"""
Get a list of candidate solvers
:return: list of solver names
"""
res = list()
if HIGHS_AVAILABLE:
res.append('HIGHS')
return res
[docs]
def set_var_bounds(var: LpVar, lb: float, ub: float):
"""
Modify the bounds of a variable
:param var: LpVar instance to modify
:param lb: lower bound value
:param ub: upper bound value
"""
if isinstance(var, LpVar):
var.lower_bound = lb
var.upper_bound = ub
[docs]
class LpModel:
"""
SimpleMIP
"""
__slots__ = (
"solver_type",
"objective",
"constraints",
"variables",
"relaxed_slacks",
"originally_infeasible",
"_is_minimize",
"_is_mip",
"_col_value",
"_col_dual",
"_row_value",
"_row_dual",
"_objective_value",
"_is_optimal",
"logger",
)
OPTIMAL = 100
INFINITY = 1e20
def __init__(self, solver_type: MIPSolvers = MIPSolvers.HIGHS):
if solver_type not in [MIPSolvers.HIGHS]:
warnings.warn(f"Unsupported solver {solver_type.value}, falling back to highs.")
self.solver_type = MIPSolvers.HIGHS
else:
self.solver_type = solver_type
self.objective: LpExp = LpExp()
self.constraints: List[LpCst] = list()
self.variables: List[LpVar] = list()
self.relaxed_slacks: List[Tuple[int, LpVar, float]] = list()
self.originally_infeasible = False
self._is_minimize = True
self._is_mip = False
# solution vars
self._col_value = np.empty(0)
self._col_dual = np.empty(0)
self._row_value = np.empty(0)
self._row_dual = np.empty(0)
self._objective_value = 0.0
self._is_optimal = False
self.logger = Logger()
[docs]
def is_minimize(self) -> bool:
"""
Minimize?
:return: bool
"""
return self._is_minimize
[docs]
def copy(self, copy_results: bool = False) -> "LpModel":
"""
Deep copy of this
:param copy_results: copy the results too?
:return: LpModel
"""
cpy = LpModel(self.solver_type)
cpy._is_minimize = self._is_minimize
cpy._is_mip = self._is_mip
cpy.objective = self.objective.copy()
for var in self.variables:
cpy.variables.append(var.copy())
for cst in self.constraints:
cpy.constraints.append(cst.copy())
if copy_results:
cpy._col_value = self._col_value
cpy._col_dual = self._col_dual
cpy._row_value = self._row_value
cpy._row_dual = self._row_dual
cpy._objective_value = self._objective_value
cpy._is_optimal = self._is_optimal
return cpy
[docs]
def get_obj_coefficient(self, var: LpVar) -> float:
"""
Get the coefficient of a variable, if not found, return 0.0
"""
return self.objective.terms.get(var, 0.0)
def _add_variable(self, lb: float = 0.0, ub: float = 1e20, name: str = "", is_int: bool = False):
"""
Add a variable to the problem
:param lb:
:param ub:
:param name:
:param is_int:
:return: Variable instance
"""
var = LpVar(name=name, lower_bound=lb, upper_bound=ub, is_integer=is_int, hash_id=uuid4().int)
self.variables.append(var)
return var
[docs]
def add_int(self, lb: int, ub: int, name: str = "") -> LpVar:
"""
Make integer LP var
:param lb: lower bound
:param ub: upper bound
:param name: name (optional)
:return: LpVar
"""
return self._add_variable(lb=lb, ub=ub, name=name, is_int=True)
[docs]
def add_var(self, lb: float = 0.0, ub: float = 1e20, name: str = "") -> LpVar:
"""
Make floating point LP var
:param lb: lower bound
:param ub: upper bound
:param name: name (optional)
:return: LpVar
"""
return self._add_variable(lb=lb, ub=ub, name=name, is_int=False)
[docs]
def add_vars(self, size: int, lb: float = 0.0, ub: float = 1e20, name: str = "", is_int=False) -> List[LpVar]:
"""
Make array of LP vars
:param size: number of variables
:param lb: lower bound
:param ub: upper bound
:param name: name (optional)
:param is_int: create integer variables
:return: LpVar
"""
return [self._add_variable(lb=lb, ub=ub, name=f"{name}_{i}", is_int=is_int)
for i in range(size)]
def _set_objective(self, expression: LpExp, is_minimize=True):
"""
Set the objective function
:param expression: Expression
:param is_minimize: minimize?
"""
self.objective = expression
self._is_minimize = is_minimize
[docs]
def minimize(self, obj_function: LpExp):
"""
Set the objective to minimize
:param obj_function: Expression
"""
self._set_objective(obj_function, is_minimize=True)
[docs]
def maximize(self, obj_function: LpExp):
"""
Set the objective to maximize
:param obj_function: Expression
"""
self._set_objective(obj_function, is_minimize=False)
[docs]
def add_cst(self, cst: Union[LpCst, bool], name: str = "") -> Union[LpCst, int]:
"""
Add constraint to the model
:param cst: constraint object (or general expression)
:param name: name of the constraint (optional)
:return: Constraint object
"""
if isinstance(cst, LpCst):
cst.name = name
self.constraints.append(cst)
return cst
else:
raise ValueError("Only Constraint instances can be added.")
[docs]
@staticmethod
def sum(expr: Union[LpExp, Iterable]) -> LpExp:
"""
create sum of the expression
:param expr: LpExp object (or general expression)
:return: LpExp object
"""
if isinstance(expr, LpExp):
return expr
elif isinstance(expr, Iterable):
res = LpExp()
for elm in expr:
res += elm
return res
else:
raise ValueError("Only iterables can be used with sum.")
[docs]
def save_model_to_lp(self, filename: str):
"""
Save model to LP file
:param filename: LP file path
"""
with open(filename, 'w') as file:
# Write Objective Function
obj_type = "Minimize" if self._is_minimize else "Maximize"
objective_expression = " + ".join([f"{coeff}*{var.name}"
for var, coeff
in self.objective.terms.items()])
if self.objective.offset != 0.0: # Add constant term if exists
objective_expression += f" + {self.objective.offset}"
file.write(f"{obj_type}\n obj: {objective_expression}\n")
# Write Constraints
file.write("\nSubject To\n")
for i, constraint in enumerate(self.constraints):
expression = constraint.linear_expression
constraint_expression = " + ".join([f"{coeff}*{var.name}"
for var, coeff
in expression.terms.items()])
if expression.offset != 0.0: # Add constant term if exists
objective_expression += f" + {expression.offset}"
cname = constraint.name.replace(" ", "_")
file.write(f"{cname}: {constraint_expression} {constraint.sense} {constraint.coefficient}\n")
# Write Bounds
file.write("\nBounds\n")
for var in self.variables:
lb = var.lower_bound if var.lower_bound is not None else "-inf"
ub = var.upper_bound if var.upper_bound is not None else "inf"
file.write(f" {lb} <= {var.name} <= {ub}\n")
# Write Variable Types
file.write("\nGenerals\n")
for var in self.variables:
if var.is_integer:
file.write(f" {var.name}\n")
file.write("End\n")
[docs]
def save_model_to_mps(self, filename):
"""
Save the model to MPS
:param filename: MPS file path
"""
with open(filename, 'w') as file:
# Write the header
file.write("NAME MIPMODEL\n")
# Write the ROWS section
file.write("ROWS\n")
file.write(" N COST\n")
for i in range(len(self.constraints)):
sense = self.constraints[i].sense
if sense == '<=':
file.write(f" L C{i}\n")
elif sense == '>=':
file.write(f" G C{i}\n")
elif sense == '==':
file.write(f" E C{i}\n")
# Write the COLUMNS section
file.write("COLUMNS\n")
for var in self.variables:
file.write(f" {var.name} COST {self.objective.terms.get(var, 0)}\n")
for i, constraint in enumerate(self.constraints):
coeff = constraint.linear_expression.terms.get(var, 0)
if coeff != 0:
file.write(f" {var.name} C{i} {coeff}\n")
# Write the RHS section
file.write("RHS\n")
for i, constraint in enumerate(self.constraints):
file.write(f" RHS1 C{i} {constraint.coefficient}\n")
# Write the BOUNDS section
file.write("BOUNDS\n")
for var in self.variables:
lb = var.lower_bound
ub = var.upper_bound
if var.is_integer:
file.write(f" UI BND1 {var.name} {ub}\n")
file.write(f" LI BND1 {var.name} {lb}\n")
else:
file.write(f" UP BND1 {var.name} {ub}\n")
file.write(f" LO BND1 {var.name} {lb}\n")
# Optionally add the RANGES section
# ...
# Write the ENDATA section
file.write("ENDATA\n")
[docs]
def save_model(self, file_name="ntc_opf_problem.lp"):
"""
Save model in lp or mps format
:param file_name: name of the file
"""
if file_name.endswith(".lp"):
self.save_model_to_lp(filename=file_name)
elif file_name.endswith(".mps"):
self.save_model_to_lp(filename=file_name)
else:
raise Exception(f"Unrecognized file format to save {file_name}")
def __str__(self):
"""
Formulate MIP
:return:
"""
res = ""
res += "\nVars:\n"
for var in self.variables:
var_type = "Integer" if var.is_integer else "Continuous"
res += f"{var.name}: {var_type}\n"
res += "\nObjective:\n"
obj_type = "Minimize" if self._is_minimize else "Maximize"
objective_expression = " + ".join([f"{coeff} * {var.name}"
for var, coeff
in self.objective.terms.items()])
if self.objective.offset != 0.0: # Add constant term if exists
objective_expression += f" + {self.objective.offset}"
res += f"{obj_type}: {objective_expression}\n"
res += "\nConstraints:\n"
for i, constraint in enumerate(self.constraints):
# Create the constraint expression
expression = constraint.linear_expression
constraint_expression = " + ".join([f"{coeff}*{var.name}"
for var, coeff
in expression.terms.items()])
if expression.offset != 0.0: # Add constant term if exists
constraint_expression += f" + {expression.offset}"
res += f"Constraint {i}: {constraint_expression} {constraint.sense} {constraint.coefficient}\n"
res += "\nBounds:\n"
for var in self.variables:
bounds = f"{var.lower_bound} <= {var.name} <= {var.upper_bound}"
res += bounds + "\n"
return res
[docs]
def get_coefficients_data(self) -> Tuple[np.ndarray, csc_matrix, np.ndarray]:
"""
Returns the coefficient matrix
:return:
"""
# Initialize lists to hold the row indices, column indices, and data for the A matrix
row_indices = []
col_indices = []
data = []
n = len(self.constraints)
lower = np.empty(n)
upper = np.empty(n)
var_idx_dict = {v: i for i, v in enumerate(self.variables)}
# Iterate through each constraint, populating the lists
for i, constraint in enumerate(self.constraints):
lower[i], upper[i] = constraint.get_bounds()
constraint.set_index(i)
for var, coeff in constraint.linear_expression.terms.items():
if var is not None: # Skip if it's the constant term
# Row index is the constraint number
row_indices.append(i)
# Column index is the variable's index
col_indices.append(var_idx_dict[var])
# Coefficient is the data
data.append(coeff)
# Create the A matrix in CSC format
A_csc = csc_matrix((data, (row_indices, col_indices)),
shape=(len(self.constraints), len(self.variables)))
return lower, A_csc, upper
[docs]
def get_var_data(self) -> Tuple[Vec, Vec, Vec, List[int]]:
"""
Get arrays related to the variable bounds and the objective function coefficients
:return: lower bounds, f obj coefficients, upper bounds, list of integer vars' indices
"""
n = len(self.variables)
lower = np.empty(n)
coeff = np.empty(n)
upper = np.empty(n)
is_int = list()
for i, var in enumerate(self.variables):
lower[i] = var.lower_bound
coeff[i] = self.get_obj_coefficient(var)
upper[i] = var.upper_bound
var.set_index(i)
if var.is_integer:
is_int.append(i)
return lower, coeff, upper, is_int
[docs]
def is_optimal(self) -> bool:
"""
:return:
"""
return self._is_optimal
def _solve(self, model: "LpModel", verbose: int = 0):
if self.solver_type == MIPSolvers.HIGHS:
solve_with_highs(problem=model, verbose=verbose)
else:
raise Exception(f"Unsupported solver {self.solver_type.value}")
[docs]
def solve(self, robust=True, show_logs=False, progress_text: Callable[[str], None] | None = None) -> int:
"""
Solve the model
:param robust: Relax the problem if infeasible
:param show_logs: Show logs
:param progress_text: Function pointer to print the status
:return: integer value matching OPTIMAL or not
"""
self._solve(model=self, verbose=int(show_logs))
if not self.is_optimal():
self.originally_infeasible = True
if robust:
"""
We are going to create a deep clone of the model,
add a slack variable to each constraint and minimize
the sum of the newly added slack vars.
This LP model will be always optimal.
After the solution, we inspect the slack vars added
if any of those is > 0, then the constraint where it
was added needs "slacking", therefore we add that slack
to the original model, and add the slack to the original
objective function. This way we relax the model while
bringing it to optimality.
"""
# deep copy of the original model
debug_model = self.copy()
# modify the original to detect the bad constraints
slacks = list()
debugging_f_obj = LpExp()
for i, cst in enumerate(debug_model.constraints):
# create a new slack var in the problem
sl = debug_model.add_var(lb=0, ub=1e20, name=f'Slk_{i}_{cst.name}')
# add the variable to the new objective function
debugging_f_obj += sl
# add the variable to the current constraint
cst.add_var(sl)
# store for later
slacks.append(sl)
# set the objective function as the summation of the new slacks
debug_model.minimize(debugging_f_obj)
# solve the debug model
self._solve(debug_model, verbose=int(show_logs))
debug_optimal = debug_model.is_optimal()
# clear the relaxed slacks list
self.relaxed_slacks: List[Tuple[int, LpVar, float]] = []
if debug_optimal:
# pick the original objectve function
main_f = self.objective
for i, sl in enumerate(slacks):
# get the debugging slack value
val = debug_model.get_value(sl)
if abs(val) > 1e-10:
cst_name = self.constraints[i].name
# add the slack in the main model
sl2 = self.add_var(lb=0, ub=1e20, name=f'Slk_rlx_{i}_{cst_name}')
self.relaxed_slacks.append((i, sl2, 0.0)) # the 0.0 value will be read later
# add the slack to the original objective function
main_f += sl2
# alter the matching constraint
self.constraints[i].add_var(sl2)
# set the modified (original) objective function
self.minimize(main_f)
# solve the modified (original) model
self._solve(self, verbose=int(show_logs))
if self.is_optimal():
for i in range(len(self.relaxed_slacks)):
k, var, _ = self.relaxed_slacks[i]
val = self.get_value(var)
self.relaxed_slacks[i] = (k, var, val)
# logg this
if abs(val) > 1e-10:
self.logger.add_warning("Relaxed problem",
device=self.constraints[i].name,
value=val)
else:
self.logger.add_warning("Unable to relax the model, the debug model failed")
else:
pass
return self.OPTIMAL if self.is_optimal() else 0
[docs]
def status2string(self, status: int) -> str:
if status == self.OPTIMAL:
return "Optimal"
elif status == self.INFINITY:
return "Infinity"
else:
return "infeasible"
[docs]
def set_solution(self, col_values: Vec, col_duals: Vec,
row_values: Vec, row_duals: Vec,
f_obj: float, is_optimal: bool):
"""
Set solution from the MIP solver
:param col_values: array of variables' values
:param col_duals: array of variable dual values
:param row_values: array of constraint values
:param row_duals: array of constraint dual values
:param f_obj: value of the objective function
:param is_optimal: is optimal
"""
self._col_value = col_values
self._col_dual = col_duals
self._row_value = row_values
self._row_dual = row_duals
self._objective_value = f_obj
self._is_optimal = is_optimal
[docs]
def fobj_value(self) -> float:
"""
Get the objective function value
:return:
"""
return self._objective_value
[docs]
def is_mip(self):
"""
Is this model a MIP?
:return:
"""
return self._is_mip
[docs]
def get_objective_value(self) -> float:
"""
Get the objective function value
:return: float
"""
return self._objective_value
[docs]
def get_value(self, var: LpVar | LpCst | LpExp | float | int) -> float:
"""
Get the value of a variable
:param var: LpVar object
:return: float
"""
if isinstance(var, LpVar):
return self._col_value[var.get_index()]
elif isinstance(var, LpCst):
return self._row_value[var.get_index()]
elif isinstance(var, LpExp):
val = var.offset
for var2, _coef in var.terms.items():
val += _coef * self._col_value[var2.get_index()]
return val
elif isinstance(var, int) or isinstance(var, float):
return var
else:
raise TypeError("Unsupported variable type {0}".format(type(var)))
[docs]
def get_dual_value(self, var: LpVar | LpCst | LpExp | float | int) -> float:
"""
Get the dual value of a variable
:param var: LpVar object
:return: float
"""
if isinstance(var, LpVar):
return self._col_dual[var.get_index()]
elif isinstance(var, LpCst):
return self._row_dual[var.get_index()]
elif isinstance(var, LpExp):
val = var.offset
for var2, coeff in var.terms.items():
val += coeff * self._col_dual[var2.get_index()]
return val
elif isinstance(var, int) or isinstance(var, float):
return var
else:
raise TypeError("Unsupported variable type {0}".format(type(var)))
[docs]
def get_array_value(self, arr: Union[List[LpVar]]) -> np.ndarray:
"""
Get the array of var values
:param arr: array of variables
:return:
"""
res = np.empty(len(arr))
for i, var in enumerate(arr):
res[i] = self.get_value(var)
return res
[docs]
def solution_available(self) -> bool:
"""
Is there a solution loaded?
:return: true / false
"""
return len(self._col_value) == len(self.variables)
[docs]
def print_solution(self):
"""
Print available solution
"""
if self.solution_available():
print("Solution")
for var in self.variables:
print(f"{var.name}: {self._col_value[var.get_index()]}")
else:
print("No solution available, solve first")