Source code for pyomo.contrib.appsi.solvers.ipopt

from pyomo.common.tempfiles import TempfileManager
from pyomo.common.fileutils import Executable
from pyomo.contrib.appsi.base import PersistentSolver, Results, TerminationCondition, SolverConfig, PersistentSolutionLoader
from pyomo.contrib.appsi.writers import NLWriter
from pyomo.common.log import LogStream
import logging
import subprocess
from pyomo.core.kernel.objective import minimize
import math
from pyomo.common.collections import ComponentMap
from pyomo.core.expr.numvalue import value
from pyomo.core.expr.visitor import replace_expressions
from typing import Optional, Sequence, NoReturn, List, Mapping
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.tee import TeeStream
import sys
from typing import Dict
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.errors import PyomoException
import os
from pyomo.contrib.appsi.cmodel import cmodel_available
from pyomo.core.staleflag import StaleFlagManager


logger = logging.getLogger(__name__)


[docs]class IpoptConfig(SolverConfig): def __init__(self, description=None, doc=None, implicit=False, implicit_domain=None, visibility=0): super(IpoptConfig, self).__init__(description=description, doc=doc, implicit=implicit, implicit_domain=implicit_domain, visibility=visibility) self.declare('executable', ConfigValue()) self.declare('filename', ConfigValue(domain=str)) self.declare('keepfiles', ConfigValue(domain=bool)) self.declare('solver_output_logger', ConfigValue()) self.declare('log_level', ConfigValue(domain=NonNegativeInt)) self.executable = Executable('ipopt') self.filename = None self.keepfiles = False self.solver_output_logger = logger self.log_level = logging.INFO
ipopt_command_line_options = {'acceptable_compl_inf_tol', 'acceptable_constr_viol_tol', 'acceptable_dual_inf_tol', 'acceptable_tol', 'alpha_for_y', 'bound_frac', 'bound_mult_init_val', 'bound_push', 'bound_relax_factor', 'compl_inf_tol', 'constr_mult_init_max', 'constr_viol_tol', 'diverging_iterates_tol', 'dual_inf_tol', 'expect_infeasible_problem', 'file_print_level', 'halt_on_ampl_error', 'hessian_approximation', 'honor_original_bounds', 'linear_scaling_on_demand', 'linear_solver', 'linear_system_scaling', 'ma27_pivtol', 'ma27_pivtolmax', 'ma57_pivot_order', 'ma57_pivtol', 'ma57_pivtolmax', 'max_cpu_time', 'max_iter', 'max_refinement_steps', 'max_soc', 'maxit', 'min_refinement_steps', 'mu_init', 'mu_max', 'mu_oracle', 'mu_strategy', 'nlp_scaling_max_gradient', 'nlp_scaling_method', 'obj_scaling_factor', 'option_file_name', 'outlev', 'output_file', 'pardiso_matching_strategy', 'print_level', 'print_options_documentation', 'print_user_options', 'required_infeasibility_reduction', 'slack_bound_frac', 'slack_bound_push', 'tol', 'wantsol', 'warm_start_bound_push', 'warm_start_init_point', 'warm_start_mult_bound_push', 'watchdog_shortened_iter_trigger'}
[docs]class Ipopt(PersistentSolver): def __init__(self, only_child_vars=True): self._config = IpoptConfig() self._solver_options = dict() self._writer = NLWriter(only_child_vars=only_child_vars) self._filename = None self._dual_sol = dict() self._primal_sol = ComponentMap() self._reduced_costs = ComponentMap() self._last_results_object: Optional[Results] = None
[docs] def available(self): if self.config.executable.path() is None: return self.Availability.NotFound elif not cmodel_available: return self.Availability.NeedsCompiledExtension return self.Availability.FullLicense
[docs] def version(self): results = subprocess.run([str(self.config.executable), '--version'], timeout=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) version = results.stdout.splitlines()[0] version = version.split(' ')[1] version = version.strip() version = tuple(int(i) for i in version.split('.')) return version
[docs] def nl_filename(self): if self._filename is None: return None else: return self._filename + '.nl'
[docs] def sol_filename(self): if self._filename is None: return None else: return self._filename + '.sol'
[docs] def options_filename(self): if self._filename is None: return None else: return self._filename + '.opt'
@property def config(self): return self._config @config.setter def config(self, val): self._config = val @property def ipopt_options(self): """ A dictionary mapping solver options to values for those options. These are solver specific. Returns ------- dict A dictionary mapping solver options to values for those options """ return self._solver_options @ipopt_options.setter def ipopt_options(self, val: Dict): self._solver_options = val @property def update_config(self): return self._writer.update_config @property def writer(self): return self._writer @property def symbol_map(self): return self._writer.symbol_map
[docs] def set_instance(self, model): self._writer.config.symbolic_solver_labels = self.config.symbolic_solver_labels self._writer.set_instance(model)
[docs] def add_variables(self, variables: List[_GeneralVarData]): self._writer.add_variables(variables)
[docs] def add_params(self, params: List[_ParamData]): self._writer.add_params(params)
[docs] def add_constraints(self, cons: List[_GeneralConstraintData]): self._writer.add_constraints(cons)
[docs] def add_block(self, block: _BlockData): self._writer.add_block(block)
[docs] def remove_variables(self, variables: List[_GeneralVarData]): self._writer.remove_variables(variables)
[docs] def remove_params(self, params: List[_ParamData]): self._writer.remove_params(params)
[docs] def remove_constraints(self, cons: List[_GeneralConstraintData]): self._writer.remove_constraints(cons)
[docs] def remove_block(self, block: _BlockData): self._writer.remove_block(block)
[docs] def set_objective(self, obj: _GeneralObjectiveData): self._writer.set_objective(obj)
[docs] def update_variables(self, variables: List[_GeneralVarData]): self._writer.update_variables(variables)
[docs] def update_params(self): self._writer.update_params()
def _write_options_file(self): f = open(self._filename + '.opt', 'w') for k, val in self.ipopt_options.items(): if k not in ipopt_command_line_options: f.write(str(k) + ' ' + str(val) + '\n') f.close()
[docs] def solve(self, model, timer: HierarchicalTimer = None): StaleFlagManager.mark_all_as_stale() avail = self.available() if not avail: raise PyomoException(f'Solver {self.__class__} is not available ({avail}).') if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() if timer is None: timer = HierarchicalTimer() try: TempfileManager.push() if self.config.filename is None: nl_filename = TempfileManager.create_tempfile(suffix='.nl') self._filename = nl_filename.split('.')[0] else: self._filename = self.config.filename TempfileManager.add_tempfile(self._filename + '.nl', exists=False) TempfileManager.add_tempfile(self._filename + '.sol', exists=False) TempfileManager.add_tempfile(self._filename + '.opt', exists=False) self._write_options_file() timer.start('write nl file') self._writer.write(model, self._filename+'.nl', timer=timer) timer.stop('write nl file') res = self._apply_solver(timer) self._last_results_object = res if self.config.report_timing: logger.info('\n' + str(timer)) return res finally: # finally, clean any temporary files registered with the # temp file manager, created/populated *directly* by this # plugin. TempfileManager.pop(remove=not self.config.keepfiles) if not self.config.keepfiles: self._filename = None
def _parse_sol(self): solve_vars = self._writer.get_ordered_vars() solve_cons = self._writer.get_ordered_cons() results = Results() f = open(self._filename + '.sol', 'r') all_lines = list(f.readlines()) f.close() termination_line = all_lines[1] if 'Optimal Solution Found' in termination_line: results.termination_condition = TerminationCondition.optimal elif 'Problem may be infeasible' in termination_line: results.termination_condition = TerminationCondition.infeasible elif 'problem might be unbounded' in termination_line: results.termination_condition = TerminationCondition.unbounded elif 'Maximum Number of Iterations Exceeded' in termination_line: results.termination_condition = TerminationCondition.maxIterations elif 'Maximum CPU Time Exceeded' in termination_line: results.termination_condition = TerminationCondition.maxTimeLimit else: results.termination_condition = TerminationCondition.unknown n_cons = len(solve_cons) n_vars = len(solve_vars) dual_lines = all_lines[12:12+n_cons] primal_lines = all_lines[12+n_cons:12+n_cons+n_vars] rc_upper_info_line = all_lines[12+n_cons+n_vars+1] assert rc_upper_info_line.startswith('suffix') n_rc_upper = int(rc_upper_info_line.split()[2]) assert 'ipopt_zU_out' in all_lines[12+n_cons+n_vars+2] upper_rc_lines = all_lines[12+n_cons+n_vars+3:12+n_cons+n_vars+3+n_rc_upper] rc_lower_info_line = all_lines[12+n_cons+n_vars+3+n_rc_upper] assert rc_lower_info_line.startswith('suffix') n_rc_lower = int(rc_lower_info_line.split()[2]) assert 'ipopt_zL_out' in all_lines[12+n_cons+n_vars+3+n_rc_upper+1] lower_rc_lines = all_lines[12+n_cons+n_vars+3+n_rc_upper+2:12+n_cons+n_vars+3+n_rc_upper+2+n_rc_lower] self._dual_sol = dict() self._primal_sol = ComponentMap() self._reduced_costs = ComponentMap() for ndx, dual in enumerate(dual_lines): dual = float(dual) con = solve_cons[ndx] self._dual_sol[con] = dual for ndx, primal in enumerate(primal_lines): primal = float(primal) var = solve_vars[ndx] self._primal_sol[var] = primal for rcu_line in upper_rc_lines: split_line = rcu_line.split() var_ndx = int(split_line[0]) rcu = float(split_line[1]) var = solve_vars[var_ndx] self._reduced_costs[var] = rcu for rcl_line in lower_rc_lines: split_line = rcl_line.split() var_ndx = int(split_line[0]) rcl = float(split_line[1]) var = solve_vars[var_ndx] if var in self._reduced_costs: if abs(rcl) > abs(self._reduced_costs[var]): self._reduced_costs[var] = rcl else: self._reduced_costs[var] = rcl for var in solve_vars: if var not in self._reduced_costs: self._reduced_costs[var] = 0 if results.termination_condition == TerminationCondition.optimal and self.config.load_solution: for v, val in self._primal_sol.items(): v.set_value(val, skip_validation=True) if self._writer.get_active_objective() is None: results.best_feasible_objective = None else: results.best_feasible_objective = value(self._writer.get_active_objective().expr) elif results.termination_condition == TerminationCondition.optimal: if self._writer.get_active_objective() is None: results.best_feasible_objective = None else: obj_expr_evaluated = replace_expressions(self._writer.get_active_objective().expr, substitution_map={id(v): val for v, val in self._primal_sol.items()}, descend_into_named_expressions=True, remove_named_expressions=True) results.best_feasible_objective = value(obj_expr_evaluated) elif self.config.load_solution: raise RuntimeError('A feasible solution was not found, so no solution can be loaded.' 'Please set opt.config.load_solution=False and check ' 'results.termination_condition and ' 'resutls.best_feasible_objective before loading a solution.') return results def _apply_solver(self, timer: HierarchicalTimer): config = self.config if config.time_limit is not None: timeout = config.time_limit + min(max(1.0, 0.01 * config.time_limit), 100) else: timeout = None ostreams = [LogStream(level=self.config.log_level, logger=self.config.solver_output_logger)] if self.config.stream_solver: ostreams.append(sys.stdout) cmd = [str(config.executable), self._filename + '.nl', '-AMPL', 'option_file_name=' + self._filename + '.opt'] if 'option_file_name' in self.ipopt_options: raise ValueError('Use Ipopt.config.filename to specify the name of the options file. ' 'Do not use Ipopt.ipopt_options["option_file_name"].') ipopt_options = dict(self.ipopt_options) if config.time_limit is not None: ipopt_options['max_cpu_time'] = config.time_limit for k, v in ipopt_options.items(): cmd.append(str(k) + '=' + str(v)) env = os.environ.copy() if 'PYOMO_AMPLFUNC' in env: env['AMPLFUNC'] = "\n".join(filter(None, (env.get('AMPLFUNC', None), env.get('PYOMO_AMPLFUNC', None)))) with TeeStream(*ostreams) as t: timer.start('subprocess') cp = subprocess.run(cmd, timeout=timeout, stdout=t.STDOUT, stderr=t.STDERR, env=env, universal_newlines=True) timer.stop('subprocess') if cp.returncode != 0: if self.config.load_solution: raise RuntimeError('A feasible solution was not found, so no solution can be loaded.' 'Please set opt.config.load_solution=False and check ' 'results.termination_condition and ' 'results.best_feasible_objective before loading a solution.') results = Results() results.termination_condition = TerminationCondition.error results.best_feasible_objective = None else: timer.start('parse solution') results = self._parse_sol() timer.stop('parse solution') if self._writer.get_active_objective() is None: results.best_objective_bound = None else: if self._writer.get_active_objective().sense == minimize: results.best_objective_bound = -math.inf else: results.best_objective_bound = math.inf results.solution_loader = PersistentSolutionLoader(solver=self) return results
[docs] def get_primals(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]: if self._last_results_object is None or self._last_results_object.best_feasible_objective is None: raise RuntimeError( 'Solver does not currently have a valid solution. Please ' 'check the termination condition.' ) res = ComponentMap() if vars_to_load is None: for v, val in self._primal_sol.items(): res[v] = val else: for v in vars_to_load: res[v] = self._primal_sol[v] return res
[docs] def get_duals(self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None): if self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal: raise RuntimeError( 'Solver does not currently have valid duals. Please ' 'check the termination condition.' ) if cons_to_load is None: return {k: v for k, v in self._dual_sol.items()} else: return {c: self._dual_sol[c] for c in cons_to_load}
[docs] def get_reduced_costs(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]: if self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal: raise RuntimeError( 'Solver does not currently have valid reduced costs. Please ' 'check the termination condition.' ) if vars_to_load is None: return ComponentMap((k, v) for k, v in self._reduced_costs.items()) else: return ComponentMap((v, self._reduced_costs[v]) for v in vars_to_load)