Source code for pyomo.contrib.appsi.solvers.cplex

from pyomo.common.tempfiles import TempfileManager
from pyomo.contrib.appsi.base import PersistentSolver, Results, TerminationCondition, MIPSolverConfig, PersistentSolutionLoader
from pyomo.contrib.appsi.writers import LPWriter
import logging
import math
from pyomo.common.collections import ComponentMap
from typing import Optional, Sequence, NoReturn, List, Mapping, Dict
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.block import _BlockData
from pyomo.core.base.param import _ParamData
from pyomo.core.base.objective import _GeneralObjectiveData
from pyomo.common.timing import HierarchicalTimer
import sys
import time
from pyomo.common.log import LogStream
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.errors import PyomoException


logger = logging.getLogger(__name__)


class CplexConfig(MIPSolverConfig):
    """Configuration block for the CPLEX interface defined in this module.

    On top of whatever ``MIPSolverConfig`` declares, four options are added:

    * ``filename`` (str): base name (no extension) used for the ``.lp`` and
      ``.log`` files; initialised to ``None``.
    * ``keepfiles`` (bool): initialised to ``False``.
    * ``solver_output_logger``: initialised to this module's ``logger``.
    * ``log_level`` (non-negative int): initialised to ``logging.INFO``.

    The constructor arguments are forwarded unchanged to the parent class.
    """

    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        super().__init__(
            description=description,
            doc=doc,
            implicit=implicit,
            implicit_domain=implicit_domain,
            visibility=visibility,
        )

        # Register the CPLEX-specific options (declaration order is kept).
        self.declare('filename', ConfigValue(domain=str))
        self.declare('keepfiles', ConfigValue(domain=bool))
        self.declare('solver_output_logger', ConfigValue())
        self.declare('log_level', ConfigValue(domain=NonNegativeInt))

        # Initial values for the options registered above.
        self.filename = None
        self.keepfiles = False
        self.solver_output_logger = logger
        self.log_level = logging.INFO
class CplexResults(Results):
    """Results object produced by a CPLEX solve.

    Extends ``Results`` with a ``wallclock_time`` attribute (``None`` until
    the creator fills it in) and attaches a ``PersistentSolutionLoader``
    bound to the solver that produced the results.
    """

    def __init__(self, solver):
        super().__init__()
        # The loader is given the solver it should query for solution values.
        self.solution_loader = PersistentSolutionLoader(solver=solver)
        self.wallclock_time = None
