# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import logging
import os
import re
import sys
from pyomo.common.dependencies import attempt_import
from pyomo.opt import SolverFactory, SolverManagerFactory, OptSolver
from pyomo.opt.parallel.manager import ActionManagerError, ActionStatus
from pyomo.opt.parallel.async_solver import AsynchronousSolverManager
from pyomo.core.base import Block
import pyomo.neos.kestrel
xmlrpc_client = attempt_import('xmlrpc.client')[0]
logger = logging.getLogger('pyomo.neos')
def _neos_error(msg, results, current_message):
    """Log a NEOS failure together with the relevant parts of its output.

    First logs ``msg`` and the NEOS log text accumulated so far
    (``current_message``) at ERROR level, attaching the exception that is
    currently being handled, if any.  Then decodes ``results.data`` as
    UTF-8 and re-logs every line that mentions "error" (at ERROR level)
    or, failing that, "warn" (at WARNING level); both matches are
    case-insensitive.
    """
    error_pattern = re.compile('error', flags=re.I)
    warning_pattern = re.compile('warn', flags=re.I)

    header = "%s NEOS log:\n%s" % (msg, current_message)
    logger.error(header, exc_info=sys.exc_info())

    for text in results.data.decode('utf-8').splitlines():
        # A line that matches both patterns is reported once, as an error.
        if error_pattern.search(text) is not None:
            logger.error(text)
            continue
        if warning_pattern.search(text) is not None:
            logger.warning(text)
