# Source code for pyomo.contrib.solver.gurobi_direct

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import datetime
import io
import math
import operator
import os

from pyomo.common.config import ConfigValue
from pyomo.common.collections import ComponentMap, ComponentSet
from pyomo.common.dependencies import attempt_import
from pyomo.common.enums import ObjectiveSense
from pyomo.common.errors import MouseTrap
from pyomo.common.shutdown import python_is_shutting_down
from pyomo.common.tee import capture_output, TeeStream
from pyomo.common.timing import HierarchicalTimer

from pyomo.contrib.solver.base import SolverBase
from pyomo.contrib.solver.config import BranchAndBoundConfig
from pyomo.contrib.solver.results import Results, SolutionStatus, TerminationCondition
from pyomo.contrib.solver.solution import SolutionLoaderBase

from pyomo.core.staleflag import StaleFlagManager

from pyomo.repn.plugins.standard_form import LinearStandardFormCompiler

gurobipy, gurobipy_available = attempt_import('gurobipy')


class GurobiConfig(BranchAndBoundConfig):
    """Configuration for the direct Gurobi interface.

    Extends ``BranchAndBoundConfig`` with one additional declared option,
    ``use_mipstart``.
    """

    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        """Initialize the base configuration and declare ``use_mipstart``.

        All arguments are forwarded unchanged, by keyword, to the parent
        class constructor.
        """
        super().__init__(
            description=description,
            doc=doc,
            implicit=implicit,
            implicit_domain=implicit_domain,
            visibility=visibility,
        )

        # Boolean option, off by default.
        mipstart_option = ConfigValue(
            default=False,
            domain=bool,
            description="If True, the current values of the integer variables "
            "will be passed to Gurobi.",
        )
        self.use_mipstart: bool = self.declare('use_mipstart', mipstart_option)
