Source code for pyomo.opt.base.solvers

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import re
import sys
import time
import logging
import shlex

from pyomo.common import Factory
from pyomo.common.errors import ApplicationError
from pyomo.common.collections import Bunch

from pyomo.opt.base.convert import convert_problem
from pyomo.opt.base.formats import ResultsFormat
import pyomo.opt.base.results

logger = logging.getLogger('pyomo.opt')


# The version string is first searched for "trunk"/"Trunk"; if found,
# a tuple of infinities is returned. Otherwise, the first match of
# number[.number], where [.number] can repeat 1-3 times, is used and
# translated into a tuple whose size matches the `length` keyword
# (appending 0's when necessary). If no match is found, None is
# returned (although one could argue a tuple of 0's might be
# appropriate).
def _extract_version(x, length=4):
    """
    Attempts to extract solver version information from a string.
    """
    assert (1 <= length) and (length <= 4)
    m = re.search('[t,T]runk', x)
    if m is not None:
        # Since most version checks are comparing if the current
        # version is greater/less than some other version, it makes
        # since that a solver advertising trunk should always be greater
        # than a version check, hence returning a tuple of infinities
        return tuple(float('inf') for i in range(length))
    m = re.search(r'[0-9]+(\.[0-9]+){1,3}', x)
    if not m is None:
        version = tuple(int(i) for i in m.group(0).split('.')[:length])
        while len(version) < length:
            version += (0,)
        return version
    return None  # (0,0,0,0)[:length]


class UnknownSolver(object):
    """Placeholder object standing in for a solver that could not be created.

    Construction succeeds and records the arguments that were used, but
    any attempt to use the object as a real solver (``solve``, ``reset``,
    ``set_options``, or access to any undefined attribute) raises a
    RuntimeError describing how the object was created.
    """

    def __init__(self, *args, **kwds):
        """Record the construction arguments.

        Parameters
        ----------
        *args
            Positional arguments; stored for the error report only.
        **kwds
            Keyword arguments; must contain 'type'. Stored for the error
            report only.

        Raises
        ------
        ValueError
            If the 'type' keyword is not supplied.
        """
        # super(UnknownSolver,self).__init__(**kwds)

        #
        # The 'type' is the class type of the solver instance
        #
        if "type" in kwds:
            self.type = kwds["type"]
        else:  # pragma:nocover
            raise ValueError("Expected option 'type' for UnknownSolver constructor")

        self.options = {}
        self._args = args
        self._kwds = kwds

    #
    # Support "with" statements. Forgetting to call deactivate
    # on Plugins is a common source of memory leaks
    #
    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        pass

    def available(self, exception_flag=True):
        """Determine if this optimizer is available.

        Always reports "not available": raises ApplicationError when
        ``exception_flag`` is true, otherwise returns False.
        """
        if exception_flag:
            raise ApplicationError("Solver (%s) not available" % str(self.name))
        return False

    def license_is_valid(self):
        "True if the solver is present and has a valid license (if applicable)"
        return False

    def warm_start_capable(self):
        """True if the solver can accept a warm-start solution."""
        return False

    def solve(self, *args, **kwds):
        """Perform optimization and return an SolverResults object.

        For an UnknownSolver this always raises RuntimeError.
        """
        self._solver_error('solve')

    def reset(self):
        """Reset the state of an optimizer (always raises RuntimeError)."""
        self._solver_error('reset')

    def set_options(self, istr):
        """Set the options in the optimizer from a string (always raises
        RuntimeError)."""
        self._solver_error('set_options')

    def __bool__(self):
        # available() is called with its default exception_flag=True, so
        # truth-testing an UnknownSolver raises instead of returning False.
        return self.available()

    def __getattr__(self, attr):
        # __getattr__ only runs when normal attribute lookup fails.  The
        # names below are exactly the ones _solver_error() reads; if one of
        # them is missing (e.g. the instance was created without running
        # __init__), reporting through _solver_error() would re-enter this
        # method until a RecursionError.  Fail with a plain AttributeError
        # instead.
        if attr in ('type', 'options', '_args', '_kwds'):
            raise AttributeError(attr)
        self._solver_error(attr)

    def _solver_error(self, method_name):
        """Raise the RuntimeError describing the misuse of this object.

        ``method_name`` is the method or attribute name the caller tried
        to use; it is embedded in the message together with the recorded
        construction arguments.
        """
        # NOTE(review): in the collapsed source this message appears on a
        # single line; any line breaks the upstream literal may contain
        # could not be recovered here -- confirm against upstream.
        header = (
            'Attempting to use an unavailable solver. The SolverFactory was '
            'unable to create the solver "%s" and returned an UnknownSolver '
            'object. This error is raised at the point where the '
            'UnknownSolver object was used as if it were valid (by calling '
            'method "%s"). The original solver was created with the '
            'following parameters: \t'
        ) % (self.type, method_name)
        raise RuntimeError(
            header
            + "\n\t".join("%s: %s" % i for i in sorted(self._kwds.items()))
            + "\n\t_args: %s" % (self._args,)
            + "\n\toptions: %s" % (self.options,)
        )
