Source code for pyomo.repn.plugins.lp_writer

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
from io import StringIO
from operator import itemgetter, attrgetter

from pyomo.common.config import (
    ConfigBlock,
    ConfigValue,
    InEnum,
    document_kwargs_from_configdict,
)
from pyomo.common.gc_manager import PauseGC
from pyomo.common.timing import TicTocTimer

from pyomo.core.base import (
    Block,
    Objective,
    Constraint,
    Var,
    Param,
    Expression,
    SOSConstraint,
    SortComponents,
    Suffix,
    SymbolMap,
    minimize,
)
from pyomo.core.base.component import ActiveComponent
from pyomo.core.base.label import LPFileLabeler, NumericLabeler
from pyomo.opt import WriterFactory
from pyomo.repn.linear import LinearRepnVisitor
from pyomo.repn.quadratic import QuadraticRepnVisitor
from pyomo.repn.util import (
    FileDeterminism,
    FileDeterminism_to_SortComponents,
    OrderedVarRecorder,
    categorize_valid_components,
    initialize_var_map_from_column_order,
    int_float,
    ordered_active_constraints,
)

### FIXME: Remove the following as soon as non-active components no
### longer report active==True
from pyomo.core.base import Set, RangeSet, ExternalFunction
from pyomo.network import Port

logger = logging.getLogger(__name__)
inf = float('inf')
neg_inf = float('-inf')