class GurobiDirectSolutionLoader(SolutionLoaderBase):
    """Solution loader that reads results directly from a gurobipy model.

    The loader keeps the gurobipy model together with the gurobipy
    variable/constraint objects and the matching Pyomo objects, which are
    paired positionally (``zip``) when values are extracted.

    Every loader counts as one live user of the Gurobi environment: the
    constructor increments ``GurobiDirect._num_instances`` and ``__del__``
    decrements it, releasing the license when the count reaches zero.
    """

    def __init__(self, grb_model, grb_cons, grb_vars, pyo_cons, pyo_vars, pyo_obj):
        """Store the solver-side and Pyomo-side objects.

        Parameters
        ----------
        grb_model
            The gurobipy model that was optimized.
        grb_cons, grb_vars
            gurobipy constraint / variable containers of ``grb_model``.
        pyo_cons
            Row descriptors paired positionally with ``grb_cons``; element
            ``[0]`` of each descriptor is used as the constraint key.
        pyo_vars
            Pyomo variables paired positionally with ``grb_vars``.
        pyo_obj
            Objective collection; only its truthiness is consulted
            (by ``GurobiDirect._postsolve``).
        """
        # Pyomo-side objects
        self._pyo_obj = pyo_obj
        self._pyo_vars = pyo_vars
        self._pyo_cons = pyo_cons
        # Gurobi-side objects
        self._grb_vars = grb_vars
        self._grb_cons = grb_cons
        self._grb_model = grb_model
        # This loader now holds a reference to the Gurobi environment.
        GurobiDirect._num_instances += 1

    def __del__(self):
        """Dispose the gurobipy model and release the license if unused."""
        if python_is_shutting_down():
            return

        if self._grb_model is not None:
            # Drop every reference into the model before disposing it.
            for attr in (
                '_grb_cons',
                '_grb_vars',
                '_pyo_cons',
                '_pyo_vars',
                '_pyo_obj',
            ):
                setattr(self, attr, None)
            # Explicitly release the model.
            self._grb_model.dispose()
            self._grb_model = None

        # Release the Gurobi license if this was the last reference to the
        # environment (held either by a results object or a solver
        # interface).
        GurobiDirect._num_instances -= 1
        if not GurobiDirect._num_instances:
            GurobiDirect.release_license()

    def load_vars(self, vars_to_load=None, solution_number=0):
        """Write solver values into the Pyomo variables.

        Parameters
        ----------
        vars_to_load
            Optional collection of variables to restrict loading to.  A
            falsy value (``None`` or empty) loads every variable.
        solution_number
            Must be ``0``.

        Raises
        ------
        RuntimeError
            If the gurobipy model reports no solutions (``SolCount == 0``).
        """
        assert solution_number == 0
        if self._grb_model.SolCount == 0:
            raise RuntimeError(
                'Solver does not currently have a valid solution. Please '
                'check the termination condition.'
            )

        pairs = zip(self._pyo_vars, self._grb_vars.x.tolist())
        if vars_to_load:
            wanted = ComponentSet(vars_to_load)
            pairs = ((var, val) for var, val in pairs if var in wanted)

        for pyo_var, value in pairs:
            pyo_var.set_value(value, skip_validation=True)
        StaleFlagManager.mark_all_as_stale(delayed=True)

    def get_primals(self, vars_to_load=None, solution_number=0):
        """Return a ``ComponentMap`` of Pyomo variable -> solver value.

        Parameters
        ----------
        vars_to_load
            Optional collection of variables to restrict the result to.  A
            falsy value (``None`` or empty) returns every variable.
        solution_number
            Must be ``0``.

        Raises
        ------
        RuntimeError
            If the gurobipy model reports no solutions (``SolCount == 0``).
        """
        assert solution_number == 0
        if self._grb_model.SolCount == 0:
            raise RuntimeError(
                'Solver does not currently have a valid solution. Please '
                'check the termination condition.'
            )

        pairs = zip(self._pyo_vars, self._grb_vars.x.tolist())
        if not vars_to_load:
            return ComponentMap(pairs)
        wanted = ComponentSet(vars_to_load)
        return ComponentMap((var, val) for var, val in pairs if var in wanted)

    def get_duals(self, cons_to_load=None):
        """Return a dict of constraint -> dual value (Gurobi ``Pi``).

        Parameters
        ----------
        cons_to_load
            Optional collection of constraints to restrict the result to.
            A falsy value (``None`` or empty) returns every constraint.

        Raises
        ------
        RuntimeError
            If the gurobipy model status is not ``GRB.OPTIMAL``.
        """
        if self._grb_model.Status != gurobipy.GRB.OPTIMAL:
            raise RuntimeError(
                'Solver does not currently have valid duals. Please '
                'check the termination condition.'
            )

        def dedup(pairs):
            # Skip a row whose dual is zero when it belongs to the same
            # constraint as the previously seen row.
            # NOTE(review): presumably one constraint can produce several
            # consecutive rows in the compiled form -- confirm.
            previous = None
            for con_info, dual in pairs:
                constraint = con_info[0]
                if constraint is previous and not dual:
                    continue
                previous = constraint
                yield con_info, dual

        pairs = dedup(zip(self._pyo_cons, self._grb_cons.getAttr('Pi').tolist()))
        if cons_to_load:
            wanted = set(cons_to_load)
            pairs = (item for item in pairs if item[0][0] in wanted)
        return {con_info[0]: dual for con_info, dual in pairs}

    def get_reduced_costs(self, vars_to_load=None):
        """Return a ``ComponentMap`` of variable -> reduced cost (``Rc``).

        Parameters
        ----------
        vars_to_load
            Optional collection of variables to restrict the result to.  A
            falsy value (``None`` or empty) returns every variable.

        Raises
        ------
        RuntimeError
            If the gurobipy model status is not ``GRB.OPTIMAL``.
        """
        if self._grb_model.Status != gurobipy.GRB.OPTIMAL:
            raise RuntimeError(
                'Solver does not currently have valid reduced costs. Please '
                'check the termination condition.'
            )

        pairs = zip(self._pyo_vars, self._grb_vars.getAttr('Rc').tolist())
        if not vars_to_load:
            return ComponentMap(pairs)
        wanted = ComponentSet(vars_to_load)
        return ComponentMap((var, rc) for var, rc in pairs if var in wanted)
