# Source code for pyomo.contrib.appsi.solvers.cplex

from pyomo.common.tempfiles import TempfileManager
from pyomo.contrib.appsi.base import PersistentSolver, Results, TerminationCondition, MIPSolverConfig
from pyomo.contrib.appsi.writers import LPWriter
import logging
import math
from pyomo.common.collections import ComponentMap
from typing import Optional, Sequence, NoReturn, List, Mapping, Dict
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
import sys
import time
from pyomo.common.log import LogStream
from pyomo.common.config import ConfigValue, NonNegativeInt


logger = logging.getLogger(__name__)


class CplexConfig(MIPSolverConfig):
    """MIP solver configuration extended with CPLEX-interface options.

    Options declared here, in addition to those of ``MIPSolverConfig``:

    * ``filename`` (str): base path used for the ``.lp``/``.log`` files;
      initialised to ``None``.
    * ``keepfiles`` (bool): initialised to ``False``.
    * ``solver_output_logger``: initialised to this module's logger.
    * ``log_level`` (non-negative int): initialised to ``logging.INFO``.
    """

    def __init__(self):
        super().__init__()

        # Each option is declared and then given its initial value right
        # away; the declaration order is filename, keepfiles,
        # solver_output_logger, log_level.
        self.declare('filename', ConfigValue(domain=str))
        self.filename = None

        self.declare('keepfiles', ConfigValue(domain=bool))
        self.keepfiles = False

        self.declare('solver_output_logger', ConfigValue())
        self.solver_output_logger = logger

        self.declare('log_level', ConfigValue(domain=NonNegativeInt))
        self.log_level = logging.INFO
class CplexResults(Results):
    """Solve results that additionally carry a ``wallclock_time`` field."""

    def __init__(self):
        super().__init__()
        # Starts as None; the attribute is assigned by whoever builds the
        # results object after a solve.
        self.wallclock_time = None
