# Source code for pyomo.solvers.plugins.solvers.GUROBI

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import io
import os
import sys
import re
import time
import logging
import subprocess

from pyomo.common import Executable
from pyomo.common.collections import Bunch
from pyomo.common.dependencies import attempt_import
from pyomo.common.enums import maximize, minimize
from pyomo.common.errors import ApplicationError
from pyomo.common.fileutils import this_file_dir
from pyomo.common.log import is_debug_set
from pyomo.common.tee import capture_output, TeeStream
from pyomo.common.tempfiles import TempfileManager

from pyomo.opt.base import ProblemFormat, ResultsFormat, OptSolver
from pyomo.opt.base.solvers import _extract_version, SolverFactory
from pyomo.opt.results import (
    SolverStatus,
    TerminationCondition,
    SolutionStatus,
    Solution,
)
from pyomo.opt.solver import ILMLicensedSystemCallSolver
from pyomo.core.kernel.block import IBlock
from pyomo.core import ConcreteModel, Var, Objective

from pyomo.solvers.plugins.solvers.gurobi_direct import gurobipy, gurobipy_available
from pyomo.solvers.plugins.solvers.ASL import ASL

logger = logging.getLogger('pyomo.solvers')
GUROBI_RUN = attempt_import('pyomo.solvers.plugins.solvers.GUROBI_RUN')[0]


@SolverFactory.register('gurobi', doc='The GUROBI LP/MIP solver')
class GUROBI(OptSolver):
    """The GUROBI LP/MIP solver"""

    def __new__(cls, *args, **kwds):
        """Return the concrete Gurobi plugin selected by ``solver_io``.

        The ``solver_io`` keyword (default ``'lp'``; ``None`` is treated as
        ``'lp'``) is removed from ``kwds`` and the remaining keywords are
        forwarded to ``SolverFactory``:

        * ``'lp'`` / ``'mps'`` -> ``'_gurobi_file'`` when ``gurobipy_available``
          is true, otherwise ``'_gurobi_shell'`` (``'mps'`` additionally
          switches the problem format to ``ProblemFormat.mps``)
        * ``'python'`` / ``'direct'`` -> ``'gurobi_direct'``
        * ``'persistent'`` -> ``'gurobi_persistent'``
        * ``'os'`` -> ``'_ossolver'`` and ``'nl'`` -> ``'_gurobi_nl'``, both
          configured with the option string ``'solver=gurobi_ampl'``

        An error is logged and ``None`` is returned when the Python API
        plugins come back as ``None`` or when the mode is not recognized.
        """
        io_mode = kwds.pop('solver_io', 'lp')
        if io_mode is None:
            io_mode = 'lp'

        # File-based interfaces.  gurobipy_available is consulted only for
        # these two modes.
        if io_mode in ('lp', 'mps'):
            plugin = '_gurobi_file' if gurobipy_available else '_gurobi_shell'
            solver = SolverFactory(plugin, **kwds)
            if io_mode == 'mps':
                solver.set_problem_format(ProblemFormat.mps)
            return solver

        # Interfaces that go through the Gurobi Python API.
        if io_mode in ('python', 'direct', 'persistent'):
            if io_mode == 'persistent':
                plugin = 'gurobi_persistent'
            else:
                plugin = 'gurobi_direct'
            solver = SolverFactory(plugin, **kwds)
            if solver is None:
                logger.error('Python API for GUROBI is not installed')
                return None
            return solver

        # Remaining interfaces are pointed at the gurobi_ampl executable.
        if io_mode == 'os':
            solver = SolverFactory('_ossolver', **kwds)
        elif io_mode == 'nl':
            solver = SolverFactory('_gurobi_nl', **kwds)
        else:
            logger.error('Unknown IO type: %s' % io_mode)
            return None
        solver.set_options('solver=gurobi_ampl')
        return solver
