# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
from pyomo.common.tempfiles import TempfileManager
from pyomo.contrib.appsi.base import (
PersistentSolver,
Results,
TerminationCondition,
MIPSolverConfig,
PersistentSolutionLoader,
)
from pyomo.contrib.appsi.writers import LPWriter
import logging
import math
from pyomo.common.collections import ComponentMap
from typing import Optional, Sequence, NoReturn, List, Mapping, Dict
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
import sys
import time
from pyomo.common.log import LogStream
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.errors import PyomoException
from pyomo.contrib.appsi.cmodel import cmodel_available
from pyomo.core.staleflag import StaleFlagManager
# Module-level logger. Besides ordinary logging from this module, it is the
# default value of CplexConfig.solver_output_logger, i.e. the default
# destination for CPLEX's own output.
logger = logging.getLogger(__name__)
class CplexConfig(MIPSolverConfig):
    """Configuration options for the APPSI CPLEX interface.

    Adds four options on top of those inherited from ``MIPSolverConfig``:

    filename : str
        Base path (without extension) used by ``Cplex.solve`` for the
        ``.lp`` and ``.log`` files.  ``None`` (the default) makes the solver
        create a temporary file name.
    keepfiles : bool
        When ``False`` (the default) the files registered during a solve
        are removed once the solve finishes.
    solver_output_logger
        Logger that receives the solver output.  Defaults to this module's
        logger.
    log_level : non-negative int
        Logging level used for the solver output.  Defaults to
        ``logging.INFO``.

    The constructor arguments are forwarded unchanged to
    ``MIPSolverConfig.__init__``.
    """

    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        super().__init__(
            description=description,
            doc=doc,
            implicit=implicit,
            implicit_domain=implicit_domain,
            visibility=visibility,
        )

        # Each option is declared and then immediately given its default.
        # The declaration order is the same as it has always been.
        self.declare('filename', ConfigValue(domain=str))
        self.filename = None

        self.declare('keepfiles', ConfigValue(domain=bool))
        self.keepfiles = False

        self.declare('solver_output_logger', ConfigValue())
        self.solver_output_logger = logger

        self.declare('log_level', ConfigValue(domain=NonNegativeInt))
        self.log_level = logging.INFO
class CplexResults(Results):
    """Results object returned by ``Cplex.solve``.

    Parameters
    ----------
    solver
        The solver instance that produced these results; it is handed to
        the ``PersistentSolutionLoader`` stored in ``solution_loader``.

    Attributes
    ----------
    wallclock_time
        Initialised to ``None`` here; the solver fills it in after a solve.
    solution_loader
        ``PersistentSolutionLoader`` bound to ``solver``.
    """

    def __init__(self, solver):
        super().__init__()
        self.solution_loader = PersistentSolutionLoader(solver=solver)
        self.wallclock_time = None