class SolverFactoryClass(Factory):
    """Factory that maps a solver name to a solver plugin instance."""

    def __call__(self, _name=None, **kwds):
        """Create the solver registered under ``_name``.

        Parameters
        ----------
        _name: str or None
            Solver name, optionally of the form "name:subsolver". When
            None, the factory object itself is returned.
        **kwds
            Passed to the solver class constructor. 'solver' names a
            subsolver; 'solver_io' selects the interface mode used when
            ``_name`` is not a registered solver.

        Returns
        -------
        The created solver object, or an UnknownSolver (with ``name`` set
        to ``_name``) if no solver could be created.
        """
        if _name is None:
            return self
        _name = str(_name)
        if ':' in _name:
            _name, subsolver = _name.split(':', 1)
            kwds['solver'] = subsolver
        elif 'solver' in kwds:
            subsolver = kwds['solver']
        else:
            subsolver = None
        opt = None
        try:
            if _name in self._cls:
                opt = self._cls[_name](**kwds)
            else:
                # Unregistered name: fall back to the implicit solver for
                # the requested mode, using the name as the executable.
                mode = kwds.get('solver_io', 'nl')
                if mode is None:
                    mode = 'nl'
                _implicit_solvers = {'nl': 'asl'}
                if "executable" not in kwds:
                    kwds["executable"] = _name
                if mode in _implicit_solvers:
                    if _implicit_solvers[mode] not in self._cls:
                        raise RuntimeError(
                            " The solver plugin was not registered.\n"
                            " Please confirm that the 'pyomo.environ' package has been imported."
                        )
                    opt = self._cls[_implicit_solvers[mode]](**kwds)
                    if opt is not None:
                        opt.set_options('solver=' + _name)
        except Exception:
            # Any failure while constructing the plugin is logged and the
            # caller receives an UnknownSolver instead.  Only Exception is
            # caught so that KeyboardInterrupt / SystemExit still propagate
            # rather than being converted into an "unknown solver".
            err = sys.exc_info()
            logger.warning(
                "Failed to create solver with name '%s':\n%s" % (_name, err[1]),
                exc_info=err,
            )
            opt = None
        if opt is not None and _name != "py" and subsolver is not None:
            # py just creates instance of its subsolver, no need for this option
            opt.set_options('solver=' + subsolver)
        if opt is None:
            opt = UnknownSolver(type=_name, **kwds)
            opt.name = _name
        return opt
# Two factory objects are created and then made to share state: after the
# assignments below, SolverFactory._cls and LegacySolverFactory._cls refer
# to the same object (likewise _doc), so both names look up solvers in the
# same registry.
LegacySolverFactory = SolverFactoryClass('solver type')

SolverFactory = SolverFactoryClass('solver type')
SolverFactory._cls = LegacySolverFactory._cls
SolverFactory._doc = LegacySolverFactory._doc