# TODO: make a proper base class
[docs] class LPWriterInfo(object): """Return type for LPWriter.write() Attributes ---------- symbol_map: SymbolMap The :py:class:`SymbolMap` bimap between row/column labels and Pyomo components. """
[docs] def __init__(self, symbol_map): self.symbol_map = symbol_map
[docs] @WriterFactory.register( 'cpxlp_v2', 'Generate the corresponding CPLEX LP file (version 2).' ) @WriterFactory.register('lp_v2', 'Generate the corresponding LP file (version 2).') class LPWriter(object): CONFIG = ConfigBlock('lpwriter') CONFIG.declare( 'show_section_timing', ConfigValue( default=False, domain=bool, description='Print timing after writing each section of the LP file', ), ) CONFIG.declare( 'skip_trivial_constraints', ConfigValue( default=False, domain=bool, description='Skip writing constraints whose body is constant', ), ) CONFIG.declare( 'file_determinism', ConfigValue( default=FileDeterminism.ORDERED, domain=InEnum(FileDeterminism), description='How much effort to ensure file is deterministic', doc=""" How much effort do we want to put into ensuring the LP file is written deterministically for a Pyomo model: - NONE (0) : None - ORDERED (10): rely on underlying component ordering (default) - SORT_INDICES (20) : sort keys of indexed components - SORT_SYMBOLS (30) : sort keys AND sort names (not declaration order) """, ), ) CONFIG.declare( 'symbolic_solver_labels', ConfigValue( default=False, domain=bool, description='Write variables/constraints using model names', doc=""" Export variables and constraints to the LP file using human-readable text names derived from the corresponding Pyomo component names. """, ), ) CONFIG.declare( 'row_order', ConfigValue( default=None, description='Preferred constraint ordering', doc=""" List of constraints in the order that they should appear in the LP file. Unspecified constraints will appear at the end.""", ), ) CONFIG.declare( 'column_order', ConfigValue( default=None, description='Preferred variable ordering', doc=""" List of variables in the order that they should appear in the LP file. Note that this is only a suggestion, as the LP file format is row-major and the columns are inferred from the order in which variables appear in the objective followed by each constraint.""", ), ) CONFIG.declare( 'labeler', ConfigValue( default=None, description='Callable to use to generate symbol names in LP file', doc=""" Export variables and constraints to the LP file using human-readable text names derived from the corresponding Pyomo component names. """, ), ) CONFIG.declare( 'output_fixed_variable_bounds', ConfigValue( default=False, domain=bool, description='DEPRECATED option from LPv1 that has no effect in the LPv2', ), ) CONFIG.declare( 'allow_quadratic_objective', ConfigValue( default=True, domain=bool, description='If True, allow quadratic terms in the model objective', ), ) CONFIG.declare( 'allow_quadratic_constraint', ConfigValue( default=True, domain=bool, description='If True, allow quadratic terms in the model constraints', ), )
[docs] def __init__(self): self.config = self.CONFIG()
def __call__(self, model, filename, solver_capability, io_options): if filename is None: filename = model.name + ".lp" # Duplicate io_options to avoid side-effects io_options = dict(io_options) # Map old solver capabilities to new writer options qp = solver_capability('quadratic_objective') if 'allow_quadratic_objective' not in io_options: io_options['allow_quadratic_objective'] = qp qc = solver_capability('quadratic_constraint') if 'allow_quadratic_constraint' not in io_options: io_options['allow_quadratic_constraint'] = qc with open(filename, 'w', newline='') as FILE: info = self.write(model, FILE, **io_options) return filename, info.symbol_map
[docs] @document_kwargs_from_configdict(CONFIG) def write(self, model, ostream, **options): """Write a model in LP format. Returns ------- LPWriterInfo Parameters ---------- model: ConcreteModel The concrete Pyomo model to write out. ostream: io.TextIOBase The text output stream where the LP "file" will be written. Could be an opened file or a io.StringIO. """ config = self.config(options) if config.output_fixed_variable_bounds: deprecation_warning( "The 'output_fixed_variable_bounds' option to the LP " "writer is deprecated and is ignored by the lp_v2 writer." ) # Pause the GC, as the walker that generates the compiled LP # representation generates (and disposes of) a large number of # small objects. with PauseGC(): return _LPWriter_impl(ostream, config).write(model)
class _LPWriter_impl(object): def __init__(self, ostream, config): self.ostream = ostream self.config = config self.symbol_map = None def write(self, model): timing_logger = logging.getLogger('pyomo.common.timing.writer') timer = TicTocTimer(logger=timing_logger) with_debug_timing = ( timing_logger.isEnabledFor(logging.DEBUG) and timing_logger.hasHandlers() ) ostream = self.ostream labeler = self.config.labeler if labeler is None: if self.config.symbolic_solver_labels: labeler = LPFileLabeler() else: labeler = NumericLabeler('x') self.symbol_map = SymbolMap(labeler) addSymbol = self.symbol_map.addSymbol aliasSymbol = self.symbol_map.alias getSymbol = self.symbol_map.getSymbol self.sorter = sorter = FileDeterminism_to_SortComponents( self.config.file_determinism ) component_map, unknown = categorize_valid_components( model, active=True, sort=sorter, valid={ Block, Constraint, Var, Param, Expression, # FIXME: Non-active components should not report as Active ExternalFunction, Set, RangeSet, Port, # TODO: Piecewise, Complementarity }, targets={Suffix, SOSConstraint, Objective}, ) if unknown: raise ValueError( "The model ('%s') contains the following active components " "that the LP writer does not know how to process:\n\t%s" % ( model.name, "\n\t".join( "%s:\n\t\t%s" % (k, "\n\t\t".join(map(attrgetter('name'), v))) for k, v in unknown.items() ), ) ) ONE_VAR_CONSTANT = Var(name='ONE_VAR_CONSTANT', bounds=(1, 1)) ONE_VAR_CONSTANT.construct() self.var_map = {id(ONE_VAR_CONSTANT): ONE_VAR_CONSTANT} initialize_var_map_from_column_order(model, self.config, self.var_map) self.var_order = {_id: i for i, _id in enumerate(self.var_map)} self.var_recorder = OrderedVarRecorder(self.var_map, self.var_order, sorter) _qp = self.config.allow_quadratic_objective _qc = self.config.allow_quadratic_constraint objective_visitor = (QuadraticRepnVisitor if _qp else LinearRepnVisitor)( {}, var_recorder=self.var_recorder ) constraint_visitor = (QuadraticRepnVisitor if _qc else LinearRepnVisitor)( objective_visitor.subexpression_cache if _qp == _qc else {}, var_recorder=self.var_recorder, ) timer.toc('Initialized column order', level=logging.DEBUG) # We don't export any suffix information to the LP file # if component_map[Suffix]: suffixesByName = {} for block in component_map[Suffix]: for suffix in block.component_objects( Suffix, active=True, descend_into=False, sort=sorter ): if not suffix.export_enabled() or not suffix: continue name = suffix.local_name if name in suffixesByName: suffixesByName[name].append(suffix) else: suffixesByName[name] = [suffix] for name, suffixes in suffixesByName.items(): n = len(suffixes) plural = 's' if n > 1 else '' logger.warning( f"EXPORT Suffix '{name}' found on {n} block{plural}:\n " + "\n ".join(s.name for s in suffixes) + "\nLP writer cannot export suffixes to LP files. Skipping." ) ostream.write(f"\\* Source Pyomo model name={model.name} *\\\n\n") # # Process objective # if not component_map[Objective]: objectives = [Objective(expr=1)] objectives[0].construct() else: objectives = [] for blk in component_map[Objective]: objectives.extend( blk.component_data_objects( Objective, active=True, descend_into=False, sort=sorter ) ) if len(objectives) > 1: raise ValueError( "More than one active objective defined for input model '%s'; " "Cannot write legal LP file\nObjectives: %s" % (model.name, ' '.join(obj.name for obj in objectives)) ) obj = objectives[0] ostream.write( ("min \n%s:\n" if obj.sense == minimize else "max \n%s:\n") % (getSymbol(obj, labeler),) ) repn = objective_visitor.walk_expression(obj.expr) if repn.nonlinear is not None: raise ValueError( f"Model objective ({obj.name}) contains nonlinear terms that " "cannot be written to LP format" ) if repn.constant or not (repn.linear or getattr(repn, 'quadratic', None)): # Older versions of CPLEX (including 12.6) and all versions # of GLPK (through 5.0) do not support constants in the # objective in LP format. To avoid painful bookkeeping, we # introduce the following "variable", constrained to the # value 1. # # In addition, most solvers do no tolerate an empty # objective, this will ensure we at least write out # 0*ONE_VAR_CONSTANT. repn.linear[id(ONE_VAR_CONSTANT)] = repn.constant repn.constant = 0 self.write_expression(ostream, repn, True) aliasSymbol(obj, '__default_objective__') if with_debug_timing: timer.toc('Objective %s', obj, level=logging.DEBUG) ostream.write("\ns.t.\n") # # Tabulate constraints # skip_trivial_constraints = self.config.skip_trivial_constraints have_nontrivial = False last_parent = None for con in ordered_active_constraints(model, self.config): if with_debug_timing and con.parent_component() is not last_parent: timer.toc('Constraint %s', last_parent, level=logging.DEBUG) last_parent = con.parent_component() # Note: Constraint.to_bounded_expression(evaluate_bounds=True) # guarantee a return value that is either a (finite) # native_numeric_type, or None lb, body, ub = con.to_bounded_expression(True) if lb is None and ub is None: # Note: you *cannot* output trivial (unbounded) # constraints in LP format. I suppose we could add a # slack variable if skip_trivial_constraints is False, # but that seems rather silly. continue repn = constraint_visitor.walk_expression(body) if repn.nonlinear is not None: raise ValueError( f"Model constraint ({con.name}) contains nonlinear terms that " "cannot be written to LP format" ) # Pull out the constant: we will move it to the bounds offset = repn.constant repn.constant = 0 if repn.linear or getattr(repn, 'quadratic', None): have_nontrivial = True else: if ( skip_trivial_constraints and (lb is None or lb <= offset) and (ub is None or ub >= offset) ): continue # This is a trivially infeasible model. We could raise # an exception, or we could allow the solver to return # infeasible. There are fewer logic paths (in # particular related to mapping solver result status) if # we just defer to the solver. # # Add a dummy (fixed) variable to the constraint, # because some solvers (including versions of GLPK) # cannot parse an LP file without a variable on the left # hand side. repn.linear[id(ONE_VAR_CONSTANT)] = 0 symbol = labeler(con) if lb is not None: if ub is None: label = f'c_l_{symbol}_' addSymbol(con, label) ostream.write(f'\n{label}:\n') self.write_expression(ostream, repn, False) ostream.write(f'>= {(lb - offset)!s}\n') elif lb == ub: label = f'c_e_{symbol}_' addSymbol(con, label) ostream.write(f'\n{label}:\n') self.write_expression(ostream, repn, False) ostream.write(f'= {(lb - offset)!s}\n') else: # We will need the constraint body twice. Generate # in a buffer so we only have to do that once. buf = StringIO() self.write_expression(buf, repn, False) buf = buf.getvalue() # label = f'r_l_{symbol}_' addSymbol(con, label) ostream.write(f'\n{label}:\n') ostream.write(buf) ostream.write(f'>= {(lb - offset)!s}\n') label = f'r_u_{symbol}_' aliasSymbol(con, label) ostream.write(f'\n{label}:\n') ostream.write(buf) ostream.write(f'<= {(ub - offset)!s}\n') elif ub is not None: label = f'c_u_{symbol}_' addSymbol(con, label) ostream.write(f'\n{label}:\n') self.write_expression(ostream, repn, False) ostream.write(f'<= {(ub - offset)!s}\n') if with_debug_timing: # report the last constraint timer.toc('Constraint %s', last_parent, level=logging.DEBUG) if not have_nontrivial: # Some solvers (notably CBC through at least 2.10.4) will # return a nonzero return code when the model has no # constraints. To work around the original Pyomo solver # hierarchy (where the return code was processed in the base # class), we will add a dummy constraint here. repn = constraint_visitor.Result() # walk_expression(ONE_VAR_CONSTANT) repn.linear[id(ONE_VAR_CONSTANT)] = 1 ostream.write(f'\nc_e_ONE_VAR_CONSTANT:\n') self.write_expression(ostream, repn, False) ostream.write(f'= 1\n') ostream.write("\nbounds") # Track the number of integer and binary variables, so you can # output their status later. integer_vars = [] binary_vars = [] getSymbolByObjectID = self.symbol_map.byObject.get for vid, v in self.var_map.items(): # Some variables in the var_map may not actually have been # written out to the LP file (e.g., added from col_order, or # multiplied by 0 in the expressions). Check to see that # the variable is in the symbol_map before outputting. v_symbol = getSymbolByObjectID(vid, None) if not v_symbol: continue if v.is_binary(): binary_vars.append(v_symbol) elif v.is_integer(): integer_vars.append(v_symbol) # Note: Var.bounds guarantees the values are either (finite) # native_numeric_types or None lb, ub = v.bounds lb = '-inf' if lb is None else str(lb) ub = '+inf' if ub is None else str(ub) ostream.write(f"\n {lb} <= {v_symbol} <= {ub}") if integer_vars: ostream.write("\ngeneral\n ") ostream.write("\n ".join(integer_vars)) if binary_vars: ostream.write("\nbinary\n ") ostream.write("\n ".join(binary_vars)) timer.toc("Wrote variable bounds and domains", level=logging.DEBUG) # # Tabulate SOS constraints # if component_map[SOSConstraint]: sos = [] for blk in component_map[SOSConstraint]: sos.extend( blk.component_data_objects( SOSConstraint, active=True, descend_into=False, sort=sorter ) ) if self.config.row_order: # sort() is stable (per Python docs), so we can let # all unspecified rows have a row number one bigger than # the number of rows specified by the user ordering. _n = len(row_order) sos.sort(key=lambda x: _row_getter(x, _n)) ostream.write("\nSOS\n") for soscon in sos: ostream.write(f'\n{getSymbol(soscon)}: S{soscon.level}::\n') for v, w in getattr(soscon, 'get_items', soscon.items)(): if w.__class__ not in int_float: w = float(f) ostream.write(f" {getSymbol(v)}:{w!s}\n") ostream.write("\nend\n") info = LPWriterInfo(self.symbol_map) timer.toc("Generated LP representation", delta=False) return info def write_expression(self, ostream, expr, is_objective): assert not expr.constant getSymbol = self.symbol_map.getSymbol getVarOrder = self.var_order.__getitem__ getVar = self.var_map.__getitem__ if expr.linear: for vid, coef in sorted( expr.linear.items(), key=lambda x: getVarOrder(x[0]) ): if coef < 0: ostream.write(f'{coef!s} {getSymbol(getVar(vid))}\n') else: ostream.write(f'+{coef!s} {getSymbol(getVar(vid))}\n') quadratic = getattr(expr, 'quadratic', None) if quadratic: def _normalize_constraint(data): (vid1, vid2), coef = data c1 = getVarOrder(vid1) c2 = getVarOrder(vid2) if c2 < c1: col = c2, c1 sym = f' {getSymbol(getVar(vid2))} * {getSymbol(getVar(vid1))}\n' elif c1 == c2: col = c1, c1 sym = f' {getSymbol(getVar(vid2))} ^ 2\n' else: col = c1, c2 sym = f' {getSymbol(getVar(vid1))} * {getSymbol(getVar(vid2))}\n' if coef < 0: return col, str(coef) + sym else: return col, f'+{coef!s}{sym}' if is_objective: # # Times 2 because LP format requires /2 for all the # quadratic terms /of the objective only/. Discovered # the last bit through trial and error. # Ref: ILog CPlex 8.0 User's Manual, p197. # def _normalize_objective(data): vids, coef = data return _normalize_constraint((vids, 2 * coef)) _normalize = _normalize_objective else: _normalize = _normalize_constraint ostream.write('+ [\n') quadratic = sorted(map(_normalize, quadratic.items()), key=itemgetter(0)) ostream.write(''.join(map(itemgetter(1), quadratic))) if is_objective: ostream.write("] / 2\n") else: ostream.write("]\n")