from pyomo.common.tempfiles import TempfileManager
from pyomo.common.fileutils import Executable
from pyomo.contrib.appsi.base import PersistentSolver, Results, TerminationCondition, SolverConfig, PersistentSolutionLoader
from pyomo.contrib.appsi.writers import NLWriter
from pyomo.common.log import LogStream
import logging
import subprocess
from pyomo.core.kernel.objective import minimize
import math
from pyomo.common.collections import ComponentMap
from pyomo.core.expr.numvalue import value
from pyomo.core.expr.visitor import replace_expressions
from typing import Optional, Sequence, NoReturn, List, Mapping
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.tee import TeeStream
import sys
from typing import Dict
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.errors import PyomoException
import os
from pyomo.contrib.appsi.cmodel import cmodel_available
from pyomo.core.staleflag import StaleFlagManager
logger = logging.getLogger(__name__)
[docs]class IpoptConfig(SolverConfig):
def __init__(self,
description=None,
doc=None,
implicit=False,
implicit_domain=None,
visibility=0):
super(IpoptConfig, self).__init__(description=description,
doc=doc,
implicit=implicit,
implicit_domain=implicit_domain,
visibility=visibility)
self.declare('executable', ConfigValue())
self.declare('filename', ConfigValue(domain=str))
self.declare('keepfiles', ConfigValue(domain=bool))
self.declare('solver_output_logger', ConfigValue())
self.declare('log_level', ConfigValue(domain=NonNegativeInt))
self.executable = Executable('ipopt')
self.filename = None
self.keepfiles = False
self.solver_output_logger = logger
self.log_level = logging.INFO
ipopt_command_line_options = {'acceptable_compl_inf_tol',
'acceptable_constr_viol_tol',
'acceptable_dual_inf_tol',
'acceptable_tol',
'alpha_for_y',
'bound_frac',
'bound_mult_init_val',
'bound_push',
'bound_relax_factor',
'compl_inf_tol',
'constr_mult_init_max',
'constr_viol_tol',
'diverging_iterates_tol',
'dual_inf_tol',
'expect_infeasible_problem',
'file_print_level',
'halt_on_ampl_error',
'hessian_approximation',
'honor_original_bounds',
'linear_scaling_on_demand',
'linear_solver',
'linear_system_scaling',
'ma27_pivtol',
'ma27_pivtolmax',
'ma57_pivot_order',
'ma57_pivtol',
'ma57_pivtolmax',
'max_cpu_time',
'max_iter',
'max_refinement_steps',
'max_soc',
'maxit',
'min_refinement_steps',
'mu_init',
'mu_max',
'mu_oracle',
'mu_strategy',
'nlp_scaling_max_gradient',
'nlp_scaling_method',
'obj_scaling_factor',
'option_file_name',
'outlev',
'output_file',
'pardiso_matching_strategy',
'print_level',
'print_options_documentation',
'print_user_options',
'required_infeasibility_reduction',
'slack_bound_frac',
'slack_bound_push',
'tol',
'wantsol',
'warm_start_bound_push',
'warm_start_init_point',
'warm_start_mult_bound_push',
'watchdog_shortened_iter_trigger'}
[docs]class Ipopt(PersistentSolver):
def __init__(self, only_child_vars=True):
self._config = IpoptConfig()
self._solver_options = dict()
self._writer = NLWriter(only_child_vars=only_child_vars)
self._filename = None
self._dual_sol = dict()
self._primal_sol = ComponentMap()
self._reduced_costs = ComponentMap()
self._last_results_object: Optional[Results] = None
[docs] def available(self):
if self.config.executable.path() is None:
return self.Availability.NotFound
elif not cmodel_available:
return self.Availability.NeedsCompiledExtension
return self.Availability.FullLicense
[docs] def version(self):
results = subprocess.run([str(self.config.executable), '--version'],
timeout=1,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
version = results.stdout.splitlines()[0]
version = version.split(' ')[1]
version = version.strip()
version = tuple(int(i) for i in version.split('.'))
return version
[docs] def nl_filename(self):
if self._filename is None:
return None
else:
return self._filename + '.nl'
[docs] def sol_filename(self):
if self._filename is None:
return None
else:
return self._filename + '.sol'
[docs] def options_filename(self):
if self._filename is None:
return None
else:
return self._filename + '.opt'
@property
def config(self):
return self._config
@config.setter
def config(self, val):
self._config = val
@property
def ipopt_options(self):
"""
A dictionary mapping solver options to values for those options. These are solver specific.
Returns
-------
dict
A dictionary mapping solver options to values for those options
"""
return self._solver_options
@ipopt_options.setter
def ipopt_options(self, val: Dict):
self._solver_options = val
@property
def update_config(self):
return self._writer.update_config
@property
def writer(self):
return self._writer
@property
def symbol_map(self):
return self._writer.symbol_map
[docs] def set_instance(self, model):
self._writer.config.symbolic_solver_labels = self.config.symbolic_solver_labels
self._writer.set_instance(model)
[docs] def add_variables(self, variables: List[_GeneralVarData]):
self._writer.add_variables(variables)
[docs] def add_params(self, params: List[_ParamData]):
self._writer.add_params(params)
[docs] def add_constraints(self, cons: List[_GeneralConstraintData]):
self._writer.add_constraints(cons)
[docs] def add_block(self, block: _BlockData):
self._writer.add_block(block)
[docs] def remove_variables(self, variables: List[_GeneralVarData]):
self._writer.remove_variables(variables)
[docs] def remove_params(self, params: List[_ParamData]):
self._writer.remove_params(params)
[docs] def remove_constraints(self, cons: List[_GeneralConstraintData]):
self._writer.remove_constraints(cons)
[docs] def remove_block(self, block: _BlockData):
self._writer.remove_block(block)
[docs] def set_objective(self, obj: _GeneralObjectiveData):
self._writer.set_objective(obj)
[docs] def update_variables(self, variables: List[_GeneralVarData]):
self._writer.update_variables(variables)
[docs] def update_params(self):
self._writer.update_params()
def _write_options_file(self):
f = open(self._filename + '.opt', 'w')
for k, val in self.ipopt_options.items():
if k not in ipopt_command_line_options:
f.write(str(k) + ' ' + str(val) + '\n')
f.close()
[docs] def solve(self, model, timer: HierarchicalTimer = None):
StaleFlagManager.mark_all_as_stale()
avail = self.available()
if not avail:
raise PyomoException(f'Solver {self.__class__} is not available ({avail}).')
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
if timer is None:
timer = HierarchicalTimer()
try:
TempfileManager.push()
if self.config.filename is None:
nl_filename = TempfileManager.create_tempfile(suffix='.nl')
self._filename = nl_filename.split('.')[0]
else:
self._filename = self.config.filename
TempfileManager.add_tempfile(self._filename + '.nl', exists=False)
TempfileManager.add_tempfile(self._filename + '.sol', exists=False)
TempfileManager.add_tempfile(self._filename + '.opt', exists=False)
self._write_options_file()
timer.start('write nl file')
self._writer.write(model, self._filename+'.nl', timer=timer)
timer.stop('write nl file')
res = self._apply_solver(timer)
self._last_results_object = res
if self.config.report_timing:
logger.info('\n' + str(timer))
return res
finally:
# finally, clean any temporary files registered with the
# temp file manager, created/populated *directly* by this
# plugin.
TempfileManager.pop(remove=not self.config.keepfiles)
if not self.config.keepfiles:
self._filename = None
def _parse_sol(self):
solve_vars = self._writer.get_ordered_vars()
solve_cons = self._writer.get_ordered_cons()
results = Results()
f = open(self._filename + '.sol', 'r')
all_lines = list(f.readlines())
f.close()
termination_line = all_lines[1]
if 'Optimal Solution Found' in termination_line:
results.termination_condition = TerminationCondition.optimal
elif 'Problem may be infeasible' in termination_line:
results.termination_condition = TerminationCondition.infeasible
elif 'problem might be unbounded' in termination_line:
results.termination_condition = TerminationCondition.unbounded
elif 'Maximum Number of Iterations Exceeded' in termination_line:
results.termination_condition = TerminationCondition.maxIterations
elif 'Maximum CPU Time Exceeded' in termination_line:
results.termination_condition = TerminationCondition.maxTimeLimit
else:
results.termination_condition = TerminationCondition.unknown
n_cons = len(solve_cons)
n_vars = len(solve_vars)
dual_lines = all_lines[12:12+n_cons]
primal_lines = all_lines[12+n_cons:12+n_cons+n_vars]
rc_upper_info_line = all_lines[12+n_cons+n_vars+1]
assert rc_upper_info_line.startswith('suffix')
n_rc_upper = int(rc_upper_info_line.split()[2])
assert 'ipopt_zU_out' in all_lines[12+n_cons+n_vars+2]
upper_rc_lines = all_lines[12+n_cons+n_vars+3:12+n_cons+n_vars+3+n_rc_upper]
rc_lower_info_line = all_lines[12+n_cons+n_vars+3+n_rc_upper]
assert rc_lower_info_line.startswith('suffix')
n_rc_lower = int(rc_lower_info_line.split()[2])
assert 'ipopt_zL_out' in all_lines[12+n_cons+n_vars+3+n_rc_upper+1]
lower_rc_lines = all_lines[12+n_cons+n_vars+3+n_rc_upper+2:12+n_cons+n_vars+3+n_rc_upper+2+n_rc_lower]
self._dual_sol = dict()
self._primal_sol = ComponentMap()
self._reduced_costs = ComponentMap()
for ndx, dual in enumerate(dual_lines):
dual = float(dual)
con = solve_cons[ndx]
self._dual_sol[con] = dual
for ndx, primal in enumerate(primal_lines):
primal = float(primal)
var = solve_vars[ndx]
self._primal_sol[var] = primal
for rcu_line in upper_rc_lines:
split_line = rcu_line.split()
var_ndx = int(split_line[0])
rcu = float(split_line[1])
var = solve_vars[var_ndx]
self._reduced_costs[var] = rcu
for rcl_line in lower_rc_lines:
split_line = rcl_line.split()
var_ndx = int(split_line[0])
rcl = float(split_line[1])
var = solve_vars[var_ndx]
if var in self._reduced_costs:
if abs(rcl) > abs(self._reduced_costs[var]):
self._reduced_costs[var] = rcl
else:
self._reduced_costs[var] = rcl
for var in solve_vars:
if var not in self._reduced_costs:
self._reduced_costs[var] = 0
if results.termination_condition == TerminationCondition.optimal and self.config.load_solution:
for v, val in self._primal_sol.items():
v.set_value(val, skip_validation=True)
if self._writer.get_active_objective() is None:
results.best_feasible_objective = None
else:
results.best_feasible_objective = value(self._writer.get_active_objective().expr)
elif results.termination_condition == TerminationCondition.optimal:
if self._writer.get_active_objective() is None:
results.best_feasible_objective = None
else:
obj_expr_evaluated = replace_expressions(self._writer.get_active_objective().expr,
substitution_map={id(v): val for v, val in self._primal_sol.items()},
descend_into_named_expressions=True,
remove_named_expressions=True)
results.best_feasible_objective = value(obj_expr_evaluated)
elif self.config.load_solution:
raise RuntimeError('A feasible solution was not found, so no solution can be loaded.'
'Please set opt.config.load_solution=False and check '
'results.termination_condition and '
'resutls.best_feasible_objective before loading a solution.')
return results
def _apply_solver(self, timer: HierarchicalTimer):
config = self.config
if config.time_limit is not None:
timeout = config.time_limit + min(max(1.0, 0.01 * config.time_limit), 100)
else:
timeout = None
ostreams = [LogStream(level=self.config.log_level, logger=self.config.solver_output_logger)]
if self.config.stream_solver:
ostreams.append(sys.stdout)
cmd = [str(config.executable),
self._filename + '.nl',
'-AMPL',
'option_file_name=' + self._filename + '.opt']
if 'option_file_name' in self.ipopt_options:
raise ValueError('Use Ipopt.config.filename to specify the name of the options file. '
'Do not use Ipopt.ipopt_options["option_file_name"].')
ipopt_options = dict(self.ipopt_options)
if config.time_limit is not None:
ipopt_options['max_cpu_time'] = config.time_limit
for k, v in ipopt_options.items():
cmd.append(str(k) + '=' + str(v))
env = os.environ.copy()
if 'PYOMO_AMPLFUNC' in env:
env['AMPLFUNC'] = "\n".join(filter(None, (env.get('AMPLFUNC', None), env.get('PYOMO_AMPLFUNC', None))))
with TeeStream(*ostreams) as t:
timer.start('subprocess')
cp = subprocess.run(cmd,
timeout=timeout,
stdout=t.STDOUT,
stderr=t.STDERR,
env=env,
universal_newlines=True)
timer.stop('subprocess')
if cp.returncode != 0:
if self.config.load_solution:
raise RuntimeError('A feasible solution was not found, so no solution can be loaded.'
'Please set opt.config.load_solution=False and check '
'results.termination_condition and '
'results.best_feasible_objective before loading a solution.')
results = Results()
results.termination_condition = TerminationCondition.error
results.best_feasible_objective = None
else:
timer.start('parse solution')
results = self._parse_sol()
timer.stop('parse solution')
if self._writer.get_active_objective() is None:
results.best_objective_bound = None
else:
if self._writer.get_active_objective().sense == minimize:
results.best_objective_bound = -math.inf
else:
results.best_objective_bound = math.inf
results.solution_loader = PersistentSolutionLoader(solver=self)
return results
[docs] def get_primals(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]:
if self._last_results_object is None or self._last_results_object.best_feasible_objective is None:
raise RuntimeError(
'Solver does not currently have a valid solution. Please '
'check the termination condition.'
)
res = ComponentMap()
if vars_to_load is None:
for v, val in self._primal_sol.items():
res[v] = val
else:
for v in vars_to_load:
res[v] = self._primal_sol[v]
return res
[docs] def get_duals(self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None):
if self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal:
raise RuntimeError(
'Solver does not currently have valid duals. Please '
'check the termination condition.'
)
if cons_to_load is None:
return {k: v for k, v in self._dual_sol.items()}
else:
return {c: self._dual_sol[c] for c in cons_to_load}
[docs] def get_reduced_costs(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]:
if self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal:
raise RuntimeError(
'Solver does not currently have valid reduced costs. Please '
'check the termination condition.'
)
if vars_to_load is None:
return ComponentMap((k, v) for k, v in self._reduced_costs.items())
else:
return ComponentMap((v, self._reduced_costs[v]) for v in vars_to_load)