Source code for pyomo.contrib.appsi.solvers.ipopt

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

from pyomo.common.tempfiles import TempfileManager
from pyomo.common.fileutils import Executable
from pyomo.contrib.appsi.base import (
    PersistentSolver,
    Results,
    TerminationCondition,
    SolverConfig,
    PersistentSolutionLoader,
)
from pyomo.contrib.appsi.writers import NLWriter
from pyomo.common.log import LogStream
import logging
import subprocess
from pyomo.core.kernel.objective import minimize
import math
from pyomo.common.collections import ComponentMap
from pyomo.core.expr.numvalue import value
from pyomo.core.expr.visitor import replace_expressions
from typing import Optional, Sequence, NoReturn, List, Mapping
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.tee import TeeStream
import sys
from typing import Dict
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.errors import PyomoException
import os
from pyomo.contrib.appsi.cmodel import cmodel_available
from pyomo.core.staleflag import StaleFlagManager


logger = logging.getLogger(__name__)


[docs]class IpoptConfig(SolverConfig): def __init__( self, description=None, doc=None, implicit=False, implicit_domain=None, visibility=0, ): super(IpoptConfig, self).__init__( description=description, doc=doc, implicit=implicit, implicit_domain=implicit_domain, visibility=visibility, ) self.declare('executable', ConfigValue()) self.declare('filename', ConfigValue(domain=str)) self.declare('keepfiles', ConfigValue(domain=bool)) self.declare('solver_output_logger', ConfigValue()) self.declare('log_level', ConfigValue(domain=NonNegativeInt)) self.executable = Executable('ipopt') self.filename = None self.keepfiles = False self.solver_output_logger = logger self.log_level = logging.INFO
ipopt_command_line_options = { 'acceptable_compl_inf_tol', 'acceptable_constr_viol_tol', 'acceptable_dual_inf_tol', 'acceptable_tol', 'alpha_for_y', 'bound_frac', 'bound_mult_init_val', 'bound_push', 'bound_relax_factor', 'compl_inf_tol', 'constr_mult_init_max', 'constr_viol_tol', 'diverging_iterates_tol', 'dual_inf_tol', 'expect_infeasible_problem', 'file_print_level', 'halt_on_ampl_error', 'hessian_approximation', 'honor_original_bounds', 'linear_scaling_on_demand', 'linear_solver', 'linear_system_scaling', 'ma27_pivtol', 'ma27_pivtolmax', 'ma57_pivot_order', 'ma57_pivtol', 'ma57_pivtolmax', 'max_cpu_time', 'max_iter', 'max_refinement_steps', 'max_soc', 'maxit', 'min_refinement_steps', 'mu_init', 'mu_max', 'mu_oracle', 'mu_strategy', 'nlp_scaling_max_gradient', 'nlp_scaling_method', 'obj_scaling_factor', 'option_file_name', 'outlev', 'output_file', 'pardiso_matching_strategy', 'print_level', 'print_options_documentation', 'print_user_options', 'required_infeasibility_reduction', 'slack_bound_frac', 'slack_bound_push', 'tol', 'wantsol', 'warm_start_bound_push', 'warm_start_init_point', 'warm_start_mult_bound_push', 'watchdog_shortened_iter_trigger', }
[docs]class Ipopt(PersistentSolver): def __init__(self, only_child_vars=False): self._config = IpoptConfig() self._solver_options = dict() self._writer = NLWriter(only_child_vars=only_child_vars) self._filename = None self._dual_sol = dict() self._primal_sol = ComponentMap() self._reduced_costs = ComponentMap() self._last_results_object: Optional[Results] = None
[docs] def available(self): if self.config.executable.path() is None: return self.Availability.NotFound elif not cmodel_available: return self.Availability.NeedsCompiledExtension return self.Availability.FullLicense
[docs] def version(self): results = subprocess.run( [str(self.config.executable), '--version'], timeout=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, ) version = results.stdout.splitlines()[0] version = version.split(' ')[1] version = version.strip() version = tuple(int(i) for i in version.split('.')) return version
[docs] def nl_filename(self): if self._filename is None: return None else: return self._filename + '.nl'
[docs] def sol_filename(self): if self._filename is None: return None else: return self._filename + '.sol'
[docs] def options_filename(self): if self._filename is None: return None else: return self._filename + '.opt'
@property def config(self): return self._config @config.setter def config(self, val): self._config = val @property def ipopt_options(self): """ A dictionary mapping solver options to values for those options. These are solver specific. Returns ------- dict A dictionary mapping solver options to values for those options """ return self._solver_options @ipopt_options.setter def ipopt_options(self, val: Dict): self._solver_options = val @property def update_config(self): return self._writer.update_config @property def writer(self): return self._writer @property def symbol_map(self): return self._writer.symbol_map
[docs] def set_instance(self, model): self._writer.config.symbolic_solver_labels = self.config.symbolic_solver_labels self._writer.set_instance(model)
[docs] def add_variables(self, variables: List[_GeneralVarData]): self._writer.add_variables(variables)
[docs] def add_params(self, params: List[_ParamData]): self._writer.add_params(params)
[docs] def add_constraints(self, cons: List[_GeneralConstraintData]): self._writer.add_constraints(cons)
[docs] def add_block(self, block: _BlockData): self._writer.add_block(block)
[docs] def remove_variables(self, variables: List[_GeneralVarData]): self._writer.remove_variables(variables)
[docs] def remove_params(self, params: List[_ParamData]): self._writer.remove_params(params)
[docs] def remove_constraints(self, cons: List[_GeneralConstraintData]): self._writer.remove_constraints(cons)
[docs] def remove_block(self, block: _BlockData): self._writer.remove_block(block)
[docs] def set_objective(self, obj: _GeneralObjectiveData): self._writer.set_objective(obj)
[docs] def update_variables(self, variables: List[_GeneralVarData]): self._writer.update_variables(variables)
[docs] def update_params(self): self._writer.update_params()
def _write_options_file(self): f = open(self._filename + '.opt', 'w') for k, val in self.ipopt_options.items(): if k not in ipopt_command_line_options: f.write(str(k) + ' ' + str(val) + '\n') f.close()
[docs] def solve(self, model, timer: HierarchicalTimer = None): StaleFlagManager.mark_all_as_stale() avail = self.available() if not avail: raise PyomoException(f'Solver {self.__class__} is not available ({avail}).') if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() if timer is None: timer = HierarchicalTimer() try: TempfileManager.push() if self.config.filename is None: nl_filename = TempfileManager.create_tempfile(suffix='.nl') self._filename = nl_filename.split('.')[0] else: self._filename = self.config.filename TempfileManager.add_tempfile(self._filename + '.nl', exists=False) TempfileManager.add_tempfile(self._filename + '.sol', exists=False) TempfileManager.add_tempfile(self._filename + '.opt', exists=False) self._write_options_file() timer.start('write nl file') self._writer.write(model, self._filename + '.nl', timer=timer) timer.stop('write nl file') res = self._apply_solver(timer) self._last_results_object = res if self.config.report_timing: logger.info('\n' + str(timer)) return res finally: # finally, clean any temporary files registered with the # temp file manager, created/populated *directly* by this # plugin. TempfileManager.pop(remove=not self.config.keepfiles) if not self.config.keepfiles: self._filename = None
def _parse_sol(self): solve_vars = self._writer.get_ordered_vars() solve_cons = self._writer.get_ordered_cons() results = Results() f = open(self._filename + '.sol', 'r') all_lines = list(f.readlines()) f.close() termination_line = all_lines[1] if 'Optimal Solution Found' in termination_line: results.termination_condition = TerminationCondition.optimal elif 'Problem may be infeasible' in termination_line: results.termination_condition = TerminationCondition.infeasible elif 'problem might be unbounded' in termination_line: results.termination_condition = TerminationCondition.unbounded elif 'Maximum Number of Iterations Exceeded' in termination_line: results.termination_condition = TerminationCondition.maxIterations elif 'Maximum CPU Time Exceeded' in termination_line: results.termination_condition = TerminationCondition.maxTimeLimit else: results.termination_condition = TerminationCondition.unknown n_cons = len(solve_cons) n_vars = len(solve_vars) dual_lines = all_lines[12 : 12 + n_cons] primal_lines = all_lines[12 + n_cons : 12 + n_cons + n_vars] rc_upper_info_line = all_lines[12 + n_cons + n_vars + 1] assert rc_upper_info_line.startswith('suffix') n_rc_upper = int(rc_upper_info_line.split()[2]) assert 'ipopt_zU_out' in all_lines[12 + n_cons + n_vars + 2] upper_rc_lines = all_lines[ 12 + n_cons + n_vars + 3 : 12 + n_cons + n_vars + 3 + n_rc_upper ] rc_lower_info_line = all_lines[12 + n_cons + n_vars + 3 + n_rc_upper] assert rc_lower_info_line.startswith('suffix') n_rc_lower = int(rc_lower_info_line.split()[2]) assert 'ipopt_zL_out' in all_lines[12 + n_cons + n_vars + 3 + n_rc_upper + 1] lower_rc_lines = all_lines[ 12 + n_cons + n_vars + 3 + n_rc_upper + 2 : 12 + n_cons + n_vars + 3 + n_rc_upper + 2 + n_rc_lower ] self._dual_sol = dict() self._primal_sol = ComponentMap() self._reduced_costs = ComponentMap() for ndx, dual in enumerate(dual_lines): dual = float(dual) con = solve_cons[ndx] self._dual_sol[con] = dual for ndx, primal in enumerate(primal_lines): primal = float(primal) var = solve_vars[ndx] self._primal_sol[var] = primal for rcu_line in upper_rc_lines: split_line = rcu_line.split() var_ndx = int(split_line[0]) rcu = float(split_line[1]) var = solve_vars[var_ndx] self._reduced_costs[var] = rcu for rcl_line in lower_rc_lines: split_line = rcl_line.split() var_ndx = int(split_line[0]) rcl = float(split_line[1]) var = solve_vars[var_ndx] if var in self._reduced_costs: if abs(rcl) > abs(self._reduced_costs[var]): self._reduced_costs[var] = rcl else: self._reduced_costs[var] = rcl for var in solve_vars: if var not in self._reduced_costs: self._reduced_costs[var] = 0 if ( results.termination_condition == TerminationCondition.optimal and self.config.load_solution ): for v, val in self._primal_sol.items(): v.set_value(val, skip_validation=True) if self._writer.get_active_objective() is None: results.best_feasible_objective = None else: results.best_feasible_objective = value( self._writer.get_active_objective().expr ) elif results.termination_condition == TerminationCondition.optimal: if self._writer.get_active_objective() is None: results.best_feasible_objective = None else: obj_expr_evaluated = replace_expressions( self._writer.get_active_objective().expr, substitution_map={ id(v): val for v, val in self._primal_sol.items() }, descend_into_named_expressions=True, remove_named_expressions=True, ) results.best_feasible_objective = value(obj_expr_evaluated) elif self.config.load_solution: raise RuntimeError( 'A feasible solution was not found, so no solution can be loaded. ' 'If using the appsi.solvers.Ipopt interface, you can ' 'set opt.config.load_solution=False. If using the environ.SolverFactory ' 'interface, you can set opt.solve(model, load_solutions = False). ' 'Then you can check results.termination_condition and ' 'results.best_feasible_objective before loading a solution.' ) return results def _apply_solver(self, timer: HierarchicalTimer): config = self.config if config.time_limit is not None: timeout = config.time_limit + min(max(1.0, 0.01 * config.time_limit), 100) else: timeout = None ostreams = [ LogStream( level=self.config.log_level, logger=self.config.solver_output_logger ) ] if self.config.stream_solver: ostreams.append(sys.stdout) cmd = [ str(config.executable), self._filename + '.nl', '-AMPL', 'option_file_name=' + self._filename + '.opt', ] if 'option_file_name' in self.ipopt_options: raise ValueError( 'Use Ipopt.config.filename to specify the name of the options file. ' 'Do not use Ipopt.ipopt_options["option_file_name"].' ) ipopt_options = dict(self.ipopt_options) if config.time_limit is not None: ipopt_options['max_cpu_time'] = config.time_limit for k, v in ipopt_options.items(): cmd.append(str(k) + '=' + str(v)) env = os.environ.copy() if 'PYOMO_AMPLFUNC' in env: env['AMPLFUNC'] = "\n".join( filter( None, (env.get('AMPLFUNC', None), env.get('PYOMO_AMPLFUNC', None)) ) ) with TeeStream(*ostreams) as t: timer.start('subprocess') cp = subprocess.run( cmd, timeout=timeout, stdout=t.STDOUT, stderr=t.STDERR, env=env, universal_newlines=True, ) timer.stop('subprocess') if cp.returncode != 0: if self.config.load_solution: raise RuntimeError( 'A feasible solution was not found, so no solution can be loaded.' 'Please set opt.config.load_solution=False and check ' 'results.termination_condition and ' 'results.best_feasible_objective before loading a solution.' ) results = Results() results.termination_condition = TerminationCondition.error results.best_feasible_objective = None else: timer.start('parse solution') results = self._parse_sol() timer.stop('parse solution') if self._writer.get_active_objective() is None: results.best_objective_bound = None else: if self._writer.get_active_objective().sense == minimize: results.best_objective_bound = -math.inf else: results.best_objective_bound = math.inf results.solution_loader = PersistentSolutionLoader(solver=self) return results
[docs] def get_primals( self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None ) -> Mapping[_GeneralVarData, float]: if ( self._last_results_object is None or self._last_results_object.best_feasible_objective is None ): raise RuntimeError( 'Solver does not currently have a valid solution. Please ' 'check the termination condition.' ) res = ComponentMap() if vars_to_load is None: for v, val in self._primal_sol.items(): res[v] = val else: for v in vars_to_load: res[v] = self._primal_sol[v] return res
[docs] def get_duals( self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None ): if ( self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal ): raise RuntimeError( 'Solver does not currently have valid duals. Please ' 'check the termination condition.' ) if cons_to_load is None: return {k: v for k, v in self._dual_sol.items()} else: return {c: self._dual_sol[c] for c in cons_to_load}
[docs] def get_reduced_costs( self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None ) -> Mapping[_GeneralVarData, float]: if ( self._last_results_object is None or self._last_results_object.termination_condition != TerminationCondition.optimal ): raise RuntimeError( 'Solver does not currently have valid reduced costs. Please ' 'check the termination condition.' ) if vars_to_load is None: return ComponentMap((k, v) for k, v in self._reduced_costs.items()) else: return ComponentMap((v, self._reduced_costs[v]) for v in vars_to_load)