class GurobiDirect(SolverBase):
    """Direct (in-memory) interface to Gurobi through ``gurobipy``.

    The Pyomo model is compiled with ``LinearStandardFormCompiler`` and
    handed to gurobipy as matrices (``addMVar`` / ``addMConstr``).

    Class attributes
    ----------------
    CONFIG
        Default configuration object (a ``GurobiConfig``).
    _available
        Cached result of the full/limited license probe (``None`` until
        the probe has run).
    _num_instances
        Number of live users of the Gurobi environment (solver interfaces
        plus solution loaders); the license is released when it reaches 0.
    _tc_map
        Lazily built map from Gurobi status codes to
        ``TerminationCondition`` values.
    """

    CONFIG = GurobiConfig()

    _available = None
    _num_instances = 0
    _tc_map = None

    def __init__(self, **kwds):
        """Create the solver interface and register it as a license user."""
        super().__init__(**kwds)
        GurobiDirect._num_instances += 1
        # Remember that this instance was counted so that __del__ only
        # undoes increments that actually happened (__del__ also runs for
        # objects whose __init__ raised before reaching this point).
        self._counted_instance = True

    def available(self):
        """Return the availability of Gurobi.

        Returns ``NotFound`` when gurobipy cannot be imported,
        ``BadVersion`` when that was cached previously, and otherwise the
        result of the license check.
        """
        if not gurobipy_available:  # this triggers the deferred import
            return self.Availability.NotFound
        elif self._available == self.Availability.BadVersion:
            return self.Availability.BadVersion
        else:
            return self._check_license()

    def _check_license(self):
        """Try to create a gurobipy model and classify the license.

        Returns ``BadLicense`` if model creation raises ``GurobiError``;
        otherwise returns the (cached) result of ``_check_full_license``.
        """
        avail = False
        try:
            # Gurobipy writes out license file information when creating
            # the environment
            with capture_output(capture_fd=True):
                m = gurobipy.Model()
            avail = True
        except gurobipy.GurobiError:
            avail = False

        if not avail:
            return self.Availability.BadLicense
        if self._available is None:
            self._available = GurobiDirect._check_full_license(m)
        return self._available

    @classmethod
    def _check_full_license(cls, model=None):
        """Distinguish a full license from a limited one.

        Adds 2001 variables to ``model`` (a fresh model if ``None``) and
        optimizes it.  A ``GurobiError`` is reported as ``LimitedLicense``;
        success is reported as ``FullLicense``.
        """
        if model is None:
            model = gurobipy.Model()
        model.setParam('OutputFlag', 0)
        try:
            model.addVars(range(2001))
            model.optimize()
            return cls.Availability.FullLicense
        except gurobipy.GurobiError:
            return cls.Availability.LimitedLicense

    def __del__(self):
        """Unregister this instance and release the license if unused."""
        if python_is_shutting_down():
            return
        # Skip instances that were never counted (e.g. __init__ raised on
        # an invalid option); otherwise the shared counter would be driven
        # down by an object that never incremented it.
        if not getattr(self, '_counted_instance', False):
            return
        GurobiDirect._num_instances -= 1
        if GurobiDirect._num_instances == 0:
            self.release_license()

    @staticmethod
    def release_license():
        """Dispose gurobipy's default environment (if gurobipy is usable)."""
        if gurobipy_available:
            with capture_output(capture_fd=True):
                gurobipy.disposeDefaultEnv()

    def version(self):
        """Return the Gurobi version as ``(major, minor, technical)``."""
        version = (
            gurobipy.GRB.VERSION_MAJOR,
            gurobipy.GRB.VERSION_MINOR,
            gurobipy.GRB.VERSION_TECHNICAL,
        )
        return version

    def solve(self, model, **kwds) -> Results:
        """Compile ``model``, solve it with Gurobi, and return the results.

        Parameters
        ----------
        model
            The Pyomo model to solve.
        **kwds
            Per-call configuration overrides applied on top of
            ``self.config``.

        Returns
        -------
        Results
            Results object populated by ``_postsolve`` plus solver name,
            version, captured log and timing information.

        Raises
        ------
        ValueError
            If the compiled model has more than one objective.
        MouseTrap
            If ``config.use_mipstart`` is set (not yet supported).
        RuntimeError
            Propagated from ``_postsolve``.
        """
        start_timestamp = datetime.datetime.now(datetime.timezone.utc)
        config = self.config(value=kwds, preserve_implicit=True)
        if config.timer is None:
            config.timer = HierarchicalTimer()
        timer = config.timer

        StaleFlagManager.mark_all_as_stale()

        timer.start('compile_model')
        repn = LinearStandardFormCompiler().write(
            model, mixed_form=True, set_sense=None
        )
        timer.stop('compile_model')

        if len(repn.objectives) > 1:
            raise ValueError(
                f"The {self.__class__.__name__} solver only supports models "
                f"with zero or one objectives (received {len(repn.objectives)})."
            )

        timer.start('prepare_matrices')
        inf = float('inf')
        ninf = -inf
        bounds = list(map(operator.attrgetter('bounds'), repn.columns))
        # A missing (None) bound becomes the matching infinity.
        lb = [ninf if _b is None else _b for _b in map(operator.itemgetter(0), bounds)]
        ub = [inf if _b is None else _b for _b in map(operator.itemgetter(1), bounds)]
        CON = gurobipy.GRB.CONTINUOUS
        BIN = gurobipy.GRB.BINARY
        INT = gurobipy.GRB.INTEGER
        vtype = [
            (
                CON
                if v.is_continuous()
                else BIN if v.is_binary() else INT if v.is_integer() else '?'
            )
            for v in repn.columns
        ]
        sense_type = list('=<>')  # Note: ordering matches 0, 1, -1
        sense = [sense_type[r[1]] for r in repn.rows]
        timer.stop('prepare_matrices')

        # The first stream captures the solver log for the results object.
        ostreams = [io.StringIO()] + config.tee

        # Capture the working directory *before* entering the try block so
        # the finally clause can always restore it.
        orig_cwd = os.getcwd()
        try:
            if config.working_dir:
                os.chdir(config.working_dir)
            with TeeStream(*ostreams) as t, capture_output(t.STDOUT, capture_fd=False):
                gurobi_model = gurobipy.Model()

                timer.start('transfer_model')
                x = gurobi_model.addMVar(
                    len(repn.columns),
                    lb=lb,
                    ub=ub,
                    obj=repn.c.todense()[0] if repn.c.shape[0] else 0,
                    vtype=vtype,
                )
                A = gurobi_model.addMConstr(repn.A, x, sense, repn.rhs)
                if repn.c.shape[0]:
                    gurobi_model.setAttr('ObjCon', repn.c_offset[0])
                    gurobi_model.setAttr('ModelSense', int(repn.objectives[0].sense))
                # Note: calling gurobi_model.update() here is not
                # necessary (it will happen as part of optimize()):
                # gurobi_model.update()
                timer.stop('transfer_model')

                options = config.solver_options

                gurobi_model.setParam('LogToConsole', 1)

                if config.threads is not None:
                    gurobi_model.setParam('Threads', config.threads)
                if config.time_limit is not None:
                    gurobi_model.setParam('TimeLimit', config.time_limit)
                if config.rel_gap is not None:
                    gurobi_model.setParam('MIPGap', config.rel_gap)
                if config.abs_gap is not None:
                    gurobi_model.setParam('MIPGapAbs', config.abs_gap)

                if config.use_mipstart:
                    raise MouseTrap("MIPSTART not yet supported")

                # User-supplied options are applied last, so they override
                # the values set from the generic configuration above.
                for key, option in options.items():
                    gurobi_model.setParam(key, option)

                timer.start('optimize')
                gurobi_model.optimize()
                timer.stop('optimize')
        finally:
            os.chdir(orig_cwd)

        res = self._postsolve(
            timer,
            config,
            GurobiDirectSolutionLoader(
                gurobi_model, A, x, repn.rows, repn.columns, repn.objectives
            ),
        )

        res.solver_configuration = config
        res.solver_name = 'Gurobi'
        res.solver_version = self.version()
        res.solver_log = ostreams[0].getvalue()

        end_timestamp = datetime.datetime.now(datetime.timezone.utc)
        res.timing_info.start_timestamp = start_timestamp
        res.timing_info.wall_time = (end_timestamp - start_timestamp).total_seconds()
        res.timing_info.timer = timer
        return res

    def _postsolve(self, timer: HierarchicalTimer, config, loader):
        """Build a ``Results`` object from the solved gurobipy model.

        Parameters
        ----------
        timer
            Timer used to record the ``'load solution'`` phase.
        config
            Active configuration; ``raise_exception_on_nonoptimal_result``
            and ``load_solutions`` are consulted.
        loader
            Solution loader wrapping the solved gurobipy model.

        Raises
        ------
        RuntimeError
            If the termination condition is not
            ``convergenceCriteriaSatisfied`` and
            ``config.raise_exception_on_nonoptimal_result`` is set, or if
            ``config.load_solutions`` is set but no solution exists.
        """
        grb_model = loader._grb_model
        status = grb_model.Status

        results = Results()
        results.solution_loader = loader
        results.timing_info.gurobi_time = grb_model.Runtime

        if grb_model.SolCount > 0:
            if status == gurobipy.GRB.OPTIMAL:
                results.solution_status = SolutionStatus.optimal
            else:
                results.solution_status = SolutionStatus.feasible
        else:
            results.solution_status = SolutionStatus.noSolution

        results.termination_condition = self._get_tc_map().get(
            status, TerminationCondition.unknown
        )

        if (
            results.termination_condition
            != TerminationCondition.convergenceCriteriaSatisfied
            and config.raise_exception_on_nonoptimal_result
        ):
            raise RuntimeError(
                'Solver did not find the optimal solution. Set '
                'opt.config.raise_exception_on_nonoptimal_result=False '
                'to bypass this error.'
            )

        if loader._pyo_obj:
            try:
                if math.isfinite(grb_model.ObjVal):
                    results.incumbent_objective = grb_model.ObjVal
                else:
                    results.incumbent_objective = None
            except (gurobipy.GurobiError, AttributeError):
                results.incumbent_objective = None
            try:
                results.objective_bound = grb_model.ObjBound
            except (gurobipy.GurobiError, AttributeError):
                # No bound available: fall back to the trivial bound for
                # the optimization direction.
                if grb_model.ModelSense == ObjectiveSense.minimize:
                    results.objective_bound = -math.inf
                else:
                    results.objective_bound = math.inf
        else:
            results.incumbent_objective = None
            results.objective_bound = None

        results.iteration_count = grb_model.getAttr('IterCount')

        timer.start('load solution')
        try:
            if config.load_solutions:
                if grb_model.SolCount > 0:
                    results.solution_loader.load_vars()
                else:
                    raise RuntimeError(
                        'A feasible solution was not found, so no solution can '
                        'be loaded. '
                        'Please set opt.config.load_solutions=False and check '
                        'results.solution_status and '
                        'results.incumbent_objective before loading a solution.'
                    )
        finally:
            # Always stop the timer, even when loading fails, so the
            # caller's timer is not left with a dangling running entry.
            timer.stop('load solution')

        return results

    def _get_tc_map(self):
        """Return the (lazily built, class-level) status -> condition map."""
        if GurobiDirect._tc_map is None:
            grb = gurobipy.GRB
            tc = TerminationCondition
            GurobiDirect._tc_map = {
                grb.LOADED: tc.unknown,  # problem is loaded, but no solution
                grb.OPTIMAL: tc.convergenceCriteriaSatisfied,
                grb.INFEASIBLE: tc.provenInfeasible,
                grb.INF_OR_UNBD: tc.infeasibleOrUnbounded,
                grb.UNBOUNDED: tc.unbounded,
                grb.CUTOFF: tc.objectiveLimit,
                grb.ITERATION_LIMIT: tc.iterationLimit,
                grb.NODE_LIMIT: tc.iterationLimit,
                grb.TIME_LIMIT: tc.maxTimeLimit,
                grb.SOLUTION_LIMIT: tc.unknown,
                grb.INTERRUPTED: tc.interrupted,
                grb.NUMERIC: tc.unknown,
                grb.SUBOPTIMAL: tc.unknown,
                grb.USER_OBJ_LIMIT: tc.objectiveLimit,
            }
        return GurobiDirect._tc_map