Source code for pyomo.contrib.solver.solvers.ipopt

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2025
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
import os
import subprocess
import datetime
import io
import re
import sys
from typing import Optional, Tuple, Union, Mapping, List, Dict, Any, Sequence

from pyomo.common import Executable
from pyomo.common.config import (
    ConfigValue,
    document_class_CONFIG,
    ConfigDict,
    ADVANCED_OPTION,
)
from pyomo.common.errors import (
    ApplicationError,
    DeveloperError,
    InfeasibleConstraintException,
)
from pyomo.common.tempfiles import TempfileManager
from pyomo.common.timing import HierarchicalTimer
from pyomo.core.base.var import VarData
from pyomo.core.staleflag import StaleFlagManager
from pyomo.repn.plugins.nl_writer import NLWriter, NLWriterInfo
from pyomo.contrib.solver.common.base import SolverBase, Availability
from pyomo.contrib.solver.common.config import SolverConfig
from pyomo.contrib.solver.common.results import (
    Results,
    TerminationCondition,
    SolutionStatus,
)
from pyomo.contrib.solver.solvers.sol_reader import parse_sol_file, SolSolutionLoader
from pyomo.contrib.solver.common.util import (
    NoFeasibleSolutionError,
    NoOptimalSolutionError,
    NoSolutionError,
)
from pyomo.common.tee import TeeStream
from pyomo.core.expr.visitor import replace_expressions
from pyomo.core.expr.numvalue import value
from pyomo.core.base.suffix import Suffix
from pyomo.common.collections import ComponentMap
from pyomo.solvers.amplfunc_merge import amplfunc_merge

logger = logging.getLogger(__name__)

# Acceptable chars for the end of the alpha_pr column
# in ipopt's output, per https://coin-or.github.io/Ipopt/OUTPUT.html
_ALPHA_PR_CHARS = set("fFhHkKnNRwstTr")