class Cplex(PersistentSolver):
    """APPSI solver interface that drives CPLEX through LP files.

    Model bookkeeping (variables, constraints, parameters, objective) is
    delegated to an ``LPWriter``.  Every call to :meth:`solve` writes the
    model to an ``.lp`` file, reads that file into a fresh ``cplex.Cplex``
    object, solves it and wraps the outcome in a :class:`CplexResults`.

    Parameters
    ----------
    only_child_vars : bool
        Forwarded unchanged to ``LPWriter``.
    """

    # Cached outcome of the availability/license probe.  It is stored on the
    # class, so the probe runs at most once per process.
    _available = None

    def __init__(self, only_child_vars=False):
        self._config = CplexConfig()
        self._solver_options = dict()
        self._writer = LPWriter(only_child_vars=only_child_vars)
        self._filename = None
        self._last_results_object: Optional[CplexResults] = None

        # The cplex package is optional; remember whether it could be
        # imported so that available() can report NotFound instead of
        # failing at construction time.
        try:
            import cplex

            self._cplex = cplex
            self._cplex_model: Optional[cplex.Cplex] = None
            self._cplex_available = True
        except ImportError:
            self._cplex = None
            self._cplex_model = None
            self._cplex_available = False

    @property
    def writer(self):
        """The ``LPWriter`` used to serialise the model."""
        return self._writer

    @property
    def symbol_map(self):
        """The symbol map maintained by the LP writer."""
        return self._writer.symbol_map

    def available(self):
        """Return the availability of CPLEX, probing it on first use.

        Returns
        -------
        Availability
            One of the ``self.Availability`` members set by
            :meth:`_check_license`.
        """
        if Cplex._available is None:
            self._check_license()
        return Cplex._available

    def _solve_trivial_model(self, num_vars):
        """Build and solve a throw-away model with ``num_vars`` variables.

        Any exception raised by CPLEX propagates to the caller, which uses
        it to classify the license.
        """
        m = self._cplex.Cplex()
        m.set_results_stream(None)
        m.variables.add(lb=[0] * num_vars)
        m.solve()

    def _check_license(self):
        """Probe CPLEX and store the result in ``Cplex._available``.

        The outcome is ``NotFound`` when the ``cplex`` package could not be
        imported, ``NeedsCompiledExtension`` when the APPSI cmodel extension
        is missing, and otherwise ``FullLicense``, ``LimitedLicense`` or
        ``BadLicense`` depending on which trial solves succeed.
        """
        if not self._cplex_available:
            Cplex._available = self.Availability.NotFound
            return
        if not cmodel_available:
            Cplex._available = self.Availability.NeedsCompiledExtension
            return

        try:
            # 1001 variables: presumably just above the size cap of a
            # restricted license -- TODO confirm against CPLEX docs.
            self._solve_trivial_model(1001)
            Cplex._available = self.Availability.FullLicense
        except self._cplex.exceptions.errors.CplexSolverError:
            try:
                self._solve_trivial_model(1)
                Cplex._available = self.Availability.LimitedLicense
            except Exception:
                # Any ordinary failure on a one-variable model is treated as
                # an unusable license.  KeyboardInterrupt / SystemExit are
                # deliberately not caught here.
                Cplex._available = self.Availability.BadLicense

    def version(self):
        """Return the CPLEX version as a tuple of ints.

        The version string reported by a new ``cplex.Cplex`` object is split
        on ``'.'`` and each piece converted with ``int``.
        """
        version_string = self._cplex.Cplex().get_version()
        return tuple(int(part) for part in version_string.split('.'))

    def lp_filename(self):
        """Return the path of the LP file, or ``None`` if no base name is set."""
        if self._filename is None:
            return None
        return self._filename + '.lp'

    def log_filename(self):
        """Return the path of the log file, or ``None`` if no base name is set."""
        if self._filename is None:
            return None
        return self._filename + '.log'

    @property
    def config(self):
        """The configuration object used by this solver."""
        return self._config

    @config.setter
    def config(self, val):
        self._config = val

    @property
    def cplex_options(self):
        """
        A dictionary mapping solver options to values for those options. These
        are solver specific.

        Keys are split on ``'_'`` and resolved attribute-by-attribute below
        ``cplex.Cplex().parameters`` (so ``'mip_tolerances_mipgap'`` addresses
        ``parameters.mip.tolerances.mipgap``).

        Returns
        -------
        dict
            A dictionary mapping solver options to values for those options
        """
        return self._solver_options

    @cplex_options.setter
    def cplex_options(self, val: Dict):
        self._solver_options = val

    @property
    def update_config(self):
        """The ``update_config`` object of the LP writer."""
        return self._writer.update_config

    # ------------------------------------------------------------------
    # Model-modification methods.  Each one forwards to the LP writer.
    # ------------------------------------------------------------------

    def set_instance(self, model):
        """Forward ``model`` to ``LPWriter.set_instance``."""
        self._writer.set_instance(model)

    def add_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to ``LPWriter.add_variables``."""
        self._writer.add_variables(variables)

    def add_params(self, params: List[_ParamData]):
        """Forward ``params`` to ``LPWriter.add_params``."""
        self._writer.add_params(params)

    def add_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward ``cons`` to ``LPWriter.add_constraints``."""
        self._writer.add_constraints(cons)

    def add_block(self, block: _BlockData):
        """Forward ``block`` to ``LPWriter.add_block``."""
        self._writer.add_block(block)

    def remove_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to ``LPWriter.remove_variables``."""
        self._writer.remove_variables(variables)

    def remove_params(self, params: List[_ParamData]):
        """Forward ``params`` to ``LPWriter.remove_params``."""
        self._writer.remove_params(params)

    def remove_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward ``cons`` to ``LPWriter.remove_constraints``."""
        self._writer.remove_constraints(cons)

    def remove_block(self, block: _BlockData):
        """Forward ``block`` to ``LPWriter.remove_block``."""
        self._writer.remove_block(block)

    def set_objective(self, obj: _GeneralObjectiveData):
        """Forward ``obj`` to ``LPWriter.set_objective``."""
        self._writer.set_objective(obj)

    def update_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to ``LPWriter.update_variables``."""
        self._writer.update_variables(variables)

    def update_params(self):
        """Forward to ``LPWriter.update_params``."""
        self._writer.update_params()

    # ------------------------------------------------------------------
    # Solving
    # ------------------------------------------------------------------

    def solve(self, model, timer: Optional[HierarchicalTimer] = None):
        """Write ``model`` to an LP file, solve it with CPLEX, return results.

        Parameters
        ----------
        model
            The model to solve; passed to ``LPWriter.write``.
        timer : HierarchicalTimer, optional
            Timer used to record the phases of the solve.  A new one is
            created when omitted.

        Returns
        -------
        CplexResults

        Raises
        ------
        PyomoException
            If :meth:`available` reports that CPLEX cannot be used.
        """
        StaleFlagManager.mark_all_as_stale()

        avail = self.available()
        if not avail:
            raise PyomoException(f'Solver {self.__class__} is not available ({avail}).')

        # Results of the previous solve no longer describe the solver state.
        if self._last_results_object is not None:
            self._last_results_object.solution_loader.invalidate()

        if timer is None:
            timer = HierarchicalTimer()

        # Acquire the tempfile context *before* entering the try block so
        # that the pop() in ``finally`` only ever undoes a push() that
        # actually happened.
        TempfileManager.push()
        try:
            if self.config.filename is None:
                self._filename = TempfileManager.create_tempfile()
            else:
                self._filename = self.config.filename
            TempfileManager.add_tempfile(self._filename + '.lp', exists=False)
            TempfileManager.add_tempfile(self._filename + '.log', exists=False)

            timer.start('write lp file')
            self._writer.write(model, self._filename + '.lp', timer=timer)
            timer.stop('write lp file')

            res = self._apply_solver(timer)
            self._last_results_object = res

            if self.config.report_timing:
                logger.info('\n' + str(timer))
            return res
        finally:
            # finally, clean any temporary files registered with the
            # temp file manager, created/populated *directly* by this
            # plugin.
            TempfileManager.pop(remove=not self.config.keepfiles)
            if not self.config.keepfiles:
                self._filename = None

    def _apply_solver(self, timer: HierarchicalTimer):
        """Load the LP file into CPLEX, apply options, solve, post-process.

        Parameters
        ----------
        timer : HierarchicalTimer
            Timer used to record the read and solve phases.

        Returns
        -------
        CplexResults
            The object produced by :meth:`_postsolve`.
        """
        config = self.config

        timer.start('cplex read lp')
        self._cplex_model = cplex_model = self._cplex.Cplex()
        cplex_model.read(self._filename + '.lp')
        timer.stop('cplex read lp')

        log_stream = LogStream(
            level=self.config.log_level, logger=self.config.solver_output_logger
        )
        if config.stream_solver:

            def _process_stream(arg):
                # Echo the solver output to stdout and hand the text back
                # unchanged.
                sys.stdout.write(arg)
                return arg

            cplex_model.set_results_stream(log_stream, _process_stream)
        else:
            cplex_model.set_results_stream(log_stream)

        # Resolve each option key ('a_b_c') to parameters.a.b.c and set it.
        for key, option in self.cplex_options.items():
            opt_cmd = cplex_model.parameters
            for key_piece in key.split('_'):
                opt_cmd = getattr(opt_cmd, key_piece)
            opt_cmd.set(option)

        # Applied after cplex_options, so the config values win on conflict.
        if config.time_limit is not None:
            cplex_model.parameters.timelimit.set(config.time_limit)
        if config.mip_gap is not None:
            cplex_model.parameters.mip.tolerances.mipgap.set(config.mip_gap)

        timer.start('cplex solve')
        t0 = time.time()
        cplex_model.solve()
        t1 = time.time()
        timer.stop('cplex solve')

        return self._postsolve(timer, t1 - t0)

    def _postsolve(self, timer: HierarchicalTimer, solve_time):
        """Translate the CPLEX solution state into a :class:`CplexResults`.

        Parameters
        ----------
        timer : HierarchicalTimer
            Timer used to record the time spent loading the solution.
        solve_time : float
            Elapsed seconds of the CPLEX solve; stored as ``wallclock_time``.

        Returns
        -------
        CplexResults

        Raises
        ------
        RuntimeError
            If ``config.load_solution`` is true but CPLEX has no solution.
        """
        config = self.config
        cpxprob = self._cplex_model
        solution = cpxprob.solution

        results = CplexResults(solver=self)
        results.wallclock_time = solve_time

        # First matching row wins; anything not listed maps to ``unknown``.
        # NOTE(review): code 134 used to appear in both the ``unbounded``
        # and the ``infeasibleOrUnbounded`` lists.  Only the first entry was
        # ever reachable, so the duplicate was dropped; confirm against the
        # CPLEX status-code table which condition 134 really belongs to.
        status_table = (
            ((1, 101, 102), TerminationCondition.optimal),
            ((2, 40, 118, 133, 134), TerminationCondition.unbounded),
            ((4, 119), TerminationCondition.infeasibleOrUnbounded),
            ((3, 103), TerminationCondition.infeasible),
            ((10,), TerminationCondition.maxIterations),
            ((11, 25, 107, 131), TerminationCondition.maxTimeLimit),
        )
        status = solution.get_status()
        results.termination_condition = next(
            (condition for codes, condition in status_table if status in codes),
            TerminationCondition.unknown,
        )

        has_solution = solution.get_solution_type() != solution.type.none

        if self._writer.get_active_objective() is None:
            results.best_feasible_objective = None
            results.best_objective_bound = None
        elif has_solution:
            objective_value = solution.get_objective_value()
            results.best_feasible_objective = objective_value
            num_discrete = (
                cpxprob.variables.get_num_binary()
                + cpxprob.variables.get_num_integer()
            )
            if num_discrete == 0:
                # No binary or integer variables: the bound is reported as
                # the objective value itself.
                results.best_objective_bound = objective_value
            else:
                results.best_objective_bound = solution.MIP.get_best_objective()
        else:
            # No solution: report the trivial bound for the objective sense.
            results.best_feasible_objective = None
            if cpxprob.objective.get_sense() == cpxprob.objective.sense.minimize:
                results.best_objective_bound = -math.inf
            else:
                results.best_objective_bound = math.inf

        if config.load_solution:
            if not has_solution:
                raise RuntimeError(
                    'A feasible solution was not found, so no solution can be loaded. '
                    'If using the appsi.solvers.Cplex interface, you can '
                    'set opt.config.load_solution=False. If using the environ.SolverFactory '
                    'interface, you can set opt.solve(model, load_solutions = False). '
                    'Then you can check results.termination_condition and '
                    'results.best_feasible_objective before loading a solution.'
                )
            if results.termination_condition != TerminationCondition.optimal:
                logger.warning(
                    'Loading a feasible but suboptimal solution. '
                    'Please set load_solution=False and check '
                    'results.termination_condition before loading a solution.'
                )
            timer.start('load solution')
            self.load_vars()
            timer.stop('load solution')

        return results

    # ------------------------------------------------------------------
    # Solution access
    # ------------------------------------------------------------------

    def _cplex_has_solution(self):
        """Return True when the current CPLEX model holds a solution."""
        solution = self._cplex_model.solution
        return solution.get_solution_type() != solution.type.none

    def _cplex_is_mixed_integer(self):
        """Return True when CPLEX classifies the problem as MILP/MIQP/MIQCP."""
        problem_type = self._cplex_model.problem_type
        return self._cplex_model.get_problem_type() in [
            problem_type.MILP,
            problem_type.MIQP,
            problem_type.MIQCP,
        ]

    def get_primals(
        self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None
    ) -> Mapping[_GeneralVarData, float]:
        """Return the primal values of the last solve.

        Parameters
        ----------
        vars_to_load : sequence of variables, optional
            Variables to query.  When omitted, every variable of the CPLEX
            model is queried.

        Returns
        -------
        ComponentMap
            Maps each variable to its value.

        Raises
        ------
        RuntimeError
            If CPLEX does not currently hold a solution.
        """
        if not self._cplex_has_solution():
            raise RuntimeError(
                'Solver does not currently have a valid solution. Please '
                'check the termination condition.'
            )

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        var_vals = self._cplex_model.solution.get_values(var_names)

        res = ComponentMap()
        for name, val in zip(var_names, var_vals):
            # 'obj_const' has no entry in the symbol map to report against;
            # presumably a helper variable added by the LP writer -- verify.
            if name == 'obj_const':
                continue
            v = symbol_map.bySymbol[name]
            # Skip variables whose entry in the writer's reference table is
            # falsy.
            if self._writer._referenced_variables[id(v)]:
                res[v] = val
        return res

    def get_duals(
        self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None
    ) -> Dict[_GeneralConstraintData, float]:
        """Return the dual values of the last solve.

        Parameters
        ----------
        cons_to_load : sequence of constraints, optional
            Constraints to query.  When omitted, every linear constraint of
            the CPLEX model is queried.

        Returns
        -------
        dict
            Maps each constraint to its dual.  When a constraint contributes
            more than one row, the dual of largest magnitude is kept.

        Raises
        ------
        RuntimeError
            If CPLEX holds no solution or the problem is mixed-integer.
        """
        if not self._cplex_has_solution():
            raise RuntimeError(
                'Solver does not currently have valid duals. Please '
                'check the termination condition.'
            )
        if self._cplex_is_mixed_integer():
            raise RuntimeError('Cannot get duals for mixed-integer problems')

        symbol_map = self._writer.symbol_map
        if cons_to_load is None:
            con_names = self._cplex_model.linear_constraints.get_names()
            dual_values = self._cplex_model.solution.get_dual_values()
        else:
            # Rebuild the row names: the constraint's symbol plus '_eq' for
            # an equality, or '_lb' / '_ub' for each bound that is present.
            con_names = list()
            for con in cons_to_load:
                orig_name = symbol_map.byObject[id(con)]
                if con.equality:
                    con_names.append(orig_name + '_eq')
                else:
                    if con.lower is not None:
                        con_names.append(orig_name + '_lb')
                    if con.upper is not None:
                        con_names.append(orig_name + '_ub')
            dual_values = self._cplex_model.solution.get_dual_values(con_names)

        res = dict()
        for name, val in zip(con_names, dual_values):
            # Drop the three-character suffix to recover the symbol.
            orig_name = name[:-3]
            if orig_name == 'obj_const_con':
                continue
            _con = symbol_map.bySymbol[orig_name]
            if _con not in res or abs(val) > abs(res[_con]):
                res[_con] = val
        return res

    def get_reduced_costs(
        self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None
    ) -> Mapping[_GeneralVarData, float]:
        """Return the reduced costs of the last solve.

        Parameters
        ----------
        vars_to_load : sequence of variables, optional
            Variables to query.  When omitted, every variable of the CPLEX
            model is queried.

        Returns
        -------
        ComponentMap
            Maps each variable to its reduced cost.

        Raises
        ------
        RuntimeError
            If CPLEX holds no solution or the problem is mixed-integer.
        """
        if not self._cplex_has_solution():
            raise RuntimeError(
                'Solver does not currently have valid reduced costs. Please '
                'check the termination condition.'
            )
        if self._cplex_is_mixed_integer():
            raise RuntimeError('Cannot get reduced costs for mixed-integer problems')

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        rc = self._cplex_model.solution.get_reduced_costs(var_names)

        res = ComponentMap()
        for name, val in zip(var_names, rc):
            if name == 'obj_const':
                continue
            res[symbol_map.bySymbol[name]] = val
        return res