Source code for VeraGridEngine.Utils.MIP.pulp_interface

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0

"""
This module abstracts the synthax of PuLP out
so that in the future it can be exchanged with some
other solver interface easily
"""
from __future__ import annotations

from typing import List, Union, Callable, Any
import subprocess
import pulp
from pulp import LpVariable as LpVar, LpConstraint as LpCst, LpAffineExpression as LpExp
from pulp import HiGHS, CPLEX_CMD
from pulp import LpContinuous, LpInteger
from VeraGridEngine.enumerations import MIPSolvers
from VeraGridEngine.basic_structures import Logger
from VeraGridEngine.Utils.MIP.mip_interface_template import AbstractLpModel


[docs] def get_lp_var_value(x: Union[float, LpVar]) -> float: """ Get the value of a variable stored in a numpy array of objects :param x: soe object (it may be a LP var or a number) :return: result or previous numeric value """ if isinstance(x, LpVar): return x.value() elif isinstance(x, LpExp): return x.value() elif isinstance(x, LpCst): return x.pi else: return x
[docs] def get_pulp_available_mip_solvers() -> List[str]: """ Get a list of candidate solvers :return: """ solvers = pulp.listSolvers(onlyAvailable=True) solvers2 = list() for slv in solvers: if slv == 'SCIP_CMD': solvers2.append(MIPSolvers.SCIP.value) elif slv == 'CPLEX_CMD': solvers2.append(MIPSolvers.CPLEX.value) elif slv == 'GUROBI': solvers2.append(MIPSolvers.GUROBI.value) elif slv == 'XPRESS': solvers2.append(MIPSolvers.XPRESS.value) elif slv == 'HiGHS': solvers2.append(MIPSolvers.HIGHS.value) return solvers2
[docs] class PulpLpModel(AbstractLpModel): """ LPModel implementation for PuLP """ __slots__ = ("model",) OPTIMAL = pulp.LpStatusOptimal INFINITY = 1e20 def __init__(self, solver_type: MIPSolvers): """ :param solver_type: """ super().__init__(solver_type, name="PuLP") self.solver_type: MIPSolvers = solver_type self.model = pulp.LpProblem("myProblem", pulp.LpMinimize) self.relaxed_slacks = list() self.logger = Logger() if self.model is None: raise Exception("{} is not present".format(solver_type.value))
[docs] @staticmethod def set_var_bounds(var: LpVar, lb: float, ub: float): """ Modify the bounds of a variable :param var: LpVar instance to modify :param lb: lower bound value :param ub: upper bound value """ if isinstance(var, LpVar): var.upBound = ub var.lowBound = lb
[docs] def save_model(self, file_name: str = "ntc_opf_problem.lp") -> None: """ Save problem in LP format :param file_name: name of the file (.lp or .mps supported) """ # save the problem in LP format to debug if file_name.lower().endswith('.lp'): lp_content = self.model.writeLP(filename=file_name, max_length=10000) elif file_name.lower().endswith('.mps'): lp_content = self.model.writeMPS(filename=file_name) else: raise Exception('Unsupported file format')
[docs] def add_int(self, lb: int, ub: int, name: str = "") -> LpVar: """ Make integer LP var :param lb: lower bound :param ub: upper bound :param name: name (optional) :return: LpVar """ var = pulp.LpVariable(name=name, lowBound=lb, upBound=ub, cat=pulp.LpInteger) self.model.addVariable(var) return var
[docs] def add_bin(self, name: str = "") -> LpVar: """ Make integer LP var :param name: name (optional) :return: LpVar """ var = pulp.LpVariable(name=name, lowBound=0, upBound=1, cat=pulp.LpInteger) self.model.addVariable(var) return var
[docs] def add_var(self, lb: float, ub: float, name: str = "") -> LpVar: """ Make floating point LP var :param lb: lower bound :param ub: upper bound :param name: name (optional) :return: LpVar """ var = pulp.LpVariable(name=name, lowBound=lb, upBound=ub, cat=pulp.LpContinuous) self.model.addVariable(var) return var
[docs] def add_cst(self, cst: LpCst | bool, name: str = "") -> Union[LpCst, int]: """ Add constraint to the model :param cst: constraint object (or general expression) :param name: name of the constraint (optional) :return: Constraint object """ if isinstance(cst, bool): return 0 else: self.model.addConstraint(constraint=cst, name=name) return cst
[docs] @staticmethod def sum(cst) -> LpExp: """ Add sum of the constraints to the model :param cst: constraint object (or general expression) :return: Constraint object """ return pulp.lpSum(cst)
[docs] def minimize(self, obj_function: LpExp): """ Set the objective function with minimization sense :param obj_function: expression to minimize """ self.model.setObjective(obj=obj_function)
[docs] def get_solver(self, show_logs: bool = False): """ :param show_logs: :return: """ if self.solver_type == MIPSolvers.HIGHS: return HiGHS(mip=self.model.isMIP(), msg=show_logs) elif self.solver_type == MIPSolvers.SCIP: return pulp.getSolver('SCIP_CMD') elif self.solver_type == MIPSolvers.CPLEX: return CPLEX_CMD(mip=self.model.isMIP(), msg=show_logs) elif self.solver_type == MIPSolvers.GUROBI: return pulp.getSolver('GUROBI') elif self.solver_type == MIPSolvers.XPRESS: return pulp.getSolver('XPRESS') else: raise Exception('PuLP Unsupported MIP solver ' + self.solver_type.value)
[docs] def solve(self, robust: bool = False, show_logs: bool = False, progress_text: Callable[[str], None] | None = None) -> int: """ Solve the model :param robust: In this interface, this is useless :param show_logs: In this interface, this is useless :param progress_text: progress function pointer :return: """ if progress_text is not None: progress_text(f"Solving model with {self.solver_type.value}...") # solve the model try: status = self.model.solve(solver=self.get_solver(show_logs=show_logs)) except pulp.PulpSolverError as e: self.logger.add_error(msg=str(e), ) # Retry with Highs status = self.model.solve(solver=HiGHS(mip=self.model.isMIP(), msg=show_logs)) except subprocess.CalledProcessError as e: self.logger.add_error(msg=str(e), ) # Retry with Highs status = self.model.solve(solver=HiGHS(mip=self.model.isMIP(), msg=show_logs)) except IndexError as e: print("Index error:") print(e) status = 1 if status != self.OPTIMAL: self.originally_infeasible = True if robust: """ We are going to create a deep clone of the model, add a slack variable to each constraint and minimize the sum of the newly added slack vars. This LP model will be always optimal. After the solution, we inspect the slack vars added if any of those is > 0, then the constraint where it was added needs "slacking", therefore we add that slack to the original model, and add the slack to the original objective function. This way we relax the model while bringing it to optimality. """ self.logger.add_error(msg="Base problem could not be solved", value=self.status2string(status)) # deep copy of the original model debug_model = self.model.deepcopy() # modify the original to detect the bad constraints slacks = list() debugging_f_obj = 0 for i, (cst_name, cst) in enumerate(debug_model.constraints.items()): # create a new slack var in the problem sl = pulp.LpVariable(name=f'Relax_{cst_name}', lowBound=0, upBound=1e20, cat=pulp.LpContinuous) debug_model.addVariable(sl) # add the variable to the new objective function debugging_f_obj += sl # add the variable to the current constraint if cst.sense == pulp.LpConstraintLE: cst += sl elif cst.sense == pulp.LpConstraintGE: cst -= sl else: # equality cst += sl # store for later slacks.append((cst_name, sl)) # set the objective function as the summation of the new slacks debug_model.setObjective(debugging_f_obj) if progress_text is not None: progress_text(f"Solving debug model with {self.solver_type.value}...") # solve the debug model status_d = debug_model.solve(solver=self.get_solver(show_logs=show_logs)) # clear the relaxed slacks list self.relaxed_slacks = list() if status_d == PulpLpModel.OPTIMAL: # pick the original objective function cst_slack_map = list() for i, (cst_name, sl) in enumerate(slacks): # get the debugging slack value val = sl.value() if abs(val) > 1e-10: # add the slack in the main model sl2 = pulp.LpVariable(name=f'Relax_final_{cst_name}', lowBound=0, upBound=1e20, cat=pulp.LpContinuous) self.model.addVariable(sl2) self.relaxed_slacks.append((i, sl2, 0.0)) # the 0.0 value will be read later # add the slack to the original objective function self.model.objective += sl2 # alter the matching constraint self.model.constraints[cst_name] += sl2 # register the relation for later cst_slack_map.append(cst_name) # set the modified (original) objective function self.model.setObjective(self.model.objective) # at this point we can delete_with_dialogue the debug model del debug_model if progress_text is not None: progress_text(f"Solving relaxed model with {self.solver_type.value}...") # solve the modified (original) model status = self.model.solve(solver=self.get_solver(show_logs=show_logs)) if status == PulpLpModel.OPTIMAL: for i in range(len(self.relaxed_slacks)): k, var, _ = self.relaxed_slacks[i] val = var.value() self.relaxed_slacks[i] = (k, var, val) # logg this if abs(val) > 1e-10: self.logger.add_warning( msg="Relaxed problem", device=self.model.constraints[cst_slack_map[i]].name, value=val ) else: self.logger.add_warning(msg="Relaxed probrem is not optimal :(") else: self.logger.add_warning("Unable to relax the model, the debug model failed :(") return status
[docs] def fobj_value(self) -> float: """ Get the objective function value :return: """ return self.model.objective.value()
[docs] def is_mip(self): """ Is this odel a MIP? :return: """ return self.model.isMIP()
[docs] @staticmethod def get_value(x: Union[float, int, LpVar, LpExp, LpCst, Any]) -> float: """ Get the value of a variable stored in a numpy array of objects :param x: solver object (it may be a LP var or a number) :return: result or zero """ if isinstance(x, LpVar): val = x.value() elif isinstance(x, LpExp): val = x.value() elif isinstance(x, LpCst): val = x.values() elif isinstance(x, float) or isinstance(x, int): return x else: raise Exception("Unrecognized type {}".format(x)) if isinstance(val, float): return val else: return 0.0
[docs] @staticmethod def get_dual_value(x: LpCst) -> float: """ Get the dual value of a variable stored in a numpy array of objects :param x: constraint :return: result or zero """ if x is None: return 0.0 if isinstance(x, LpCst): return x.pi elif isinstance(x, float): return x else: return 0.0
[docs] def status2string(self, stat: int) -> str: """ Convert the PuLP status to a string :param stat: :return: """ return pulp.LpStatus[stat]
[docs] def model_as_string(self) -> str: """ Return the LP representation :return: string """ lp = self.model max_length = 1000000 mip = True writeSOS = True f = "" f += "\\* " + lp.name + " *\\\n" if lp.sense == 1: f += "Minimize\n" else: f += "Maximize\n" wasNone, objectiveDummyVar = lp.fixObjective() assert lp.objective is not None objName = lp.objective.name if not objName: objName = "OBJ" f += lp.objective.asCplexLpAffineExpression(objName, include_constant=False) f += "Subject To\n" ks = list(lp.constraints.keys()) ks.sort() dummyWritten = False for k in ks: constraint = lp.constraints[k] if not list(constraint.keys()): # empty constraint add the dummyVar dummyVar = lp.get_dummyVar() constraint += dummyVar # set this dummyvar to zero so infeasible problems are not made feasible if not dummyWritten: f += (dummyVar == 0.0).asCplexLpConstraint("_dummy") dummyWritten = True f += constraint.asCplexLpConstraint(k) # check if any names are longer than 100 characters lp.checkLengthVars(max_length) vs = lp.variables() # check for repeated names lp.checkDuplicateVars() # Bounds on non-"positive" variables # Note: XPRESS and CPLEX do not interpret integer variables without # explicit bounds if mip: vg = [ v for v in vs if not (v.isPositive() and v.cat == LpContinuous) and not v.isBinary() ] else: vg = [v for v in vs if not v.isPositive()] if vg: f += "Bounds\n" for v in vg: f += f" {v.asCplexLpVariable()}\n" # Integer non-binary variables if mip: vg = [v for v in vs if v.cat == LpInteger and not v.isBinary()] if vg: f += "Generals\n" for v in vg: f += f"{v.name}\n" # Binary variables vg = [v for v in vs if v.isBinary()] if vg: f += "Binaries\n" for v in vg: f += f"{v.name}\n" # Special Ordered Sets if writeSOS and (lp.sos1 or lp.sos2): f += "SOS\n" if lp.sos1: for sos in lp.sos1.values(): f += "S1:: \n" for v, val in sos.items(): f += f" {v.name}: {val:.12g}\n" if lp.sos2: for sos in lp.sos2.values(): f += "S2:: \n" for v, val in sos.items(): f += f" {v.name}: {val:.12g}\n" f += "End\n" lp.restoreObjective(wasNone, objectiveDummyVar) return f