# Solver manager plugin for the NEOS server.
@SolverManagerFactory.register(
    'neos', doc="Asynchronously execute solvers on the NEOS server"
)
class SolverManager_NEOS(AsynchronousSolverManager):
    """Solver manager that runs solvers on the NEOS server through Kestrel.

    Every submitted problem is tracked by its NEOS job number:
    ``_perform_queue`` submits a problem and records the bookkeeping for
    it, and ``_perform_wait_any`` polls NEOS and collects the results of
    the first finished job it encounters.
    """

    def clear(self):
        """
        Clear manager state
        """
        AsynchronousSolverManager.clear(self)
        self.kestrel = pyomo.neos.kestrel.kestrelAMPL()
        # maps NEOS job numbers to their corresponding action handle.
        self._ah = {}
        # maps NEOS job numbers to the positional arguments of the
        # queue call that created the job.
        self._args = {}
        # maps NEOS job numbers to the tuple (opt, smap_id,
        # load_solutions, select_index, default_variable_value)
        # captured when the job was queued.
        self._opt_data = {}

        # to grab streamed output from NEOS, need to keep
        # a map of the to-date string of extracted output.
        # store pairs of NEOS message offset and NEOS message string.
        # index into the map is the NEOS job number.  An entry is
        # removed when the final results of its job are collected.
        self._neos_log = {}
        # maps lowercase solver names to the names reported by NEOS;
        # filled lazily by the first queue operation.
        self._solvers = {}

    def _perform_queue(self, ah, *args, **kwds):
        """
        Perform the queue operation.  This method returns the ActionHandle,
        and the ActionHandle status indicates whether the queue was successful.

        The solver is taken from the ``solver`` keyword (or ``opt`` when
        ``solver`` is absent) and may be a solver name or a solver
        object.  Solver options are merged from the solver object's
        ``options``, the ``options`` keyword (dict or string) and the
        ``options_string`` keyword, in that order, so later sources
        override earlier ones.  The remaining arguments are forwarded to
        the ``_presolve`` method of the ``_neos`` solver plugin.

        Raises:
            ActionManagerError: if no solver was given, or if the
                requested solver is not in the list reported by NEOS.
        """
        # Both keywords are always consumed so that neither is
        # forwarded to _presolve below.
        solver = kwds.pop('solver', kwds.pop('opt', None))
        if solver is None:
            raise ActionManagerError(
                "No solver passed to %s, use keyword option 'solver'"
                % (type(self).__name__)
            )
        if not isinstance(solver, str):
            solver_name = solver.name
            if solver_name == 'asl':
                solver_name = os.path.basename(solver.executable())
        else:
            solver_name = solver
            solver = None

        #
        # Handle ephemeral solvers options here.  These
        # will override whatever is currently in the options
        # dictionary, but we will reset these options to
        # their original value at the end of this method.
        #
        user_solver_options = {}
        # make sure to transfer the options dict on the
        # solver plugin if the user does not use a string
        # to identify the neos solver.  The ephemeral
        # options must also go after these.
        if solver is not None:
            user_solver_options.update(solver.options)
        _options = kwds.pop('options', {})
        if isinstance(_options, str):
            _options = OptSolver._options_string_to_dict(_options)
        user_solver_options.update(_options)
        user_solver_options.update(
            OptSolver._options_string_to_dict(kwds.pop('options_string', ''))
        )

        # JDS: [5/13/17] The following is a HACK.  This timeout flag is
        # set by pyomo/scripting/util.py:apply_optimizer.  If we do not
        # remove it, it will get passed to the NEOS solver.  For solvers
        # like CPLEX 12.7.0, this will cause a fatal error as it is not
        # a known option.
        if user_solver_options.get('timelimit', 0) is None:
            del user_solver_options['timelimit']

        opt = SolverFactory('_neos')
        opt._presolve(*args, **kwds)

        #
        # Map NEOS name, using lowercase convention in Pyomo
        #
        if not self._solvers:
            for name in self.kestrel.solvers():
                if name.endswith('AMPL'):
                    # Drop the last five characters (presumably a
                    # ':AMPL' category suffix) to get the solver name.
                    self._solvers[name[:-5].lower()] = name[:-5]
        if solver_name not in self._solvers:
            raise ActionManagerError(
                "Solver '%s' is not recognized by NEOS. "
                "Solver names recognized:\n%s"
                % (solver_name, str(sorted(self._solvers.keys())))
            )

        #
        # Apply kestrel
        #
        neos_sname = self._solvers[solver_name].lower()
        solver_options_var = neos_sname + '_options'
        #
        # Set the kestrel_options environment
        #
        os.environ['kestrel_options'] = 'solver=%s' % self._solvers[solver_name]
        try:
            #
            # Set the <solver>_options environment
            #
            solver_options = {key: opt.options[key] for key in opt.options}
            solver_options.update(user_solver_options)
            options = opt._get_options_string(solver_options)
            if options != "":
                os.environ[solver_options_var] = options
            #
            # Generate an XML string using these two environment variables
            #
            xml = self.kestrel.formXML(opt._problem_files[0])
            (jobNumber, password) = self.kestrel.submit(xml)
        finally:
            #
            # Cleanup: always remove the temporary environment
            # variables, even when the submission fails.
            #
            os.environ.pop('kestrel_options', None)
            os.environ.pop(solver_options_var, None)

        ah.job = jobNumber
        ah.password = password
        #
        # Store action handle, and return
        #
        self._ah[jobNumber] = ah
        self._neos_log[jobNumber] = (0, "")
        self._opt_data[jobNumber] = (
            opt,
            opt._smap_id,
            opt._load_solutions,
            opt._select_index,
            opt._default_variable_value,
        )
        self._args[jobNumber] = args
        return ah

    def _perform_wait_any(self):
        """
        Perform the wait_any operation.  This method returns an
        ActionHandle with the results of waiting.  If None is returned
        then the ActionManager assumes that it can call this method again.
        Note that an ActionHandle can be returned with a dummy value,
        to indicate an error.
        """
        # Iterate over a snapshot: a finished job is removed from
        # self._ah while it is being collected.
        for jobNumber in list(self._ah):
            status = self.kestrel.neos.getJobStatus(
                jobNumber, self._ah[jobNumber].password
            )
            if status not in ("Running", "Waiting"):
                # the job is done.
                return self._collect_finished_job(jobNumber)
            # The job is still running...
            self._fetch_intermediate_results(jobNumber)
        return None

    def _collect_finished_job(self, jobNumber):
        """Retrieve and process the final results of a finished job.

        Removes the bookkeeping kept for ``jobNumber``, writes the
        accumulated NEOS log and the final results to the solver's log
        and solution files, parses the solution and, when the first
        queued argument is a Block, loads the solution into it or
        attaches the symbol map to the results.  Parsing and loading
        failures are logged rather than raised.

        Returns the job's ActionHandle, with its status set to done.
        """
        ah = self._ah.pop(jobNumber)
        ah.status = ActionStatus.done
        (opt, smap_id, load_solutions, select_index, default_variable_value) = (
            self._opt_data.pop(jobNumber)
        )
        args = self._args.pop(jobNumber)

        # retrieve the final results, which are in message/log format.
        results = self.kestrel.neos.getFinalResults(jobNumber, ah.password)

        # The job is finished, so its streamed log is dropped from the
        # map instead of being kept for the lifetime of the manager.
        (_, current_message) = self._neos_log.pop(jobNumber)
        with open(opt._log_file, 'w') as OUTPUT:
            OUTPUT.write(current_message)
        with open(opt._soln_file, 'w') as OUTPUT:
            OUTPUT.write(results.data.decode('utf-8'))

        rc = None
        try:
            solver_results = opt.process_output(rc)
        except Exception:
            # Deliberately best-effort: report the failure and hand the
            # handle back without results.  KeyboardInterrupt and
            # SystemExit are allowed to propagate.
            _neos_error("Error parsing NEOS solution file", results, current_message)
            return ah

        solver_results._smap_id = smap_id
        self.results[ah.id] = solver_results

        if args and isinstance(args[0], Block):
            _model = args[0]
            if load_solutions:
                try:
                    _model.solutions.load_from(
                        solver_results,
                        select=select_index,
                        default_variable_value=default_variable_value,
                    )
                except Exception:
                    _neos_error(
                        "Error loading NEOS solution into model",
                        results,
                        current_message,
                    )
                solver_results._smap_id = None
                solver_results.solution.clear()
            else:
                solver_results._smap = _model.solutions.symbol_map[smap_id]
                _model.solutions.delete_symbol_map(smap_id)
        return ah

    def _fetch_intermediate_results(self, jobNumber):
        """Append any new intermediate NEOS output to the job's stored log."""
        # Grab the partial messages from NEOS as you go, in case
        # you want to output on-the-fly.  You will only get data
        # if the job was routed to the "short" priority queue.
        (current_offset, current_message) = self._neos_log[jobNumber]
        # TBD: blocking isn't the way to go, but non-blocking
        # was triggering some exception in kestrel.
        #
        # [5/13/17]: The blocking fetch will timeout in 2
        # minutes.  If NEOS doesn't produce intermediate results
        # by then we will need to catch (and eat) the exception
        try:
            (message_fragment, new_offset) = (
                self.kestrel.neos.getIntermediateResults(
                    jobNumber, self._ah[jobNumber].password, current_offset
                )
            )
        except xmlrpc_client.ProtocolError:
            # The command probably timed out
            return
        logger.info(message_fragment)
        self._neos_log[jobNumber] = (
            new_offset,
            current_message + message_fragment.data.decode('utf-8'),
        )

    def _kill_all_pending_jobs(self):
        """Ask NEOS to kill every job that still has a tracked action handle."""
        for ah in self._ah.values():
            self.kestrel.kill(ah.job, ah.password)

    def __exit__(self, t, v, traceback):
        """Kill all pending NEOS jobs when the manager's context is left."""
        self._kill_all_pending_jobs()