class Cplex(PersistentSolver):
    """Persistent-solver interface that drives CPLEX through LP files.

    All model bookkeeping (variables, params, constraints, blocks,
    objective) is delegated to an ``LPWriter``.  Each call to
    :meth:`solve` writes the model to ``<filename>.lp``, reads that file
    into a fresh ``cplex.Cplex`` object, applies the solver options and
    solves it.
    """

    def __init__(self):
        self._config = CplexConfig()
        self._solver_options = dict()
        self._writer = LPWriter()
        self._filename = None

        # The cplex package is optional: remember whether the import worked
        # so that available() can report it later.
        try:
            import cplex
            self._cplex = cplex
            self._cplex_model: Optional[cplex.Cplex] = None
            self._cplex_available = True
        except ImportError:
            self._cplex = None
            self._cplex_model = None
            self._cplex_available = False

    def available(self, exception_flag=False):
        """Return whether the ``cplex`` package could be imported.

        Raises
        ------
        RuntimeError
            If ``exception_flag`` is true and cplex is not available.
        """
        if exception_flag and not self._cplex_available:
            raise RuntimeError('Cplex is not available')
        return self._cplex_available

    def version(self):
        """Return the CPLEX version as a tuple of ints.

        The tuple is built by splitting ``cplex.Cplex().get_version()``
        on ``'.'``.
        """
        return tuple(int(k) for k in self._cplex.Cplex().get_version().split('.'))

    def lp_filename(self):
        """Return ``<filename>.lp``, or None when no filename is set."""
        if self._filename is None:
            return None
        return self._filename + '.lp'

    def log_filename(self):
        """Return ``<filename>.log``, or None when no filename is set.

        NOTE(review): solve() registers this path with the temp-file
        manager, but nothing in this class writes to it.
        """
        if self._filename is None:
            return None
        return self._filename + '.log'

    @property
    def config(self):
        """The :class:`CplexConfig` instance used by this solver."""
        return self._config

    @property
    def solver_options(self):
        """Dict of CPLEX parameters applied before each solve.

        Keys are split on ``'_'`` and resolved attribute-by-attribute
        starting from ``cplex_model.parameters`` (see ``_apply_solver``).
        """
        return self._solver_options

    @property
    def update_config(self):
        """The ``update_config`` object of the underlying LP writer."""
        return self._writer.update_config

    # ------------------------------------------------------------------
    # Model bookkeeping: every call is forwarded to the LP writer.
    # ------------------------------------------------------------------

    def set_instance(self, model):
        """Forward ``model`` to the LP writer's ``set_instance``."""
        self._writer.set_instance(model)

    def add_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to the LP writer's ``add_variables``."""
        self._writer.add_variables(variables)

    def add_params(self, params: List[_ParamData]):
        """Forward ``params`` to the LP writer's ``add_params``."""
        self._writer.add_params(params)

    def add_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward ``cons`` to the LP writer's ``add_constraints``."""
        self._writer.add_constraints(cons)

    def add_block(self, block: _BlockData):
        """Forward ``block`` to the LP writer's ``add_block``."""
        self._writer.add_block(block)

    def remove_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to the LP writer's ``remove_variables``."""
        self._writer.remove_variables(variables)

    def remove_params(self, params: List[_ParamData]):
        """Forward ``params`` to the LP writer's ``remove_params``."""
        self._writer.remove_params(params)

    def remove_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward ``cons`` to the LP writer's ``remove_constraints``."""
        self._writer.remove_constraints(cons)

    def remove_block(self, block: _BlockData):
        """Forward ``block`` to the LP writer's ``remove_block``."""
        self._writer.remove_block(block)

    def set_objective(self, obj: _GeneralObjectiveData):
        """Forward ``obj`` to the LP writer's ``set_objective``."""
        self._writer.set_objective(obj)

    def update_variables(self, variables: List[_GeneralVarData]):
        """Forward ``variables`` to the LP writer's ``update_variables``."""
        self._writer.update_variables(variables)

    def update_params(self):
        """Forward to the LP writer's ``update_params``."""
        self._writer.update_params()

    # ------------------------------------------------------------------
    # Solving
    # ------------------------------------------------------------------

    def solve(self, model, timer: Optional[HierarchicalTimer] = None):
        """Write ``model`` to an LP file, solve it with CPLEX, return results.

        Parameters
        ----------
        model:
            The model handed to the LP writer's ``write``.
        timer: HierarchicalTimer, optional
            Timer used to record the individual phases; a new one is
            created when omitted.

        Returns
        -------
        CplexResults

        Raises
        ------
        RuntimeError
            If cplex is not available, or (from ``_postsolve``) if
            ``config.load_solution`` is set and no solution was found.
        """
        self.available(exception_flag=True)
        if timer is None:
            timer = HierarchicalTimer()

        # Push *before* entering the try block: if push() itself failed
        # inside the try, the finally clause would pop a temp-file context
        # that this call never created.
        TempfileManager.push()
        try:
            if self.config.filename is None:
                self._filename = TempfileManager.create_tempfile()
            else:
                self._filename = self.config.filename
            TempfileManager.add_tempfile(self._filename + '.lp', exists=False)
            TempfileManager.add_tempfile(self._filename + '.log', exists=False)

            timer.start('write lp file')
            self._writer.write(model, self._filename + '.lp', timer=timer)
            timer.stop('write lp file')

            res = self._apply_solver(timer)
            if self.config.report_timing:
                logger.info('\n' + str(timer))
            return res
        finally:
            # Clean up any temporary files registered with the temp-file
            # manager by this plugin, unless the user asked to keep them.
            TempfileManager.pop(remove=not self.config.keepfiles)
            if not self.config.keepfiles:
                self._filename = None

    def _apply_solver(self, timer: HierarchicalTimer):
        """Read the LP file into CPLEX, apply options, solve, post-process."""
        config = self.config

        timer.start('cplex read lp')
        self._cplex_model = cplex_model = self._cplex.Cplex()
        cplex_model.read(self._filename + '.lp')
        timer.stop('cplex read lp')

        log_stream = LogStream(level=self.config.log_level,
                               logger=self.config.solver_output_logger)
        if config.stream_solver:
            def _process_stream(arg):
                # Echo solver output to stdout and pass it on unchanged so
                # it still reaches the log stream.
                sys.stdout.write(arg)
                return arg
            cplex_model.set_results_stream(log_stream, _process_stream)
        else:
            cplex_model.set_results_stream(log_stream)

        # An option key such as 'mip_tolerances_mipgap' is resolved as
        # cplex_model.parameters.mip.tolerances.mipgap.
        for key, option in self.solver_options.items():
            opt_cmd = cplex_model.parameters
            for key_piece in key.split('_'):
                opt_cmd = getattr(opt_cmd, key_piece)
            opt_cmd.set(option)

        # Values from the config are applied after (and so override) the
        # same parameters given through solver_options.
        if config.time_limit is not None:
            cplex_model.parameters.timelimit.set(config.time_limit)
        if config.mip_gap is not None:
            cplex_model.parameters.mip.tolerances.mipgap.set(config.mip_gap)

        timer.start('cplex solve')
        t0 = time.time()
        cplex_model.solve()
        t1 = time.time()
        timer.stop('cplex solve')

        return self._postsolve(timer, t1 - t0)

    @staticmethod
    def _termination_condition(status):
        """Map a CPLEX solution status code to a ``TerminationCondition``."""
        if status in (1, 101, 102):
            return TerminationCondition.optimal
        if status in (2, 40, 118, 133, 134):
            return TerminationCondition.unbounded
        # 134 used to be listed here as well, but it was unreachable
        # because the unbounded branch above already claims it.
        if status in (4, 119):
            return TerminationCondition.infeasibleOrUnbounded
        if status in (3, 103):
            return TerminationCondition.infeasible
        if status == 10:
            return TerminationCondition.maxIterations
        if status in (11, 25, 107, 131):
            return TerminationCondition.maxTimeLimit
        return TerminationCondition.unknown

    def _postsolve(self, timer: HierarchicalTimer, solve_time):
        """Build a :class:`CplexResults` from the solved CPLEX model.

        Parameters
        ----------
        timer: HierarchicalTimer
            Used to time the optional loading of the solution.
        solve_time:
            Wall-clock duration of ``cplex_model.solve()``; stored in
            ``results.wallclock_time``.

        Raises
        ------
        RuntimeError
            If ``config.load_solution`` is set and CPLEX reports no
            solution.
        """
        config = self.config
        cpxprob = self._cplex_model

        results = CplexResults()
        results.wallclock_time = solve_time
        results.termination_condition = self._termination_condition(
            cpxprob.solution.get_status())

        has_solution = (cpxprob.solution.get_solution_type()
                        != cpxprob.solution.type.none)

        if self._writer.get_active_objective() is None:
            results.best_feasible_objective = None
            results.best_objective_bound = None
        elif has_solution:
            results.best_feasible_objective = cpxprob.solution.get_objective_value()
            num_discrete = (cpxprob.variables.get_num_binary()
                            + cpxprob.variables.get_num_integer())
            if num_discrete == 0:
                # No binary/integer variables: the objective value is
                # reported as its own bound.
                results.best_objective_bound = cpxprob.solution.get_objective_value()
            else:
                results.best_objective_bound = cpxprob.solution.MIP.get_best_objective()
        else:
            results.best_feasible_objective = None
            if cpxprob.objective.get_sense() == cpxprob.objective.sense.minimize:
                results.best_objective_bound = -math.inf
            else:
                results.best_objective_bound = math.inf

        if config.load_solution:
            if not has_solution:
                raise RuntimeError('A feasible solution was not found, so no solution can be loaded. '
                                   'Please set opt.config.load_solution=False and check '
                                   'results.termination_condition and '
                                   'results.best_feasible_objective before loading a solution.')
            if results.termination_condition != TerminationCondition.optimal:
                logger.warning('Loading a feasible but suboptimal solution. '
                               'Please set load_solution=False and check '
                               'results.termination_condition before loading a solution.')
            timer.start('load solution')
            self.load_vars()
            timer.stop('load solution')

        return results

    # ------------------------------------------------------------------
    # Solution access
    # ------------------------------------------------------------------

    def _require_solution(self, action):
        """Raise RuntimeError unless a solved model with a solution exists.

        ``action`` is spliced into the message, e.g. ``'get duals'``.
        """
        if self._cplex_model is None:
            # Without this guard the attribute access below would fail
            # with an opaque AttributeError on None.
            raise RuntimeError('Cannot ' + action + ' - the model has not been solved.')
        if self._cplex_model.solution.get_solution_type() == self._cplex_model.solution.type.none:
            raise RuntimeError('Cannot ' + action + ' - no feasible solution was found.')

    def _require_continuous(self, quantity):
        """Raise RuntimeError if the CPLEX problem type is MILP, MIQP or MIQCP."""
        problem_type = self._cplex_model.problem_type
        if self._cplex_model.get_problem_type() in [problem_type.MILP,
                                                    problem_type.MIQP,
                                                    problem_type.MIQCP]:
            raise RuntimeError('Cannot get ' + quantity + ' for mixed-integer problems')

    def load_vars(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> None:
        """Copy CPLEX solution values into the ``value`` of model variables.

        Parameters
        ----------
        vars_to_load: sequence of variables, optional
            Variables to load; when omitted, every variable name known to
            the CPLEX model is loaded.

        Raises
        ------
        RuntimeError
            If the model has not been solved or no solution was found.
        """
        self._require_solution('load variable values')

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        var_vals = self._cplex_model.solution.get_values(var_names)

        for name, val in zip(var_names, var_vals):
            # 'obj_const' is skipped: it is not looked up in the symbol map.
            if name == 'obj_const':
                continue
            # bySymbol entries are callables returning the component.
            v = symbol_map.bySymbol[name]()
            v.value = val

    def get_duals(self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None) -> Dict[_GeneralConstraintData, float]:
        """Return a dict mapping constraints to their dual values.

        Parameters
        ----------
        cons_to_load: sequence of constraints, optional
            Constraints to query; when omitted, all linear constraints of
            the CPLEX model are queried.

        Raises
        ------
        RuntimeError
            If the model has not been solved, no solution was found, or
            the problem is mixed-integer.
        """
        self._require_solution('get duals')
        self._require_continuous('duals')

        symbol_map = self._writer.symbol_map
        if cons_to_load is None:
            con_names = self._cplex_model.linear_constraints.get_names()
            dual_values = self._cplex_model.solution.get_dual_values()
        else:
            # Rebuild the suffixed row names: '_eq' for equalities, and
            # '_lb'/'_ub' for each bound that is present otherwise.
            con_names = list()
            for con in cons_to_load:
                orig_name = symbol_map.byObject[id(con)]
                if con.equality:
                    con_names.append(orig_name + '_eq')
                else:
                    if con.lower is not None:
                        con_names.append(orig_name + '_lb')
                    if con.upper is not None:
                        con_names.append(orig_name + '_ub')
            dual_values = self._cplex_model.solution.get_dual_values(con_names)

        res = dict()
        for name, val in zip(con_names, dual_values):
            # Strip the three-character '_eq'/'_lb'/'_ub' suffix.
            orig_name = name[:-3]
            if orig_name == 'obj_const_con':
                continue
            _con = symbol_map.bySymbol[orig_name]()
            # A constraint can appear twice (lower and upper row); keep the
            # dual with the larger magnitude.
            if _con not in res or abs(val) > abs(res[_con]):
                res[_con] = val
        return res

    def get_reduced_costs(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]:
        """Return a ``ComponentMap`` mapping variables to reduced costs.

        Parameters
        ----------
        vars_to_load: sequence of variables, optional
            Variables to query; when omitted, every variable name known to
            the CPLEX model is queried.

        Raises
        ------
        RuntimeError
            If the model has not been solved, no solution was found, or
            the problem is mixed-integer.
        """
        self._require_solution('get reduced costs')
        self._require_continuous('reduced costs')

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        rc = self._cplex_model.solution.get_reduced_costs(var_names)

        res = ComponentMap()
        for name, val in zip(var_names, rc):
            # 'obj_const' is skipped: it is not looked up in the symbol map.
            if name == 'obj_const':
                continue
            v = symbol_map.bySymbol[name]()
            res[v] = val
        return res