@SolverFactory.register('_gurobi_nl', doc='NL interface to the Gurobi solver')
class GUROBINL(ASL):
    """NL interface to gurobi_ampl."""

    def license_is_valid(self):
        """Probe the license by solving a one-variable model.

        A model with a single variable ``x`` bounded to ``[1, 2]`` and the
        objective ``x`` is solved with all output captured.

        Returns:
            bool: ``True`` when the solve completes and ``x`` ends within
            ``1e-4`` of ``1``; ``False`` when it does not, or when the solve
            raises any ``Exception``.
        """
        m = ConcreteModel()
        m.x = Var(bounds=(1, 2))
        m.obj = Objective(expr=m.x)
        try:
            with capture_output():
                self.solve(m)
            return abs(m.x.value - 1) <= 1e-4
        except Exception:
            # Any solver failure means "not licensed".  Catching Exception
            # rather than using a bare ``except`` lets KeyboardInterrupt and
            # SystemExit propagate instead of being reported as a failed
            # license check.
            return False
@SolverFactory.register(
    '_gurobi_shell', doc='Shell interface to the GUROBI LP/MIP solver'
)
class GUROBISHELL(ILMLicensedSystemCallSolver):
    """Shell interface to the GUROBI LP/MIP solver"""

    # Class-level cache of license / version probes, keyed on
    # (executable path, 'licensed') and (executable path, 'version').
    _solver_info_cache = {}

    def __init__(self, **kwds):
        """Initialize the plugin; ``kwds`` are forwarded to the base class
        after ``kwds['type']`` is forced to ``'gurobi'``."""
        #
        # Call base class constructor
        #
        kwds['type'] = 'gurobi'
        ILMLicensedSystemCallSolver.__init__(self, **kwds)

        # NOTE: eventually both of the following attributes should be
        # migrated to a common base class.  is the current solve
        # warm-started? a transient data member to communicate state
        # information across the _presolve, _apply_solver, and
        # _postsolve methods.
        self._warm_start_solve = False
        # related to the above, the temporary name of the MST warm-start
        # file (if any).
        self._warm_start_file_name = None

        #
        # Define valid problem formats and associated results formats
        #
        self._valid_problem_formats = [ProblemFormat.cpxlp, ProblemFormat.mps]
        self._valid_result_formats = {}
        self._valid_result_formats[ProblemFormat.cpxlp] = [ResultsFormat.soln]
        self._valid_result_formats[ProblemFormat.mps] = [ResultsFormat.soln]
        self.set_problem_format(ProblemFormat.cpxlp)

        # Note: Undefined capabilities default to 'None'
        self._capabilities = Bunch()
        self._capabilities.linear = True
        self._capabilities.quadratic_objective = True
        self._capabilities.quadratic_constraint = True
        self._capabilities.integer = True
        self._capabilities.sos1 = True
        self._capabilities.sos2 = True

    def license_is_valid(self):
        """
        Runs a check for a valid Gurobi license using the given
        executable (default is 'gurobi_cl'). All output is hidden.
        If the test fails for any reason (including the executable being
        invalid), then this function will return False.

        The result is cached per executable path in ``_solver_info_cache``.
        """
        solver_exec = self.executable()
        cache_key = (solver_exec, 'licensed')
        if cache_key in self._solver_info_cache:
            return self._solver_info_cache[cache_key]

        if not solver_exec:
            licensed = False
        else:
            # First try 'gurobi_cl' from the same directory as the solver
            # executable.
            executable = os.path.join(os.path.dirname(solver_exec), 'gurobi_cl')
            try:
                # The output is discarded.  DEVNULL is used instead of PIPE
                # because subprocess.call() never reads from a pipe, so the
                # child could block once the OS pipe buffer fills up.
                rc = subprocess.call(
                    [executable, "--license"],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.STDOUT,
                )
            except OSError:
                # Could not launch gurobi_cl: fall back to creating (and
                # disposing) a gurobipy environment through the solver
                # executable itself.
                try:
                    rc = subprocess.run(
                        [solver_exec],
                        input=('import gurobipy; gurobipy.Env().dispose(); quit()'),
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        universal_newlines=True,
                    ).returncode
                except OSError:
                    rc = 1
            licensed = not rc

        self._solver_info_cache[cache_key] = licensed
        return licensed

    def _default_results_format(self, prob_format):
        """Return ``ResultsFormat.soln`` regardless of ``prob_format``."""
        return ResultsFormat.soln

    def warm_start_capable(self):
        """Return ``True``: this plugin can write a warm-start file."""
        return True

    #
    # write a warm-start file in the GUROBI MST format, which is *not*
    # the same as the CPLEX MST format.
    #
    def _warm_start(self, instance):
        """Write ``"<symbol> <value>"`` lines to the warm-start file.

        One line is written for every ``Var`` data object of ``instance``
        whose value is not ``None`` and which appears in the symbol map
        selected by ``self._smap_id``.
        """
        from pyomo.core.base import Var

        # for each variable in the symbol_map, add a child to the
        # variables element.  Both continuous and discrete are accepted
        # (and required, depending on other options).
        #
        # **Note**: This assumes that the symbol_map is "clean", i.e.,
        # contains only references to the variables encountered in
        # constraints
        if isinstance(instance, IBlock):
            smap = getattr(instance, "._symbol_maps")[self._smap_id]
        else:
            smap = instance.solutions.symbol_map[self._smap_id]
        byObject = smap.byObject
        with open(self._warm_start_file_name, 'w') as mst_file:
            for vdata in instance.component_data_objects(Var):
                if (vdata.value is not None) and (id(vdata) in byObject):
                    name = byObject[id(vdata)]
                    mst_file.write("%s %s\n" % (name, vdata.value))

    # over-ride presolve to extract the warm-start keyword, if specified.
    def _presolve(self, *args, **kwds):
        """Extract the warm-start keywords, run the base presolve, and
        write the warm-start file when one is needed.

        Recognized keywords (removed from ``kwds``): ``warmstart`` (default
        ``False``) and ``warmstart_file`` (default ``None``).

        Raises:
            ValueError: if the first positional argument is not a string
                and more than one positional argument was supplied.
        """
        # create a context in the temporary file manager for
        # this plugin - is "pop"ed in the _postsolve method.
        TempfileManager.push()

        # if the first argument is a string (representing a filename),
        # then we don't have an instance => the solver is being applied
        # to a file.
        self._warm_start_solve = kwds.pop('warmstart', False)
        self._warm_start_file_name = kwds.pop('warmstart_file', None)
        user_warmstart = self._warm_start_file_name is not None

        # the input argument can currently be one of two things: an
        # instance or a filename.  if a filename is provided and a
        # warm-start is indicated, nothing is created here - we assume the
        # user knows what they are doing.  for an instance, assign the
        # name of the warm start file *before* calling the base class
        # presolve - the base class method ends up creating the command
        # line, and the warm start file-name is (obviously) needed there.
        if (
            self._warm_start_solve
            and (not isinstance(args[0], str))
            and self._warm_start_file_name is None
        ):
            self._warm_start_file_name = TempfileManager.create_tempfile(
                suffix='.gurobi.mst'
            )

        # let the base class handle any remaining keywords/actions.
        ILMLicensedSystemCallSolver._presolve(self, *args, **kwds)

        # NB: we must let the base class presolve run first so that the
        # symbol_map is actually constructed!

        if (len(args) > 0) and (not isinstance(args[0], str)):
            if len(args) != 1:
                raise ValueError(
                    "GUROBI _presolve method can only handle a single "
                    "problem instance - %s were supplied" % (len(args),)
                )

            # write the warm-start file - currently only supports MIPs.
            # we only know how to deal with a single problem instance.
            if self._warm_start_solve and (not user_warmstart):
                start_time = time.time()
                self._warm_start(args[0])
                end_time = time.time()
                if self._report_timing is True:
                    print(
                        "Warm start write time=%.2f seconds"
                        % (end_time - start_time)
                    )

    def _default_executable(self):
        """Locate the Gurobi shell.

        Returns the path of ``gurobi.bat`` (win32) or ``gurobi.sh``
        (elsewhere) when found, otherwise ``sys.executable`` when
        ``gurobipy_available`` is true.  If neither applies, a warning is
        logged, ``self.enable`` is set to ``False`` and ``None`` is returned.
        """
        if sys.platform == 'win32':
            executable = Executable("gurobi.bat")
        else:
            executable = Executable("gurobi.sh")
        if executable:
            return executable.path()
        if gurobipy_available:
            return sys.executable
        logger.warning(
            "Could not locate the 'gurobi' executable, "
            "which is required for solver %s" % self.name
        )
        self.enable = False
        return None

    def _get_version(self):
        """
        Returns a tuple describing the solver executable version.

        The result is cached per executable path in ``_solver_info_cache``.
        """
        solver_exec = self.executable()
        cache_key = (solver_exec, 'version')
        if cache_key in self._solver_info_cache:
            return self._solver_info_cache[cache_key]

        if solver_exec is None:
            ver = _extract_version('')
        else:
            results = subprocess.run(
                [solver_exec],
                input=('import gurobipy; print(gurobipy.gurobi.version()); quit()'),
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
            ver = None
            try:
                # NOTE(review): this eval()s whatever the solver executable
                # printed.  Only SyntaxError is handled; any other exception
                # raised while evaluating the output propagates.
                ver = tuple(eval(results.stdout.strip()))
                # pad to four components
                while len(ver) < 4:
                    ver += (0,)
            except SyntaxError:
                ver = _extract_version('')
        if ver is not None:
            ver = ver[:4]

        self._solver_info_cache[cache_key] = ver
        return ver

    def create_command_line(self, executable, problem_files):
        """Build the command and the Python script fed to the Gurobi shell.

        Returns:
            Bunch: with ``cmd`` (``[executable]``, preceded by
            ``self._timer`` when set), ``script`` (the generated script
            text), ``log_file`` and ``env=None``.
        """
        #
        # Define log file
        #
        if self._log_file is None:
            self._log_file = TempfileManager.create_tempfile(suffix='.gurobi.log')

        #
        # Define solution file: the generated script passes this name to
        # GUROBI_RUN.write_result(), and process_soln_file() reads it back
        # as "key: value" text lines.
        #
        if self._soln_file is None:
            self._soln_file = TempfileManager.create_tempfile(suffix='.gurobi.txt')

        #
        # Write the GUROBI execution script
        #
        problem_filename = self._problem_files[0]
        solution_filename = self._soln_file
        warmstart_filename = self._warm_start_file_name

        # translate the options into a normal python dictionary, from a
        # pyomo.common.collections.Bunch - the gurobi_run function
        # doesn't know about pyomo, so the translation is necessary
        # (`repr(options)` doesn't produce executable python code)
        options_dict = dict(self.options)

        # NOTE(review): the converted value is not used anywhere below.  The
        # conversion is retained only so that a non-numeric ``mipgap``
        # option still fails at this point, as it did before.
        if self.options.mipgap is not None:
            float(self.options.mipgap)

        # NOTE: the gurobi shell is independent of Pyomo python
        #       virtualized environment, so any imports - specifically
        #       that required to get GUROBI_RUN - must be handled
        #       explicitly.
        # NOTE: The gurobi plugin (GUROBI.py) and GUROBI_RUN.py live in
        #       the same directory.
        script = "import sys\n"
        script += "sys.path.append(%r)\n" % (this_file_dir(),)
        script += "import GUROBI_RUN\n"
        script += "soln = GUROBI_RUN.gurobi_run("
        for x in (
            problem_filename,
            warmstart_filename,
            None,
            options_dict,
            self._suffixes,
        ):
            script += "%r," % x
        script += ")\n"
        script += "GUROBI_RUN.write_result(soln, %r)\n" % solution_filename
        script += "quit()\n"

        # dump the script and warm-start file names for the
        # user if we're keeping files around.
        if self._keepfiles:
            script_fname = TempfileManager.create_tempfile(suffix='.gurobi.script')
            # 'with' guarantees the handle is closed even if the write fails
            with open(script_fname, 'w') as script_file:
                script_file.write(script)

            print("Solver script file: '%s'" % script_fname)
            if self._warm_start_solve and (self._warm_start_file_name is not None):
                print("Solver warm-start file: " + self._warm_start_file_name)

        #
        # Define command line
        #
        cmd = [executable]
        if self._timer:
            cmd.insert(0, self._timer)
        return Bunch(cmd=cmd, script=script, log_file=self._log_file, env=None)

    def process_soln_file(self, results):
        """Parse ``self._soln_file`` into ``results``.

        The file is read as ``key: value[: value]`` lines grouped under
        ``section: problem|solution|solver`` headers.  A ``Solution`` is
        inserted into ``results.solution`` only when a ``solution`` section
        was seen.  Nothing is done when the file does not exist.

        Raises:
            RuntimeError: if an entry of ``self._suffixes`` matches none of
                ``"dual"``, ``"slack"`` or ``"rc"``.
        """
        # the only suffixes that we extract are constraint duals,
        # constraint slacks, and variable reduced-costs.  scan through
        # the solver suffix list and throw an exception if the user has
        # specified any others.  (each suffix is used as the regular
        # expression pattern.)
        for suffix in self._suffixes:
            if not any(re.match(suffix, known) for known in ("dual", "slack", "rc")):
                raise RuntimeError(
                    "***The GUROBI solver plugin cannot extract solution suffix="
                    + suffix
                )

        # check for existence of the solution file
        # not sure why we just return - would think that we
        # would want to indicate some sort of error
        if not os.path.exists(self._soln_file):
            return

        soln = Solution()

        # caching for efficiency
        soln_variables = soln.variable
        soln_constraints = soln.constraint

        # string compares are too expensive, so simply introduce some
        # section IDs.
        # 0 - unknown
        # 1 - problem
        # 2 - solution
        # 3 - solver
        section = 0  # unknown
        solution_seen = False

        # lower/upper values collected for 'r_l_*' / 'r_u_*' rows, keyed on
        # the name with that 4-character prefix removed
        range_duals = {}
        range_slacks = {}

        # 'with' guarantees the handle is closed even if parsing raises
        with open(self._soln_file, "r") as INPUT:
            for line in INPUT:
                line = line.strip()
                tokens = [token.strip() for token in line.split(":")]
                key = tokens[0]
                if key == 'section':
                    if tokens[1] == 'problem':
                        section = 1
                    elif tokens[1] == 'solution':
                        section = 2
                        solution_seen = True
                    elif tokens[1] == 'solver':
                        section = 3
                elif section == 2:
                    if key == 'var':
                        if tokens[1] != "ONE_VAR_CONSTANT":
                            soln_variables[tokens[1]] = {"Value": float(tokens[2])}
                    elif key == 'status':
                        soln.status = getattr(SolutionStatus, tokens[1])
                    elif key == 'gap':
                        soln.gap = float(tokens[1])
                    elif key == 'objective':
                        if tokens[1].strip() != 'None':
                            soln.objective['__default_objective__'] = {
                                'Value': float(tokens[1])
                            }
                            if results.problem.sense == minimize:
                                results.problem.upper_bound = float(tokens[1])
                            else:
                                results.problem.lower_bound = float(tokens[1])
                    elif key in ('constraintdual', 'constraintslack'):
                        if key == 'constraintdual':
                            field, ranges = "Dual", range_duals
                        else:
                            field, ranges = "Slack", range_slacks
                        name = tokens[1]
                        if name != "c_e_ONE_VAR_CONSTANT":
                            if name.startswith('c_'):
                                soln_constraints.setdefault(name, {})[field] = float(
                                    tokens[2]
                                )
                            elif name.startswith('r_l_'):
                                ranges.setdefault(name[4:], [0, 0])[0] = float(
                                    tokens[2]
                                )
                            elif name.startswith('r_u_'):
                                ranges.setdefault(name[4:], [0, 0])[1] = float(
                                    tokens[2]
                                )
                    elif key == 'varrc':
                        if tokens[1] != "ONE_VAR_CONSTANT":
                            soln_variables[tokens[1]]["Rc"] = float(tokens[2])
                    else:
                        setattr(soln, key, tokens[1])
                elif section == 1:
                    if key == 'sense':
                        if tokens[1] == 'minimize':
                            results.problem.sense = minimize
                        elif tokens[1] == 'maximize':
                            results.problem.sense = maximize
                    else:
                        # NOTE(review): eval() on text read from the
                        # solution file; values that cannot be evaluated
                        # are stored as the raw string.
                        try:
                            val = eval(tokens[1])
                        except Exception:
                            val = tokens[1]
                        setattr(results.problem, key, val)
                elif section == 3:
                    if key == 'status':
                        results.solver.status = getattr(SolverStatus, tokens[1])
                    elif key == 'termination_condition':
                        try:
                            results.solver.termination_condition = getattr(
                                TerminationCondition, tokens[1]
                            )
                        except AttributeError:
                            results.solver.termination_condition = (
                                TerminationCondition.unknown
                            )
                    else:
                        setattr(results.solver, key, tokens[1])

        # For the range constraints, supply only the dual with the largest
        # magnitude (at least one should always be numerically zero).  The
        # result is always stored under the 'r_l_' key.
        for key, (ld, ud) in range_duals.items():
            soln_constraints['r_l_' + key] = {"Dual": ld if abs(ld) > abs(ud) else ud}
        # slacks
        for key, (ls, us) in range_slacks.items():
            soln_constraints.setdefault('r_l_' + key, {})["Slack"] = (
                ls if abs(ls) > abs(us) else us
            )

        if solution_seen:
            results.solution.insert(soln)

    def _postsolve(self):
        """Remove stray ``gurobi.log*`` files from the current directory,
        run the base postsolve, pop the temp-file context, and return the
        base-class results."""
        # take care of the annoying GUROBI log file in the current
        # directory.  this approach doesn't seem overly efficient, but
        # python os module functions doesn't accept regular expression
        # directly.
        for filename in os.listdir("."):
            # IMPT: trap the possible exception raised by the file not
            #       existing. this can occur in pyro environments where
            #       > 1 workers are running GUROBI, and were started
            #       from the same directory. these logs don't matter
            #       anyway (we redirect everything), and are largely an
            #       annoyance.
            try:
                if re.match(r'gurobi\.log', filename) is not None:
                    os.remove(filename)
            except OSError:
                pass

        # let the base class deal with returning results.
        results = ILMLicensedSystemCallSolver._postsolve(self)

        # finally, clean any temporary files registered with the temp file
        # manager, created populated *directly* by this plugin. does not
        # include, for example, the execution script. but does include
        # the warm-start file.
        TempfileManager.pop(remove=not self._keepfiles)

        return results
@SolverFactory.register(
    '_gurobi_file', doc='LP/MPS file-based direct interface to the GUROBI LP/MIP solver'
)
class GUROBIFILE(GUROBISHELL):
    """Direct LP/MPS file-based interface to the GUROBI LP/MIP solver"""

    def available(self, exception_flag=False):
        """Report whether the solver can be used.

        Args:
            exception_flag: when true, raise instead of returning ``False``.

        Returns:
            bool: ``False`` when gurobipy cannot be imported, otherwise the
            first element of ``self._available`` (set by
            ``_check_license()``).

        Raises:
            ApplicationError: if ``exception_flag`` is true and the solver
                is not available.
        """
        if not gurobipy_available:  # this triggers the deferred import
            if exception_flag:
                raise ApplicationError("gurobipy module not importable")
            return False
        if getattr(self, '_available', None) is None:
            self._check_license()
        ans = self._available[0]
        if exception_flag and not ans:
            # The message used to come from an undefined name (``msg``),
            # which would have raised NameError instead of ApplicationError.
            raise ApplicationError("Solver '%s' is not available" % self.name)
        return ans

    def license_is_valid(self):
        """Return a true value only when the solver is available and the
        license probe recorded in ``self._available[1]`` succeeded."""
        return self.available(False) and self._available[1]

    def _check_license(self):
        """Probe the license by creating a gurobipy model and store the
        outcome as ``self._available = (True, licensed)``."""
        licensed = False
        try:
            # Gurobipy writes out license file information when creating
            # the environment
            with capture_output(capture_fd=True):
                m = gurobipy.Model()
            licensed = True
        except gurobipy.GurobiError:
            licensed = False

        self._available = (True, licensed)

    def _get_version(self):
        """Return ``(major, minor, technical)`` from ``gurobipy.GRB``."""
        return (
            gurobipy.GRB.VERSION_MAJOR,
            gurobipy.GRB.VERSION_MINOR,
            gurobipy.GRB.VERSION_TECHNICAL,
        )

    def _default_executable(self):
        # Bogus, but not None (because the test infrastructure disables
        # solvers where the executable() is None)
        return ""

    def create_command_line(self, executable, problem_files):
        """Return a ``Bunch`` with an empty command and script; only the
        log file (created here when unset) is meaningful."""
        #
        # Define log file
        #
        if self._log_file is None:
            self._log_file = TempfileManager.create_tempfile(suffix='.gurobi.log')

        #
        # Define command line
        #
        return Bunch(cmd=[], script="", log_file=self._log_file, env=None)

    def _apply_solver(self):
        """Call ``GUROBI_RUN.gurobi_run`` in-process and capture its output.

        The returned solution data is kept in ``self._soln`` and the
        captured output in ``self._log``; ``self._rc`` is set to ``0``.

        Returns:
            Bunch: with ``rc`` and ``log``.
        """
        #
        # Execute the command
        #
        if is_debug_set(logger):
            logger.debug("Running %s", self._command.cmd)

        problem_filename = self._problem_files[0]
        warmstart_filename = self._warm_start_file_name

        # gurobi_run() is handed a plain dict copy of the options.
        options_dict = {key: self.options[key] for key in self.options}

        # display the log/solver file names prior to execution. this is useful
        # in case something crashes unexpectedly, which is not without precedent.
        if self._keepfiles:
            if self._log_file is not None:
                print("Solver log file: '%s'" % self._log_file)
            if self._problem_files != []:
                print("Solver problem files: %s" % str(self._problem_files))

        sys.stdout.flush()
        # the first stream collects the log; stdout is added when tee-ing
        ostreams = [io.StringIO()]
        if self._tee:
            ostreams.append(sys.stdout)
        with TeeStream(*ostreams) as t:
            with capture_output(output=t.STDOUT, capture_fd=False):
                self._soln = GUROBI_RUN.gurobi_run(
                    problem_filename,
                    warmstart_filename,
                    None,
                    options_dict,
                    self._suffixes,
                )
        self._log = ostreams[0].getvalue()
        self._rc = 0
        sys.stdout.flush()
        return Bunch(rc=self._rc, log=self._log)

    def process_soln_file(self, results):
        """Copy the data returned by ``gurobi_run`` (``self._soln``) into
        ``results``.

        ``self._soln`` is read as a mapping with ``'problem'`` and
        ``'solver'`` entries and an optional ``'solution'`` entry.  A
        ``Solution`` is inserted into ``results.solution`` only when the
        ``'solution'`` entry is present and non-empty.

        Raises:
            RuntimeError: if an entry of ``self._suffixes`` matches none of
                ``"dual"``, ``"slack"`` or ``"rc"``.
        """
        # the only suffixes that we extract are constraint duals,
        # constraint slacks, and variable reduced-costs.  Scan through
        # the solver suffix list and throw an exception if the user has
        # specified any others.  (each suffix is used as the regular
        # expression pattern.)
        for suffix in self._suffixes:
            if not any(re.match(suffix, known) for known in ("dual", "slack", "rc")):
                raise RuntimeError(
                    "***The GUROBI solver plugin cannot extract solution suffix="
                    + suffix
                )

        soln = Solution()

        # caching for efficiency
        soln_variables = soln.variable
        soln_constraints = soln.constraint

        # lower/upper values collected for 'r_l_*' / 'r_u_*' rows, keyed on
        # the name with that 4-character prefix removed
        range_duals = {}
        range_slacks = {}

        # Copy over the problem info
        for key, val in self._soln['problem'].items():
            setattr(results.problem, key, val)
        if results.problem.sense == 'minimize':
            results.problem.sense = minimize
        elif results.problem.sense == 'maximize':
            results.problem.sense = maximize

        # Copy over the solver info
        for key, val in self._soln['solver'].items():
            setattr(results.solver, key, val)
        results.solver.status = getattr(SolverStatus, results.solver.status)
        try:
            results.solver.termination_condition = getattr(
                TerminationCondition, results.solver.termination_condition
            )
        except AttributeError:
            results.solver.termination_condition = TerminationCondition.unknown

        # Copy over the solution information
        sol = self._soln.get('solution', None)
        if sol:
            # NOTE(review): the status is stored exactly as received, while
            # GUROBISHELL.process_soln_file converts it through
            # SolutionStatus - confirm whether a conversion is wanted here.
            if 'status' in sol:
                soln.status = sol['status']
            if 'gap' in sol:
                soln.gap = sol['gap']
            obj = sol.get('objective', None)
            if obj is not None:
                soln.objective['__default_objective__'] = {'Value': obj}
                if results.problem.sense == minimize:
                    results.problem.upper_bound = obj
                else:
                    results.problem.lower_bound = obj
            for name, val in sol.get('var', {}).items():
                if name == "ONE_VAR_CONSTANT":
                    continue
                soln_variables[name] = {"Value": val}
            for name, val in sol.get('varrc', {}).items():
                if name == "ONE_VAR_CONSTANT":
                    continue
                soln_variables[name]["Rc"] = val
            for name, val in sol.get('constraintdual', {}).items():
                if name == "c_e_ONE_VAR_CONSTANT":
                    continue
                if name.startswith('c_'):
                    soln_constraints.setdefault(name, {})["Dual"] = val
                elif name.startswith('r_l_'):
                    range_duals.setdefault(name[4:], [0, 0])[0] = val
                elif name.startswith('r_u_'):
                    range_duals.setdefault(name[4:], [0, 0])[1] = val
            for name, val in sol.get('constraintslack', {}).items():
                if name == "c_e_ONE_VAR_CONSTANT":
                    continue
                if name.startswith('c_'):
                    soln_constraints.setdefault(name, {})["Slack"] = val
                elif name.startswith('r_l_'):
                    range_slacks.setdefault(name[4:], [0, 0])[0] = val
                elif name.startswith('r_u_'):
                    range_slacks.setdefault(name[4:], [0, 0])[1] = val
            results.solution.insert(soln)

        # For the range constraints, supply only the dual with the largest
        # magnitude (at least one should always be numerically zero).  The
        # result is always stored under the 'r_l_' key.
        for key, (ld, ud) in range_duals.items():
            soln_constraints['r_l_' + key] = {"Dual": ld if abs(ld) > abs(ud) else ud}
        # slacks
        for key, (ls, us) in range_slacks.items():
            soln_constraints.setdefault('r_l_' + key, {})["Slack"] = (
                ls if abs(ls) > abs(us) else us
            )