class Cplex(PersistentSolver):
    """CPLEX interface that communicates with the solver through LP files.

    Model bookkeeping (variables, constraints, parameters, objective) is
    delegated to an ``LPWriter``.  Each call to :meth:`solve` writes the LP
    file, reads it into a fresh ``cplex.Cplex`` object, solves it and
    translates the outcome into a :class:`CplexResults`.
    """

    # Cached outcome of the license probe; stored on the class, so it is
    # shared by every instance.
    _available = None

    def __init__(self):
        self._config = CplexConfig()
        self._solver_options = {}
        self._writer = LPWriter()
        self._filename = None
        self._last_results_object: Optional[CplexResults] = None

        # The cplex package is optional: remember whether the import worked
        # instead of failing at construction time.
        try:
            import cplex
            self._cplex = cplex
            self._cplex_model: Optional[cplex.Cplex] = None
            self._cplex_available = True
        except ImportError:
            self._cplex = None
            self._cplex_model = None
            self._cplex_available = False

    @property
    def writer(self):
        """The ``LPWriter`` used to produce the LP file."""
        return self._writer

    @property
    def symbol_map(self):
        """The symbol map maintained by the LP writer."""
        return self._writer.symbol_map

    def available(self):
        """Return the availability of CPLEX, probing the license on first use.

        The result is cached in ``Cplex._available``.
        """
        if Cplex._available is None:
            self._check_license()
        return Cplex._available

    def _check_license(self):
        """Probe CPLEX and store the resulting availability on the class.

        A model with 1001 variables is solved first; if that raises
        ``CplexSolverError`` a single-variable model is tried.  The outcome is
        ``FullLicense``, ``LimitedLicense`` or ``BadLicense`` respectively, or
        ``NotFound`` when the ``cplex`` package could not be imported.
        """
        if not self._cplex_available:
            Cplex._available = self.Availability.NotFound
            return

        try:
            # NOTE(review): 1001 is presumably one more than the size limit of
            # a restricted license - confirm against the CPLEX documentation.
            m = self._cplex.Cplex()
            m.variables.add(lb=[0]*1001)
            m.solve()
            Cplex._available = self.Availability.FullLicense
        except self._cplex.exceptions.errors.CplexSolverError:
            try:
                m = self._cplex.Cplex()
                m.variables.add(lb=[0])
                m.solve()
                Cplex._available = self.Availability.LimitedLicense
            except Exception:
                # Deliberately broad (any failure means CPLEX is unusable),
                # but no longer swallows KeyboardInterrupt / SystemExit.
                Cplex._available = self.Availability.BadLicense

    def version(self):
        """Return the CPLEX version as a tuple of ints."""
        return tuple(int(k) for k in self._cplex.Cplex().get_version().split('.'))

    def lp_filename(self):
        """Return the LP file name of the current solve, or ``None``."""
        if self._filename is None:
            return None
        return self._filename + '.lp'

    def log_filename(self):
        """Return the log file name of the current solve, or ``None``."""
        if self._filename is None:
            return None
        return self._filename + '.log'

    @property
    def config(self):
        """The configuration object used by this solver."""
        return self._config

    @config.setter
    def config(self, val):
        self._config = val

    @property
    def cplex_options(self):
        """
        Returns
        -------
        cplex_options: dict
            A dictionary mapping solver options to values for those options.
            These are solver specific.  Each key is split on ``'_'`` and the
            pieces are resolved as nested attributes of the CPLEX model's
            ``parameters`` object (e.g. ``'mip_tolerances_mipgap'``).
        """
        return self._solver_options

    @cplex_options.setter
    def cplex_options(self, val: Dict):
        self._solver_options = val

    @property
    def update_config(self):
        """The update configuration of the underlying LP writer."""
        return self._writer.update_config

    # ------------------------------------------------------------------
    # Model bookkeeping: everything is forwarded to the LP writer.
    # ------------------------------------------------------------------

    def set_instance(self, model):
        """Forward ``model`` to the LP writer."""
        self._writer.set_instance(model)

    def add_variables(self, variables: List[_GeneralVarData]):
        """Forward the new variables to the LP writer."""
        self._writer.add_variables(variables)

    def add_params(self, params: List[_ParamData]):
        """Forward the new parameters to the LP writer."""
        self._writer.add_params(params)

    def add_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward the new constraints to the LP writer."""
        self._writer.add_constraints(cons)

    def add_block(self, block: _BlockData):
        """Forward the new block to the LP writer."""
        self._writer.add_block(block)

    def remove_variables(self, variables: List[_GeneralVarData]):
        """Forward the variable removal to the LP writer."""
        self._writer.remove_variables(variables)

    def remove_params(self, params: List[_ParamData]):
        """Forward the parameter removal to the LP writer."""
        self._writer.remove_params(params)

    def remove_constraints(self, cons: List[_GeneralConstraintData]):
        """Forward the constraint removal to the LP writer."""
        self._writer.remove_constraints(cons)

    def remove_block(self, block: _BlockData):
        """Forward the block removal to the LP writer."""
        self._writer.remove_block(block)

    def set_objective(self, obj: _GeneralObjectiveData):
        """Forward the objective to the LP writer."""
        self._writer.set_objective(obj)

    def update_variables(self, variables: List[_GeneralVarData]):
        """Forward the variable update to the LP writer."""
        self._writer.update_variables(variables)

    def update_params(self):
        """Ask the LP writer to update its parameters."""
        self._writer.update_params()

    # ------------------------------------------------------------------
    # Solving
    # ------------------------------------------------------------------

    def solve(self, model, timer: HierarchicalTimer = None):
        """Write ``model`` to an LP file, solve it with CPLEX, return results.

        Parameters
        ----------
        model:
            The model handed to the LP writer.
        timer: HierarchicalTimer, optional
            Timer used to record the individual phases; a new one is created
            when omitted.

        Returns
        -------
        CplexResults

        Raises
        ------
        PyomoException
            If :meth:`available` returns a falsy value.
        RuntimeError
            If ``config.load_solution`` is set and CPLEX found no solution.
        """
        avail = self.available()
        if not avail:
            raise PyomoException(f'Solver {self.__class__} is not available ({avail}).')
        if self._last_results_object is not None:
            # Results from the previous solve no longer match the solver state.
            self._last_results_object.solution_loader.invalidate()
        if timer is None:
            timer = HierarchicalTimer()

        # Push before entering the try block: if push() itself fails, the
        # finally clause must not pop a context this call did not create.
        TempfileManager.push()
        try:
            if self.config.filename is None:
                self._filename = TempfileManager.create_tempfile()
            else:
                self._filename = self.config.filename
            TempfileManager.add_tempfile(self._filename + '.lp', exists=False)
            TempfileManager.add_tempfile(self._filename + '.log', exists=False)
            timer.start('write lp file')
            self._writer.write(model, self._filename+'.lp', timer=timer)
            timer.stop('write lp file')
            res = self._apply_solver(timer)
            self._last_results_object = res
            if self.config.report_timing:
                logger.info('\n' + str(timer))
            return res
        finally:
            # finally, clean any temporary files registered with the
            # temp file manager, created/populated *directly* by this
            # plugin.
            TempfileManager.pop(remove=not self.config.keepfiles)
            if not self.config.keepfiles:
                self._filename = None

    def _apply_solver(self, timer: HierarchicalTimer):
        """Read the LP file into CPLEX, apply options, solve and post-process."""
        config = self.config

        timer.start('cplex read lp')
        self._cplex_model = cplex_model = self._cplex.Cplex()
        cplex_model.read(self._filename + '.lp')
        timer.stop('cplex read lp')

        log_stream = LogStream(level=self.config.log_level, logger=self.config.solver_output_logger)
        if config.stream_solver:
            # Echo the solver output to stdout in addition to the logger.
            def _process_stream(arg):
                sys.stdout.write(arg)
                return arg
            cplex_model.set_results_stream(log_stream, _process_stream)
        else:
            cplex_model.set_results_stream(log_stream)

        for key, option in self.cplex_options.items():
            # 'a_b_c' addresses cplex_model.parameters.a.b.c
            opt_cmd = cplex_model.parameters
            for key_piece in key.split('_'):
                opt_cmd = getattr(opt_cmd, key_piece)
            opt_cmd.set(option)

        # The generic config values are applied after (and therefore
        # override) the raw cplex_options.
        if config.time_limit is not None:
            cplex_model.parameters.timelimit.set(config.time_limit)
        if config.mip_gap is not None:
            cplex_model.parameters.mip.tolerances.mipgap.set(config.mip_gap)

        timer.start('cplex solve')
        t0 = time.time()
        cplex_model.solve()
        t1 = time.time()
        timer.stop('cplex solve')

        return self._postsolve(timer, t1-t0)

    def _has_solution(self) -> bool:
        """Return True if the current CPLEX model holds a solution."""
        solution = self._cplex_model.solution
        return solution.get_solution_type() != solution.type.none

    def _is_mixed_integer(self) -> bool:
        """Return True if CPLEX reports a MILP, MIQP or MIQCP problem type."""
        problem_type = self._cplex_model.problem_type
        return self._cplex_model.get_problem_type() in (
            problem_type.MILP,
            problem_type.MIQP,
            problem_type.MIQCP,
        )

    def _postsolve(self, timer: HierarchicalTimer, solve_time):
        """Translate the CPLEX solution status into a :class:`CplexResults`.

        Parameters
        ----------
        timer: HierarchicalTimer
            Timer used to record the solution-loading phase.
        solve_time:
            Stored on the results as ``wallclock_time``.

        Raises
        ------
        RuntimeError
            If ``config.load_solution`` is set but CPLEX has no solution.
        """
        config = self.config
        cpxprob = self._cplex_model

        results = CplexResults(solver=self)
        results.wallclock_time = solve_time

        status = cpxprob.solution.get_status()
        if status in (1, 101, 102):
            results.termination_condition = TerminationCondition.optimal
        elif status in (2, 40, 118, 133, 134):
            results.termination_condition = TerminationCondition.unbounded
        elif status in (4, 119):
            # NOTE(review): 134 used to be listed here as well, but it could
            # never match because the branch above claims it first.  The
            # unreachable duplicate was dropped without changing behaviour;
            # confirm against the CPLEX status-code table which branch
            # should own 134.
            results.termination_condition = TerminationCondition.infeasibleOrUnbounded
        elif status in (3, 103):
            results.termination_condition = TerminationCondition.infeasible
        elif status in (10,):
            results.termination_condition = TerminationCondition.maxIterations
        elif status in (11, 25, 107, 131):
            results.termination_condition = TerminationCondition.maxTimeLimit
        else:
            results.termination_condition = TerminationCondition.unknown

        has_solution = self._has_solution()

        if self._writer.get_active_objective() is None:
            results.best_feasible_objective = None
            results.best_objective_bound = None
        elif has_solution:
            results.best_feasible_objective = cpxprob.solution.get_objective_value()
            num_discrete = cpxprob.variables.get_num_binary() + cpxprob.variables.get_num_integer()
            if num_discrete == 0:
                # Continuous problem: the objective value doubles as the bound.
                results.best_objective_bound = cpxprob.solution.get_objective_value()
            else:
                results.best_objective_bound = cpxprob.solution.MIP.get_best_objective()
        else:
            # No solution: report the trivial bound for the objective sense.
            results.best_feasible_objective = None
            if cpxprob.objective.get_sense() == cpxprob.objective.sense.minimize:
                results.best_objective_bound = -math.inf
            else:
                results.best_objective_bound = math.inf

        if config.load_solution:
            if not has_solution:
                raise RuntimeError('A feasible solution was not found, so no solution can be loaded. '
                                   'Please set opt.config.load_solution=False and check '
                                   'results.termination_condition and '
                                   'results.best_feasible_objective before loading a solution.')
            if results.termination_condition != TerminationCondition.optimal:
                logger.warning('Loading a feasible but suboptimal solution. '
                               'Please set load_solution=False and check '
                               'results.termination_condition before loading a solution.')
            timer.start('load solution')
            self.load_vars()
            timer.stop('load solution')

        return results

    # ------------------------------------------------------------------
    # Solution access
    # ------------------------------------------------------------------

    def get_primals(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]:
        """Return the primal values from the last solve.

        Parameters
        ----------
        vars_to_load: sequence of variables, optional
            Variables to look up; all variables known to the CPLEX model are
            used when omitted.

        Returns
        -------
        ComponentMap
            Maps each variable to its value.

        Raises
        ------
        RuntimeError
            If CPLEX holds no solution.
        """
        if not self._has_solution():
            raise RuntimeError('Cannot load variable values - no feasible solution was found.')

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        var_vals = self._cplex_model.solution.get_values(var_names)

        res = ComponentMap()
        for name, val in zip(var_names, var_vals):
            # 'obj_const' has no entry to map back to - presumably an
            # auxiliary variable emitted by the LP writer (TODO confirm).
            if name == 'obj_const':
                continue
            # bySymbol entries are callables returning the component.
            v = symbol_map.bySymbol[name]()
            res[v] = val
        return res

    def get_duals(self, cons_to_load: Optional[Sequence[_GeneralConstraintData]] = None) -> Dict[_GeneralConstraintData, float]:
        """Return the dual values from the last solve.

        Parameters
        ----------
        cons_to_load: sequence of constraints, optional
            Constraints to look up; all linear constraints known to the CPLEX
            model are used when omitted.

        Returns
        -------
        dict
            Maps each constraint to its dual.  When a constraint appears in
            the LP file under more than one name (``_lb`` and ``_ub``), the
            dual with the larger magnitude is reported.

        Raises
        ------
        RuntimeError
            If CPLEX holds no solution or the problem is mixed-integer.
        """
        if not self._has_solution():
            raise RuntimeError('Cannot get duals - no feasible solution was found.')
        if self._is_mixed_integer():
            raise RuntimeError('Cannot get duals for mixed-integer problems')

        symbol_map = self._writer.symbol_map
        if cons_to_load is None:
            con_names = self._cplex_model.linear_constraints.get_names()
            dual_values = self._cplex_model.solution.get_dual_values()
        else:
            con_names = []
            for con in cons_to_load:
                orig_name = symbol_map.byObject[id(con)]
                if con.equality:
                    con_names.append(orig_name + '_eq')
                else:
                    if con.lower is not None:
                        con_names.append(orig_name + '_lb')
                    if con.upper is not None:
                        con_names.append(orig_name + '_ub')
            dual_values = self._cplex_model.solution.get_dual_values(con_names)

        res = {}
        for name, val in zip(con_names, dual_values):
            # Strip the three-character '_eq' / '_lb' / '_ub' suffix.
            orig_name = name[:-3]
            if orig_name == 'obj_const_con':
                continue
            _con = symbol_map.bySymbol[orig_name]()
            # Keep the dual with the larger magnitude when a constraint
            # shows up more than once.
            if _con not in res or abs(val) > abs(res[_con]):
                res[_con] = val
        return res

    def get_reduced_costs(self, vars_to_load: Optional[Sequence[_GeneralVarData]] = None) -> Mapping[_GeneralVarData, float]:
        """Return the reduced costs from the last solve.

        Parameters
        ----------
        vars_to_load: sequence of variables, optional
            Variables to look up; all variables known to the CPLEX model are
            used when omitted.

        Returns
        -------
        ComponentMap
            Maps each variable to its reduced cost.

        Raises
        ------
        RuntimeError
            If CPLEX holds no solution or the problem is mixed-integer.
        """
        if not self._has_solution():
            raise RuntimeError('Cannot get reduced costs - no feasible solution was found.')
        if self._is_mixed_integer():
            raise RuntimeError('Cannot get reduced costs for mixed-integer problems')

        symbol_map = self._writer.symbol_map
        if vars_to_load is None:
            var_names = self._cplex_model.variables.get_names()
        else:
            var_names = [symbol_map.byObject[id(v)] for v in vars_to_load]
        rc = self._cplex_model.solution.get_reduced_costs(var_names)

        res = ComponentMap()
        for name, val in zip(var_names, rc):
            # 'obj_const' has no entry to map back to - presumably an
            # auxiliary variable emitted by the LP writer (TODO confirm).
            if name == 'obj_const':
                continue
            v = symbol_map.bySymbol[name]()
            res[v] = val
        return res