[docs] class IpoptConfig(SolverConfig):
[docs] def __init__( self, description=None, doc=None, implicit=False, implicit_domain=None, visibility=0, ): super().__init__( description=description, doc=doc, implicit=implicit, implicit_domain=implicit_domain, visibility=visibility, ) self.executable: Executable = self.declare( 'executable', ConfigValue( domain=Executable, default='ipopt', description="Preferred executable for ipopt. Defaults to searching the " "``PATH`` for the first available ``ipopt``.", ), ) self.writer_config: ConfigDict = self.declare( 'writer_config', NLWriter.CONFIG() )
[docs] class IpoptSolutionLoader(SolSolutionLoader): def _error_check(self): if self._nl_info is None: raise NoSolutionError() if len(self._nl_info.eliminated_vars) > 0: raise NotImplementedError( 'For now, turn presolve off (opt.config.writer_config.linear_presolve=False) ' 'to get dual variable values.' ) if self._sol_data is None: raise DeveloperError( "Solution data is empty. This should not " "have happened. Report this error to the Pyomo Developers." )
[docs] def get_reduced_costs( self, vars_to_load: Optional[Sequence[VarData]] = None ) -> Mapping[VarData, float]: self._error_check() if self._nl_info.scaling is None: scale_list = [1] * len(self._nl_info.variables) obj_scale = 1 else: scale_list = self._nl_info.scaling.variables obj_scale = self._nl_info.scaling.objectives[0] sol_data = self._sol_data nl_info = self._nl_info zl_map = sol_data.var_suffixes['ipopt_zL_out'] zu_map = sol_data.var_suffixes['ipopt_zU_out'] rc = {} for ndx, v in enumerate(nl_info.variables): scale = scale_list[ndx] v_id = id(v) rc[v_id] = (v, 0) if ndx in zl_map: zl = zl_map[ndx] * scale / obj_scale if abs(zl) > abs(rc[v_id][1]): rc[v_id] = (v, zl) if ndx in zu_map: zu = zu_map[ndx] * scale / obj_scale if abs(zu) > abs(rc[v_id][1]): rc[v_id] = (v, zu) if vars_to_load is None: res = ComponentMap(rc.values()) for v, _ in nl_info.eliminated_vars: res[v] = 0 else: res = ComponentMap() for v in vars_to_load: if id(v) in rc: res[v] = rc[id(v)][1] else: # eliminated vars res[v] = 0 return res
ipopt_command_line_options = { 'acceptable_compl_inf_tol', 'acceptable_constr_viol_tol', 'acceptable_dual_inf_tol', 'acceptable_tol', 'alpha_for_y', 'bound_frac', 'bound_mult_init_val', 'bound_push', 'bound_relax_factor', 'compl_inf_tol', 'constr_mult_init_max', 'constr_viol_tol', 'diverging_iterates_tol', 'dual_inf_tol', 'expect_infeasible_problem', 'file_print_level', 'halt_on_ampl_error', 'hessian_approximation', 'honor_original_bounds', 'linear_scaling_on_demand', 'linear_solver', 'linear_system_scaling', 'ma27_pivtol', 'ma27_pivtolmax', 'ma57_pivot_order', 'ma57_pivtol', 'ma57_pivtolmax', 'max_cpu_time', 'max_iter', 'max_refinement_steps', 'max_soc', 'maxit', 'min_refinement_steps', 'mu_init', 'mu_max', 'mu_oracle', 'mu_strategy', 'nlp_scaling_max_gradient', 'nlp_scaling_method', 'obj_scaling_factor', 'option_file_name', 'outlev', 'output_file', 'pardiso_matching_strategy', 'print_level', 'print_options_documentation', 'print_user_options', 'required_infeasibility_reduction', 'slack_bound_frac', 'slack_bound_push', 'tol', 'wantsol', 'warm_start_bound_push', 'warm_start_init_point', 'warm_start_mult_bound_push', 'watchdog_shortened_iter_trigger', }
[docs] @document_class_CONFIG(methods=['solve']) class Ipopt(SolverBase): """Interface to the Ipopt NLP solver (NL file based)""" #: Global class configuration; #: see :ref:`pyomo.contrib.solver.solvers.ipopt.Ipopt::CONFIG`. CONFIG = IpoptConfig()
[docs] def __init__(self, **kwds: Any) -> None: super().__init__(**kwds) self._writer = NLWriter() self._available_cache = None self._version_cache = None self._version_timeout = 2 #: Instance configuration; #: see :ref:`pyomo.contrib.solver.solvers.ipopt.Ipopt::CONFIG`. self.config = self.config
[docs] def available(self, config: Optional[IpoptConfig] = None) -> Availability: if config is None: config = self.config pth = config.executable.path() if self._available_cache is None or self._available_cache[0] != pth: if pth is None: self._available_cache = (None, Availability.NotFound) else: self._available_cache = (pth, Availability.FullLicense) return self._available_cache[1]
[docs] def version( self, config: Optional[IpoptConfig] = None ) -> Optional[Tuple[int, int, int]]: if config is None: config = self.config pth = config.executable.path() if self._version_cache is None or self._version_cache[0] != pth: if pth is None: self._version_cache = (None, None) else: results = subprocess.run( [str(pth), '--version'], timeout=self._version_timeout, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, check=False, ) version = results.stdout.splitlines()[0] version = version.split(' ')[1].strip() version = tuple(int(i) for i in version.split('.')) self._version_cache = (pth, version) return self._version_cache[1]
def has_linear_solver(self, linear_solver: str) -> bool: import pyomo.core as AML m = AML.ConcreteModel() m.x = AML.Var() m.o = AML.Objective(expr=(m.x - 2) ** 2) results = self.solve( m, tee=False, raise_exception_on_nonoptimal_result=False, load_solutions=False, solver_options={'linear_solver': linear_solver}, ) return 'running with linear solver' in results.solver_log def _write_options_file( self, filename: str, options: Mapping[str, Union[str, int, float]] ) -> bool: # First we need to determine if we even need to create a file. # If options is empty, then we return False opt_file_exists = False if not options: return False # If it has options in it, parse them and write them to a file. # If they are command line options, ignore them; they will be # parsed during _create_command_line for k, val in options.items(): if k not in ipopt_command_line_options: opt_file_exists = True with open(filename + '.opt', 'a+', encoding='utf-8') as opt_file: opt_file.write(str(k) + ' ' + str(val) + '\n') return opt_file_exists def _create_command_line( self, basename: str, config: IpoptConfig, opt_file: bool ) -> List[str]: cmd = [str(config.executable), basename + '.nl', '-AMPL'] if opt_file: cmd.append('option_file_name=' + basename + '.opt') if 'option_file_name' in config.solver_options: raise ValueError( 'Pyomo generates the ipopt options file as part of the `solve` method. ' 'Add all options to ipopt.config.solver_options instead.' ) if ( config.time_limit is not None and 'max_cpu_time' not in config.solver_options ): config.solver_options['max_cpu_time'] = config.time_limit for k, val in config.solver_options.items(): if k in ipopt_command_line_options: cmd.append(str(k) + '=' + str(val)) return cmd
[docs] def solve(self, model, **kwds) -> Results: "Solve a model using Ipopt" # Begin time tracking start_timestamp = datetime.datetime.now(datetime.timezone.utc) # Update configuration options, based on keywords passed to solve config: IpoptConfig = self.config(value=kwds, preserve_implicit=True) # Check if solver is available avail = self.available(config) if not avail: raise ApplicationError( f'Solver {self.__class__} is not available ({avail}).' ) if config.threads: logger.log( logging.WARNING, msg="The `threads` option was specified, " f"but this is not used by {self.__class__}.", ) if config.timer is None: timer = HierarchicalTimer() else: timer = config.timer StaleFlagManager.mark_all_as_stale() with TempfileManager.new_context() as tempfile: if config.working_dir is None: dname = tempfile.mkdtemp() else: dname = config.working_dir if not os.path.exists(dname): os.mkdir(dname) basename = os.path.join(dname, model.name) if os.path.exists(basename + '.nl'): raise RuntimeError( f"NL file with the same name {basename + '.nl'} already exists!" ) # Note: the ASL has an issue where string constants written # to the NL file (e.g. arguments in external functions) MUST # be terminated with '\n' regardless of platform. We will # disable universal newlines in the NL file to prevent # Python from mapping those '\n' to '\r\n' on Windows. with ( open(basename + '.nl', 'w', newline='\n', encoding='utf-8') as nl_file, open(basename + '.row', 'w', encoding='utf-8') as row_file, open(basename + '.col', 'w', encoding='utf-8') as col_file, ): timer.start('write_nl_file') self._writer.config.set_value(config.writer_config) try: nl_info = self._writer.write( model, nl_file, row_file, col_file, symbolic_solver_labels=config.symbolic_solver_labels, ) proven_infeasible = False except InfeasibleConstraintException: proven_infeasible = True timer.stop('write_nl_file') if not proven_infeasible and len(nl_info.variables) > 0: # Get a copy of the environment to pass to the subprocess env = os.environ.copy() if nl_info.external_function_libraries: env['AMPLFUNC'] = amplfunc_merge( env, *nl_info.external_function_libraries ) # Write the opt_file, if there should be one; return a bool to say # whether or not we have one (so we can correctly build the command line) opt_file = self._write_options_file( filename=basename, options=config.solver_options ) # Call ipopt - passing the files via the subprocess cmd = self._create_command_line( basename=basename, config=config, opt_file=opt_file ) # this seems silly, but we have to give the subprocess slightly # longer to finish than ipopt if config.time_limit is not None: timeout = config.time_limit + min( max(1.0, 0.01 * config.time_limit), 100 ) else: timeout = None ostreams = [io.StringIO()] + config.tee timer.start('subprocess') try: with TeeStream(*ostreams) as t: process = subprocess.run( cmd, timeout=timeout, env=env, universal_newlines=True, stdout=t.STDOUT, stderr=t.STDERR, check=False, ) except OSError: err = sys.exc_info()[1] msg = 'Could not execute the command: %s\tError message: %s' raise ApplicationError(msg % (cmd, err)) finally: timer.stop('subprocess') # This is the data we need to parse to get the iterations # and time parsed_output_data = self._parse_ipopt_output(ostreams[0]) if proven_infeasible: results = Results() results.termination_condition = TerminationCondition.provenInfeasible results.solution_loader = SolSolutionLoader(None, None) results.iteration_count = 0 results.timing_info.total_seconds = 0 elif len(nl_info.variables) == 0: if len(nl_info.eliminated_vars) == 0: results = Results() results.termination_condition = TerminationCondition.emptyModel results.solution_loader = SolSolutionLoader(None, None) else: results = Results() results.termination_condition = ( TerminationCondition.convergenceCriteriaSatisfied ) results.solution_status = SolutionStatus.optimal results.solution_loader = SolSolutionLoader(None, nl_info=nl_info) results.iteration_count = 0 results.timing_info.total_seconds = 0 else: if os.path.isfile(basename + '.sol'): with open(basename + '.sol', 'r', encoding='utf-8') as sol_file: timer.start('parse_sol') results = self._parse_solution(sol_file, nl_info) timer.stop('parse_sol') else: results = Results() if process.returncode != 0: results.extra_info.return_code = process.returncode results.termination_condition = TerminationCondition.error results.solution_loader = SolSolutionLoader(None, None) else: try: results.iteration_count = parsed_output_data.pop('iters') cpu_seconds = parsed_output_data.pop('cpu_seconds') for k, v in cpu_seconds.items(): results.timing_info[k] = v results.extra_info = parsed_output_data # Set iteration_log visibility to ADVANCED_OPTION because it's # a lot to print out with `display` results.extra_info.get("iteration_log")._visibility = ( ADVANCED_OPTION ) except KeyError as e: logger.log( logging.WARNING, "The solver output data is empty or incomplete.\n" f"Full error message: {e}\n" f"Parsed solver data: {parsed_output_data}\n", ) if ( config.raise_exception_on_nonoptimal_result and results.solution_status != SolutionStatus.optimal ): raise NoOptimalSolutionError() results.solver_name = self.name results.solver_version = self.version(config) if config.load_solutions: if results.solution_status == SolutionStatus.noSolution: raise NoFeasibleSolutionError() results.solution_loader.load_vars() if ( hasattr(model, 'dual') and isinstance(model.dual, Suffix) and model.dual.import_enabled() ): model.dual.update(results.solution_loader.get_duals()) if ( hasattr(model, 'rc') and isinstance(model.rc, Suffix) and model.rc.import_enabled() ): model.rc.update(results.solution_loader.get_reduced_costs()) if ( results.solution_status in {SolutionStatus.feasible, SolutionStatus.optimal} and len(nl_info.objectives) > 0 ): if config.load_solutions: results.incumbent_objective = value(nl_info.objectives[0]) else: results.incumbent_objective = value( replace_expressions( nl_info.objectives[0].expr, substitution_map={ id(v): val for v, val in results.solution_loader.get_primals().items() }, descend_into_named_expressions=True, remove_named_expressions=True, ) ) results.solver_config = config if not proven_infeasible and len(nl_info.variables) > 0: results.solver_log = ostreams[0].getvalue() # Capture/record end-time / wall-time end_timestamp = datetime.datetime.now(datetime.timezone.utc) results.timing_info.start_timestamp = start_timestamp results.timing_info.wall_time = ( end_timestamp - start_timestamp ).total_seconds() results.timing_info.timer = timer return results
def _parse_ipopt_output(self, output: Union[str, io.StringIO]) -> Dict[str, Any]: parsed_data = {} # Convert output to a string so we can parse it if isinstance(output, io.StringIO): output = output.getvalue() # Stop parsing if there is nothing to parse if not output: logger.log( logging.WARNING, "Returned output from ipopt was empty. Cannot parse for additional data.", ) return parsed_data # Extract number of iterations iter_match = re.search(r'Number of Iterations.*:\s+(\d+)', output) if iter_match: parsed_data['iters'] = int(iter_match.group(1)) # Gather all the iteration data iter_table = re.findall(r'^(?:\s*\d+.*?)$', output, re.MULTILINE) if iter_table: columns = [ "iter", "objective", "inf_pr", "inf_du", "lg_mu", "d_norm", "lg_rg", "alpha_du", "alpha_pr", "ls", ] iterations = [] for line in iter_table: tokens = line.strip().split() if len(tokens) != len(columns): continue iter_data = dict(zip(columns, tokens)) # Extract restoration flag from 'iter' iter_data['restoration'] = iter_data['iter'].endswith('r') if iter_data['restoration']: iter_data['iter'] = iter_data['iter'][:-1] # Separate alpha_pr into numeric part and optional tag iter_data['step_acceptance'] = iter_data['alpha_pr'][-1] if iter_data['step_acceptance'] in _ALPHA_PR_CHARS: iter_data['alpha_pr'] = iter_data['alpha_pr'][:-1] else: iter_data['step_acceptance'] = None # Attempt to cast all values to float where possible for key in columns: if iter_data[key] == '-': iter_data[key] = None else: try: iter_data[key] = float(iter_data[key]) except (ValueError, TypeError): logger.warning( "Error converting Ipopt log entry to " f"float:\n\t{sys.exc_info()[1]}\n\t{line}" ) assert len(iterations) == iter_data.pop('iter'), ( f"Parsed row in the iterations table\n\t{line}\ndoes not " f"match the next expected iteration number ({len(iterations)})" ) iterations.append(iter_data) parsed_data['iteration_log'] = iterations # Extract scaled and unscaled table scaled_unscaled_match = re.search( r''' Objective\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s* Dual\ infeasibility\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s* Constraint\ violation\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s* (?:Variable\ bound\ violation:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*)? Complementarity\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s* Overall\ NLP\ error\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+) ''', output, re.DOTALL | re.VERBOSE, ) if scaled_unscaled_match: groups = scaled_unscaled_match.groups() all_fields = [ "incumbent_objective", "dual_infeasibility", "constraint_violation", "variable_bound_violation", # optional "complementarity_error", "overall_nlp_error", ] # Filter out None values and create final fields and values. # Nones occur in old-style IPOPT output (<= 3.13) zipped = [ (field, scaled, unscaled) for field, scaled, unscaled in zip( all_fields, groups[0::2], groups[1::2] ) if scaled is not None and unscaled is not None ] scaled = {k: float(s) for k, s, _ in zipped} unscaled = {k: float(u) for k, _, u in zipped} parsed_data.update(unscaled) parsed_data['final_scaled_results'] = scaled # Newer versions of IPOPT no longer separate timing into # two different values. This is so we have compatibility with # both new and old versions parsed_data['cpu_seconds'] = { k.strip(): float(v) for k, v in re.findall( r'Total(?: CPU)? sec(?:ond)?s in ([^=]+)=\s*([0-9.]+)', output ) } return parsed_data def _parse_solution( self, instream: io.TextIOBase, nl_info: NLWriterInfo ) -> Results: results = Results() res, sol_data = parse_sol_file( sol_file=instream, nl_info=nl_info, result=results ) if res.solution_status == SolutionStatus.noSolution: res.solution_loader = SolSolutionLoader(None, None) else: res.solution_loader = IpoptSolutionLoader( sol_data=sol_data, nl_info=nl_info ) return res