Source code for pyomo.opt.solver.shellcmd

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import os
import sys
import time
import logging
import subprocess
from io import StringIO
from contextlib import nullcontext

from pyomo.common.errors import ApplicationError
from pyomo.common.collections import Bunch
from pyomo.common.log import is_debug_set, LoggingIntercept
from pyomo.common.tempfiles import TempfileManager
from pyomo.common.tee import TeeStream

import pyomo.common
from pyomo.opt.base import ResultsFormat
from pyomo.opt.base.solvers import OptSolver
from pyomo.opt.results import SolverStatus, SolverResults

logger = logging.getLogger('pyomo.opt')

# The minimum absolute time (in seconds) to add to the solver timeout
# when setting the timeout for the solver subprocess.  This provides
# time for a solver that timed out to clean up / report a solution
# before we forcibly kill the subprocess.
SUBPROCESS_TIMEOUT_ABS_ADJUST = 1
# The additional time (relative to the user-specified timeout) to add to
# the solver timeout when setting the timeout for the solver subprocess.
# This provides time for a solver that timed out to clean up / report a
# solution before we forcibly kill the subprocess.
SUBPROCESS_TIMEOUT_REL_ADJUST = 0.01


[docs] class SystemCallSolver(OptSolver): """A generic command line solver"""
[docs] def __init__(self, **kwargs): """Constructor""" executable = kwargs.pop('executable', None) validate = kwargs.pop('validate', True) OptSolver.__init__(self, **kwargs) self._keepfiles = False self._results_file = None self._timer = '' self._user_executable = None # broadly useful for reporting, and in cases where # a solver plugin may not report execution time. self._last_solve_time = None self._define_signal_handlers = None self._version_timeout = 2 if executable is not None: self.set_executable(name=executable, validate=validate)
[docs] def set_executable(self, name=None, validate=True): """ Set the executable for this solver. The 'name' keyword can be assigned a relative, absolute, or base filename. If it is unset (None), the executable will be reset to the default value associated with the solver interface. When 'validate' is True (default) extra checks take place that ensure an executable file with that name exists, and then 'name' is converted to an absolute path. On Windows platforms, a '.exe' extension will be appended if necessary when validating 'name'. If a file named 'name' does not appear to be a relative or absolute path, the search will be performed within the directories assigned to the PATH environment variable. """ # Clear any previously cached version number self._version = None if name is None: self._user_executable = None if validate: if self._default_executable() is None: raise ValueError( "Failed to set executable for solver %s to " "its default value. No available solver " "executable was found." % (self.name) ) return if not validate: self._user_executable = name else: exe = pyomo.common.Executable(name) # This is a bit awkward, but we want Executable to re-check # the PATH, so we will explicitly call rehash(). In the # future, we should move to have the solver directly use the # Executable() singleton to manage getting / setting / # overriding paths to various executables. Setting the # executable through the Executable() singleton will # automatically re-check the PATH. exe.rehash() exe = exe.path() if exe is None: raise ValueError( "Failed to set executable for solver %s. File " "with name=%s either does not exist or it is " "not executable. To skip this validation, " "call set_executable with validate=False." % (self.name, name) ) self._user_executable = exe
[docs] def available(self, exception_flag=False): """True if the solver is available""" if self._assert_available: return True if not OptSolver.available(self, exception_flag): return False try: # HACK: Suppress logged warnings about the executable not # being found cm = nullcontext() if exception_flag else LoggingIntercept() with cm: ans = self.executable() except NotImplementedError: ans = None if ans is None: if exception_flag: msg = "No executable found for solver '%s'" raise ApplicationError(msg % self.name) return False return True
[docs] def create_command_line(self, executable, problem_files): """ Create the command line that is executed. """ raise NotImplementedError # pragma:nocover
[docs] def process_logfile(self): """ Process the logfile for information about the optimization process. """ return SolverResults()
[docs] def process_soln_file(self, results): """ Process auxiliary data files generated by the optimizer (e.g. solution files) """ return results
# # NOTE: As JDS has suggested, there could be some value # to allowing the user to change the search path # used to find the default solver executable name # provided by the derived class implementation # (unlike set_executable() which requires the # executable name). This would allow users to # avoid having to know that, for examples, # gurobi.sh is the executable name for the LP # interface and gurobi_ampl is the executable for # the NL interface, while still being able to # switch to a non-default location. # # It seems possible that this functionality could # be implemented here on the base class by simply # adding an optional search_path keyword to the # _default_executable method implemented by # derived classes. How to propagate that through # the pyomo.common.Executable # framework once it gets there is another question # (that I won't be dealing with today). # # # E.g., # def set_search_path(self, path): # self._search_path = path # # where executable would call # self._default_executable(self._search_path) # # UPDATE [30 Jan 19]: The Pyomo executable search system has moved # away from the use of pyutilib's 'registered_executable' # mechanisms. The new system allows clients to augment the # search path through a "pathlist" attribute. #
[docs] def executable(self): """ Returns the executable used by this solver. """ return ( self._user_executable if (self._user_executable is not None) else self._default_executable() )
def _default_executable(self): """ Returns the default executable used by this solver. """ raise NotImplementedError def _presolve(self, *args, **kwds): """ Perform presolves. """ TempfileManager.push() self._keepfiles = kwds.pop("keepfiles", False) self._define_signal_handlers = kwds.pop('use_signal_handling', None) OptSolver._presolve(self, *args, **kwds) # # Verify that the input problems exists # for filename in self._problem_files: if not os.path.exists(filename): msg = 'Solver failed to locate input problem file: %s' raise ValueError(msg % filename) # # Create command line # self._command = self.create_command_line(self.executable(), self._problem_files) self._log_file = self._command.log_file # # The pre-cleanup is probably unnecessary, but also not harmful. # if (self._log_file is not None) and os.path.exists(self._log_file): os.remove(self._log_file) if (self._soln_file is not None) and os.path.exists(self._soln_file): os.remove(self._soln_file) def _apply_solver(self): if pyomo.common.Executable('timer'): self._timer = pyomo.common.Executable('timer').path() # # Execute the command # if is_debug_set(logger): logger.debug("Running %s", self._command.cmd) # display the log/solver file names prior to execution. this is useful # in case something crashes unexpectedly, which is not without precedent. if self._keepfiles: if self._log_file is not None: print("Solver log file: '%s'" % self._log_file) if self._soln_file is not None: print("Solver solution file: '%s'" % self._soln_file) if self._problem_files != []: print("Solver problem files: %s" % str(self._problem_files)) sys.stdout.flush() self._rc, self._log = self._execute_command(self._command) sys.stdout.flush() return Bunch(rc=self._rc, log=self._log) def _postsolve(self): if self._log_file is not None: OUTPUT = open(self._log_file, "w") OUTPUT.write("Solver command line: " + str(self._command.cmd) + '\n') OUTPUT.write("\n") OUTPUT.write(self._log + '\n') OUTPUT.close() # JPW: The cleanup of the problem file probably shouldn't be here, but # rather in the base OptSolver class. That would require movement of # the keepfiles attribute and associated cleanup logic to the base # class, which I didn't feel like doing at this present time. the # base class remove_files method should clean up the problem file. if (self._log_file is not None) and (not os.path.exists(self._log_file)): msg = "File '%s' not generated while executing %s" raise IOError(msg % (self._log_file, self.path)) results = None if self._results_format is not None: results = self.process_output(self._rc) # # If keepfiles is true, then we pop the # TempfileManager context while telling it to # _not_ remove the files. # if not self._keepfiles: # in some cases, the solution filename is # not generated via the temp-file mechanism, # instead being automatically derived from # the input lp/nl filename. so, we may have # to clean it up manually. if (not self._soln_file is None) and os.path.exists(self._soln_file): os.remove(self._soln_file) TempfileManager.pop(remove=not self._keepfiles) return results def _execute_command(self, command): """ Execute the command """ start_time = time.time() if 'script' in command: _input = command.script else: _input = None timeout = self._timelimit if timeout is not None: timeout += max( SUBPROCESS_TIMEOUT_ABS_ADJUST, SUBPROCESS_TIMEOUT_REL_ADJUST * self._timelimit, ) ostreams = [StringIO()] if self._tee: ostreams.append(sys.stdout) try: with TeeStream(*ostreams) as t: results = subprocess.run( command.cmd, input=_input, env=command.env, stdout=t.STDOUT, stderr=t.STDERR, timeout=timeout, universal_newlines=True, cwd=command.cwd if "cwd" in command else None, ) t.STDOUT.flush() t.STDERR.flush() rc = results.returncode log = ostreams[0].getvalue() except OSError: err = sys.exc_info()[1] msg = 'Could not execute the command: %s\tError message: %s' raise ApplicationError(msg % (command.cmd, err)) sys.stdout.flush() self._last_solve_time = time.time() - start_time return [rc, log]
[docs] def process_output(self, rc): """ Process the output files. """ start_time = time.time() if self._results_format is None: raise ValueError("Results format is None") results = self.process_logfile() log_file_completion_time = time.time() if self._report_timing is True: print( " %6.2f seconds required to read logfile " % (log_file_completion_time - start_time) ) if self._results_reader is None: self.process_soln_file(results) soln_file_completion_time = time.time() if self._report_timing is True: print( " %6.2f seconds required to read solution file " % (soln_file_completion_time - log_file_completion_time) ) else: # There is some ambiguity here as to where the solution data # It's natural to expect that the log file contains solution # information, but perhaps also in a results file. # For now, if there is a single solution, then we assume that # the results file is going to add more data to it. if len(results.solution) == 1: results = self._results_reader( self._results_file, res=results, soln=results.solution(0), suffixes=self._suffixes, ) else: results = self._results_reader( self._results_file, res=results, suffixes=self._suffixes ) results_reader_completion_time = time.time() if self._report_timing is True: print( " %6.2f seconds required to read solution file" % (results_reader_completion_time - log_file_completion_time) ) if rc != None: results.solver.error_rc = rc if rc != 0: results.solver.status = SolverStatus.error if self._last_solve_time != None: results.solver.time = self._last_solve_time return results
def _default_results_format(self, prob_format): """Returns the default results format for different problem formats. """ return ResultsFormat.soln