#
# TODO: It is impossible to load CBC with NL file-io using this function,
#       i.e., SolverFactory("cbc", solver_io='nl'),
#       this is NOT asl:cbc (same with PICO)
# WEH:  Why is there a distinction between SolverFactory('asl:cbc') and
#       SolverFactory('cbc', solver_io='nl')???  This is bad.
#
def check_available_solvers(*args):
    """Return the names of the requested solvers that are usable.

    Parameters
    ----------
    *args
        Each argument is either a solver name or a tuple whose first
        element is the solver name; a tuple is unpacked into the
        ``SolverFactory`` call.

    Returns
    -------
    list
        The names (in the order given) of the solvers that could be
        created, report themselves available, have an executable (when
        the solver object exposes ``executable()``), and have a valid
        license.

    Logging at WARNING level and below is disabled while the solvers are
    probed and re-enabled afterwards.
    """
    # NOTE(review): these names are not used below; presumably they are
    # imported so the plugin modules are loaded before probing -- confirm
    # before removing.
    from pyomo.solvers.plugins.solvers.GUROBI import GUROBISHELL
    from pyomo.solvers.plugins.solvers.BARON import BARONSHELL
    from pyomo.solvers.plugins.solvers.mosek_direct import MOSEKDirect

    logging.disable(logging.WARNING)
    try:
        ans = []
        for arg in args:
            if not isinstance(arg, tuple):
                name = arg
                arg = (arg,)
            else:
                name = arg[0]
            opt = SolverFactory(*arg)
            if opt is None or isinstance(opt, UnknownSolver):
                continue  # not available

            if not opt.available(exception_flag=False):
                continue  # not available

            if hasattr(opt, 'executable') and opt.executable() is None:
                continue  # not available

            if not opt.license_is_valid():
                continue  # not available

            # At this point, the solver is available (and licensed)
            ans.append(name)
        return ans
    finally:
        # Re-enable logging even if probing a solver raised; otherwise
        # warnings would stay disabled process-wide.
        logging.disable(logging.NOTSET)
def _raise_ephemeral_error(name, keyword=""): raise AttributeError( "The property '%s' can no longer be set directly on " "the solver object. It should instead be passed as a " "keyword into the solve method%s. It will automatically " "be reset to its default value after each invocation of " "solve." % (name, keyword) )
class OptSolver(object):
    """A generic optimization solver"""

    #
    # Support "with" statements. Forgetting to call deactivate
    # on Plugins is a common source of memory leaks
    #
    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        pass

    #
    # Adding to help track down invalid code after making
    # the following attributes private.  Both reading and writing any of
    # these properties raises AttributeError.
    #
    @property
    def tee(self):
        _raise_ephemeral_error('tee')

    @tee.setter
    def tee(self, val):
        _raise_ephemeral_error('tee')

    @property
    def suffixes(self):
        _raise_ephemeral_error('suffixes')

    @suffixes.setter
    def suffixes(self, val):
        _raise_ephemeral_error('suffixes')

    @property
    def keepfiles(self):
        _raise_ephemeral_error('keepfiles')

    @keepfiles.setter
    def keepfiles(self, val):
        _raise_ephemeral_error('keepfiles')

    @property
    def soln_file(self):
        _raise_ephemeral_error('soln_file')

    @soln_file.setter
    def soln_file(self, val):
        _raise_ephemeral_error('soln_file')

    @property
    def log_file(self):
        _raise_ephemeral_error('log_file')

    @log_file.setter
    def log_file(self, val):
        _raise_ephemeral_error('log_file')

    @property
    def symbolic_solver_labels(self):
        _raise_ephemeral_error('symbolic_solver_labels')

    @symbolic_solver_labels.setter
    def symbolic_solver_labels(self, val):
        _raise_ephemeral_error('symbolic_solver_labels')

    @property
    def warm_start_solve(self):
        _raise_ephemeral_error('warm_start_solve', keyword=" (warmstart)")

    @warm_start_solve.setter
    def warm_start_solve(self, val):
        _raise_ephemeral_error('warm_start_solve', keyword=" (warmstart)")

    @property
    def warm_start_file_name(self):
        _raise_ephemeral_error('warm_start_file_name', keyword=" (warmstart_file)")

    @warm_start_file_name.setter
    def warm_start_file_name(self, val):
        _raise_ephemeral_error('warm_start_file_name', keyword=" (warmstart_file)")

    def __init__(self, **kwds):
        """Constructor

        Keyword arguments
        -----------------
        type
            Required. The class type of the solver instance.
        name
            Optional name; defaults to ``type``.
        doc
            Optional documentation string; a default is derived from
            ``name`` and ``type`` when omitted.
        options
            Optional mapping of persistent solver options.

        Raises
        ------
        ValueError
            If 'type' is not supplied.
        """
        #
        # The 'type' is the class type of the solver instance
        #
        if "type" in kwds:
            self.type = kwds["type"]
        else:  # pragma:nocover
            raise ValueError("Expected option 'type' for OptSolver constructor")

        #
        # The 'name' is either the class type of the solver instance, or a
        # assigned name.
        #
        if "name" in kwds:
            self.name = kwds["name"]
        else:
            self.name = self.type

        if "doc" in kwds:
            self._doc = kwds["doc"]
        else:
            if self.type is None:  # pragma:nocover
                self._doc = ""
            elif self.name == self.type:
                self._doc = "%s OptSolver" % self.name
            else:
                self._doc = "%s OptSolver (type %s)" % (self.name, self.type)
        #
        # Options are persistent, meaning users must modify the
        # options dict directly rather than pass them into _presolve
        # through the solve command. Everything else is reset inside
        # presolve
        #
        self.options = Bunch()
        if 'options' in kwds and kwds['options'] is not None:
            for key in kwds['options']:
                setattr(self.options, key, kwds['options'][key])

        # the symbol map is an attribute of the solver plugin only
        # because it is generated in presolve and used to tag results
        # so they are interpretable - basically, it persists across
        # multiple methods.
        self._smap_id = None

        # These are ephemeral options that can be set by the user during
        # the call to solve, but will be reset to defaults if not given
        self._load_solutions = True
        self._select_index = 0
        self._report_timing = False
        self._suffixes = []
        self._log_file = None
        self._soln_file = None

        # overridden by a solver plugin when it returns sparse results
        self._default_variable_value = None
        # overridden by a solver plugin when it is always available
        self._assert_available = False
        # overridden by a solver plugin to indicate its input file format
        self._problem_format = None
        self._valid_problem_formats = []
        # overridden by a solver plugin to indicate its results file format
        self._results_format = None
        self._valid_result_formats = {}

        self._results_reader = None
        self._problem = None
        self._problem_files = None

        #
        # Used to document meta solvers
        #
        self._metasolver = False

        self._version = None
        #
        # Data for solver callbacks
        #
        self._allow_callbacks = False
        self._callback = {}

        # We define no capabilities for the generic solver; derived
        # classes must override this
        self._capabilities = Bunch()

    @staticmethod
    def _options_string_to_dict(istr):
        """Parse a string of whitespace-separated ``option=value`` tokens.

        Parameters
        ----------
        istr: str
            The options string. If it starts with a quote character, the
            string is first evaluated as a Python literal to strip the
            quoting. Tokens are split with ``shlex.split``.

        Returns
        -------
        dict
            Maps each option name to its value. A value is the result of
            evaluating the text after '=' as a Python expression, or the
            raw text if that evaluation fails.

        Raises
        ------
        ValueError
            If a token contains no '='.
        """
        ans = {}
        istr = istr.strip()
        if not istr:
            return ans
        # SECURITY NOTE: eval() is applied to the options string below.
        # This is only acceptable when the string comes from a trusted
        # source.  The expressions are evaluated in this function's scope,
        # so the local names here are deliberately left unchanged.
        if istr[0] == "'" or istr[0] == '"':
            istr = eval(istr)
        tokens = shlex.split(istr)
        for token in tokens:
            index = token.find('=')
            if index == -1:
                raise ValueError(
                    "Solver options must have the form option=value: '%s'" % istr
                )
            try:
                val = eval(token[(index + 1) :])
            except Exception:
                # Not a valid Python expression: keep the raw text.
                # (Exception rather than a bare except, so that
                # KeyboardInterrupt/SystemExit are not swallowed.)
                val = token[(index + 1) :]
            ans[token[:index]] = val
        return ans

    def default_variable_value(self):
        """Return the value stored in ``_default_variable_value``."""
        return self._default_variable_value

    def __bool__(self):
        return self.available()

    def version(self):
        """
        Returns a 4-tuple describing the solver executable version.

        The value is obtained from ``_get_version()`` on first use and
        cached; the base implementation of ``_get_version()`` returns None.
        """
        if self._version is None:
            self._version = self._get_version()
        return self._version

    def _get_version(self):
        """Return the solver version; the generic solver has none."""
        return None

    def problem_format(self):
        """
        Returns the current problem format.
        """
        return self._problem_format

    def set_problem_format(self, format):
        """
        Set the current problem format (if it's valid) and update
        the results format to something valid for this problem format.

        Raises ValueError if ``format`` is not in the list of valid
        problem formats.
        """
        if format in self._valid_problem_formats:
            self._problem_format = format
        else:
            raise ValueError(
                "%s is not a valid problem format for solver plugin %s" % (format, self)
            )
        self._results_format = self._default_results_format(self._problem_format)

    def results_format(self):
        """
        Returns the current results format.
        """
        return self._results_format

    def set_results_format(self, format):
        """
        Set the current results format (if it's valid for the current
        problem format).

        Raises ValueError otherwise.
        """
        if (self._problem_format in self._valid_result_formats) and (
            format in self._valid_result_formats[self._problem_format]
        ):
            self._results_format = format
        else:
            raise ValueError(
                "%s is not a valid results format for "
                "problem format %s with solver plugin %s"
                % (format, self._problem_format, self)
            )

    def has_capability(self, cap):
        """
        Returns a boolean value representing whether a solver supports
        a specific feature. Defaults to 'False' if the solver is unaware
        of an option. Expects a string.

        Example:
        # prints True if solver supports sos1 constraints, and False otherwise
        print(solver.has_capability('sos1'))

        # prints True if solver supports 'feature', and False otherwise
        print(solver.has_capability('feature'))

        Parameters
        ----------
        cap: str
            The feature

        Returns
        -------
        val: bool
            Whether or not the solver has the specified capability.

        Raises
        ------
        TypeError
            If ``cap`` is not a string.
        """
        if not isinstance(cap, str):
            raise TypeError(
                "Expected argument to be of type '%s', not "
                "'%s'." % (type(str()), type(cap))
            )
        val = self._capabilities[str(cap)]
        if val is None:
            return False
        return val

    def available(self, exception_flag=True):
        """True if the solver is available"""
        return True

    def license_is_valid(self):
        "True if the solver is present and has a valid license (if applicable)"
        return True

    def warm_start_capable(self):
        """True if the solver can accept a warm-start solution"""
        return False

    def solve(self, *args, **kwds):
        """Solve the problem

        Runs ``_presolve``, ``_apply_solver`` and ``_postsolve`` in turn
        and returns the object produced by ``_postsolve``.

        Parameters
        ----------
        *args
            Problem inputs, forwarded to ``_presolve``. Arguments that are
            ``BlockData`` or kernel ``IBlock`` instances are treated as the
            model: their active import suffixes are added to the 'suffixes'
            keyword and results are loaded back into the last such model.
        **kwds
            'options' (mapping) and 'options_string' (str) are merged on
            top of ``self.options`` for the duration of this call only.
            All remaining keywords are forwarded to ``_presolve``.

        Raises
        ------
        RuntimeError
            If a ``BlockData`` argument is not constructed.
        ApplicationError
            If the solver status reports a non-zero return code.
        """
        self.available(exception_flag=True)
        #
        # If the inputs are models, then validate that they have been
        # constructed! Collect suffix names to try and import from solution.
        #
        from pyomo.core.base.block import BlockData
        import pyomo.core.base.suffix
        from pyomo.core.kernel.block import IBlock
        import pyomo.core.kernel.suffix

        _model = None
        for arg in args:
            if isinstance(arg, (BlockData, IBlock)):
                if isinstance(arg, BlockData):
                    if not arg.is_constructed():
                        raise RuntimeError(
                            "Attempting to solve model=%s with unconstructed "
                            "component(s)" % (arg.name,)
                        )

                _model = arg
                # import suffixes must be on the top-level model
                if isinstance(arg, BlockData):
                    model_suffixes = list(
                        name
                        for (
                            name,
                            comp,
                        ) in pyomo.core.base.suffix.active_import_suffix_generator(arg)
                    )
                else:
                    assert isinstance(arg, IBlock)
                    model_suffixes = list(
                        comp.storage_key
                        for comp in pyomo.core.kernel.suffix.import_suffix_generator(
                            arg, active=True, descend_into=False
                        )
                    )

                if len(model_suffixes) > 0:
                    kwds_suffixes = kwds.setdefault('suffixes', [])
                    for name in model_suffixes:
                        if name not in kwds_suffixes:
                            kwds_suffixes.append(name)

        #
        # Handle ephemeral solvers options here. These
        # will override whatever is currently in the options
        # dictionary, but we will reset these options to
        # their original value at the end of this method.
        #

        orig_options = self.options

        self.options = Bunch()
        self.options.update(orig_options)
        self.options.update(kwds.pop('options', {}))
        self.options.update(
            self._options_string_to_dict(kwds.pop('options_string', ''))
        )
        try:
            # we're good to go.
            initial_time = time.time()

            self._presolve(*args, **kwds)

            presolve_completion_time = time.time()
            if self._report_timing:
                print(
                    " %6.2f seconds required for presolve"
                    % (presolve_completion_time - initial_time)
                )

            if _model is not None:
                self._initialize_callbacks(_model)

            _status = self._apply_solver()
            if hasattr(self, '_transformation_data'):
                del self._transformation_data
            if not hasattr(_status, 'rc'):
                # The solver name is now supplied for the %s placeholder;
                # previously the message was logged with a literal "%s".
                logger.warning(
                    "Solver (%s) did not return a solver status code.\n"
                    "This is indicative of an internal solver plugin error.\n"
                    "Please report this to the Pyomo developers.",
                    self.name,
                )
            elif _status.rc:
                logger.error(
                    "Solver (%s) returned non-zero return code (%s)"
                    % (self.name, _status.rc)
                )
                if self._tee:
                    logger.error("See the solver log above for diagnostic information.")
                elif hasattr(_status, 'log') and _status.log:
                    logger.error("Solver log:\n" + str(_status.log))
                raise ApplicationError("Solver (%s) did not exit normally" % self.name)
            solve_completion_time = time.time()
            if self._report_timing:
                print(
                    " %6.2f seconds required for solver"
                    % (solve_completion_time - presolve_completion_time)
                )

            result = self._postsolve()
            result._smap_id = self._smap_id
            result._smap = None
            if _model:
                if isinstance(_model, IBlock):
                    if len(result.solution) == 1:
                        result.solution(0).symbol_map = getattr(
                            _model, "._symbol_maps"
                        )[result._smap_id]
                        result.solution(0).default_variable_value = (
                            self._default_variable_value
                        )
                        if self._load_solutions:
                            _model.load_solution(result.solution(0))
                    else:
                        assert len(result.solution) == 0
                    # see the hack in the write method
                    # we don't want this to stick around on the model
                    # after the solve
                    assert len(getattr(_model, "._symbol_maps")) == 1
                    delattr(_model, "._symbol_maps")
                    del result._smap_id
                    if self._load_solutions and (len(result.solution) == 0):
                        logger.error("No solution is available")
                else:
                    if self._load_solutions:
                        _model.solutions.load_from(
                            result,
                            select=self._select_index,
                            default_variable_value=self._default_variable_value,
                        )
                        result._smap_id = None
                        result.solution.clear()
                    else:
                        result._smap = _model.solutions.symbol_map[self._smap_id]
                        _model.solutions.delete_symbol_map(self._smap_id)
            postsolve_completion_time = time.time()

            if self._report_timing:
                print(
                    " %6.2f seconds required for postsolve"
                    % (postsolve_completion_time - solve_completion_time)
                )

        finally:
            #
            # Reset the options dict
            #
            self.options = orig_options

        return result

    def _presolve(self, *args, **kwds):
        """Consume the ephemeral solve keywords and prepare the problem.

        Pops 'logfile', 'solnfile', 'select', 'load_solutions',
        'timelimit', 'report_timing', 'tee', 'available' and 'suffixes'
        from ``kwds`` (using defaults when absent). If a problem format is
        set, the remaining keywords are passed to ``_convert_problem``;
        otherwise any remaining keyword raises ValueError. Finally the
        results format and results reader are selected.
        """
        self._log_file = kwds.pop("logfile", None)
        self._soln_file = kwds.pop("solnfile", None)
        self._select_index = kwds.pop("select", 0)
        self._load_solutions = kwds.pop("load_solutions", True)
        self._timelimit = kwds.pop("timelimit", None)
        self._report_timing = kwds.pop("report_timing", False)
        self._tee = kwds.pop("tee", False)
        self._assert_available = kwds.pop("available", True)
        self._suffixes = kwds.pop("suffixes", [])

        self.available()

        if self._problem_format:
            write_start_time = time.time()
            (self._problem_files, self._problem_format, self._smap_id) = (
                self._convert_problem(
                    args, self._problem_format, self._valid_problem_formats, **kwds
                )
            )
            total_time = time.time() - write_start_time
            if self._report_timing:
                print(" %6.2f seconds required to write file" % total_time)
        else:
            # No conversion step will consume the leftover keywords, so
            # anything still present is unrecognized.
            if len(kwds):
                raise ValueError(
                    "Solver="
                    + self.type
                    + " passed unrecognized keywords: \n\t"
                    + ("\n\t".join("%s = %s" % (k, v) for k, v in kwds.items()))
                )

        if (type(self._problem_files) in (list, tuple)) and (
            not isinstance(self._problem_files[0], str)
        ):
            self._problem_files = self._problem_files[0]._problem_files()
        if self._results_format is None:
            self._results_format = self._default_results_format(self._problem_format)

        #
        # Disabling this check for now. A solver doesn't have just
        # _one_ results format.
        #
        # if self._results_format not in \
        #    self._valid_result_formats[self._problem_format]:
        #    raise ValueError("Results format '"+str(self._results_format)+"' "
        #                     "cannot be used with problem format '"
        #                     +str(self._problem_format)+"' in solver "+self.name)
        if self._results_format == ResultsFormat.soln:
            self._results_reader = None
        else:
            self._results_reader = pyomo.opt.base.results.ReaderFactory(
                self._results_format
            )

    def _initialize_callbacks(self, model):
        """Initialize call-back functions"""
        pass

    def _apply_solver(self):
        """The routine that performs the solve"""
        raise NotImplementedError  # pragma:nocover

    def _postsolve(self):
        """The routine that does solve post-processing"""
        return self.results

    def _convert_problem(self, args, problem_format, valid_problem_formats, **kwds):
        """Delegate to ``convert_problem``, passing ``has_capability`` as
        the capability query."""
        return convert_problem(
            args, problem_format, valid_problem_formats, self.has_capability, **kwds
        )

    def _default_results_format(self, prob_format):
        """Returns the default results format for different problem
        formats.
        """
        return ResultsFormat.results

    def reset(self):
        """
        Reset the state of the solver
        """
        pass

    def _get_options_string(self, options=None):
        """Render options as a space-separated ``key=value`` string.

        ``options`` defaults to ``self.options``. String values that
        contain a space are wrapped in double quotes.
        """
        if options is None:
            options = self.options
        ans = []
        for key in options:
            val = options[key]
            if isinstance(val, str) and ' ' in val:
                ans.append("%s=\"%s\"" % (str(key), str(val)))
            else:
                ans.append("%s=%s" % (str(key), str(val)))
        return ' '.join(ans)

    def set_options(self, istr):
        """Set persistent options from a string or a mapping.

        A string is first parsed with ``_options_string_to_dict``.
        Entries whose value is None are skipped.
        """
        if isinstance(istr, str):
            istr = self._options_string_to_dict(istr)
        for key in istr:
            if istr[key] is not None:
                setattr(self.options, key, istr[key])

    def set_callback(self, name, callback_fn=None):
        """
        Set the callback function for a named callback.

        A call-back function has the form:

            def fn(solver, model):
                pass

        where 'solver' is the native solver interface object and 'model' is
        a Pyomo model instance object.

        Passing ``callback_fn=None`` removes the named callback if it is
        registered. Raises ApplicationError if callbacks are disabled for
        this solver.
        """
        if not self._allow_callbacks:
            raise ApplicationError("Callbacks disabled for solver %s" % self.name)
        if callback_fn is None:
            if name in self._callback:
                del self._callback[name]
        else:
            self._callback[name] = callback_fn

    def config_block(self, init=False):
        """Return the first element of
        ``default_config_block(self, init)``."""
        from pyomo.scripting.solve_config import default_config_block

        return default_config_block(self, init)[0]