Source code for pyomo.contrib.appsi.solvers.highs

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
from typing import List, Dict, Optional
from pyomo.common.collections import ComponentMap
from pyomo.common.dependencies import attempt_import
from pyomo.common.errors import PyomoException
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.tee import TeeStream, capture_output
from pyomo.common.log import LogStream
from pyomo.core.kernel.objective import minimize, maximize
from pyomo.core.base import SymbolMap
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.sos import _SOSConstraintData
from pyomo.core.base.param import _ParamData
from pyomo.core.expr.numvalue import value, is_constant
from pyomo.repn import generate_standard_repn
from pyomo.core.expr.numeric_expr import NPV_MaxExpression, NPV_MinExpression
from pyomo.contrib.appsi.base import (
    PersistentSolver,
    Results,
    TerminationCondition,
    MIPSolverConfig,
    PersistentBase,
    PersistentSolutionLoader,
)
from pyomo.contrib.appsi.cmodel import cmodel, cmodel_available
from pyomo.common.dependencies import numpy as np
from pyomo.core.staleflag import StaleFlagManager
import sys

logger = logging.getLogger(__name__)

highspy, highspy_available = attempt_import('highspy')


class DegreeError(PyomoException):
    pass


class HighsConfig(MIPSolverConfig):
    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        super(HighsConfig, self).__init__(
            description=description,
            doc=doc,
            implicit=implicit,
            implicit_domain=implicit_domain,
            visibility=visibility,
        )

        self.declare('logfile', ConfigValue(domain=str))
        self.declare('solver_output_logger', ConfigValue())
        self.declare('log_level', ConfigValue(domain=NonNegativeInt))

        self.logfile = ''
        self.solver_output_logger = logger
        self.log_level = logging.INFO


[docs]class HighsResults(Results): def __init__(self, solver): super().__init__() self.wallclock_time = None self.solution_loader = PersistentSolutionLoader(solver=solver)
class _MutableVarBounds(object): def __init__(self, lower_expr, upper_expr, pyomo_var_id, var_map, highs): self.pyomo_var_id = pyomo_var_id self.lower_expr = lower_expr self.upper_expr = upper_expr self.var_map = var_map self.highs = highs def update(self): col_ndx = self.var_map[self.pyomo_var_id] lb = value(self.lower_expr) ub = value(self.upper_expr) self.highs.changeColBounds(col_ndx, lb, ub) class _MutableLinearCoefficient(object): def __init__(self, pyomo_con, pyomo_var_id, con_map, var_map, expr, highs): self.expr = expr self.highs = highs self.pyomo_var_id = pyomo_var_id self.pyomo_con = pyomo_con self.con_map = con_map self.var_map = var_map def update(self): row_ndx = self.con_map[self.pyomo_con] col_ndx = self.var_map[self.pyomo_var_id] self.highs.changeCoeff(row_ndx, col_ndx, value(self.expr)) class _MutableObjectiveCoefficient(object): def __init__(self, pyomo_var_id, var_map, expr, highs): self.expr = expr self.highs = highs self.pyomo_var_id = pyomo_var_id self.var_map = var_map def update(self): col_ndx = self.var_map[self.pyomo_var_id] self.highs.changeColCost(col_ndx, value(self.expr)) class _MutableObjectiveOffset(object): def __init__(self, expr, highs): self.expr = expr self.highs = highs def update(self): self.highs.changeObjectiveOffset(value(self.expr)) class _MutableConstraintBounds(object): def __init__(self, lower_expr, upper_expr, pyomo_con, con_map, highs): self.lower_expr = lower_expr self.upper_expr = upper_expr self.con = pyomo_con self.con_map = con_map self.highs = highs def update(self): row_ndx = self.con_map[self.con] lb = value(self.lower_expr) ub = value(self.upper_expr) self.highs.changeRowBounds(row_ndx, lb, ub)
[docs]class Highs(PersistentBase, PersistentSolver): """ Interface to HiGHS """ _available = None def __init__(self, only_child_vars=False): super().__init__(only_child_vars=only_child_vars) self._config = HighsConfig() self._solver_options = dict() self._solver_model = None self._pyomo_var_to_solver_var_map = dict() self._pyomo_con_to_solver_con_map = dict() self._solver_con_to_pyomo_con_map = dict() self._mutable_helpers = dict() self._mutable_bounds = dict() self._objective_helpers = list() self._last_results_object: Optional[HighsResults] = None self._sol = None
[docs] def available(self): if highspy_available: return self.Availability.FullLicense else: return self.Availability.NotFound
[docs] def version(self): version = ( highspy.HIGHS_VERSION_MAJOR, highspy.HIGHS_VERSION_MINOR, highspy.HIGHS_VERSION_PATCH, ) return version
@property def config(self) -> HighsConfig: return self._config @config.setter def config(self, val: HighsConfig): self._config = val @property def highs_options(self): """ A dictionary mapping solver options to values for those options. These are solver specific. Returns ------- dict A dictionary mapping solver options to values for those options """ return self._solver_options @highs_options.setter def highs_options(self, val: Dict): self._solver_options = val @property def symbol_map(self): return SymbolMap() # raise RuntimeError('Highs interface does not have a symbol map') def _solve(self, timer: HierarchicalTimer): config = self.config options = self.highs_options ostreams = [ LogStream( level=self.config.log_level, logger=self.config.solver_output_logger ) ] if self.config.stream_solver: ostreams.append(sys.stdout) with TeeStream(*ostreams) as t: with capture_output(output=t.STDOUT, capture_fd=True): self._solver_model.setOptionValue('log_to_console', True) if config.logfile != '': self._solver_model.setOptionValue('log_file', config.logfile) if config.time_limit is not None: self._solver_model.setOptionValue('time_limit', config.time_limit) if config.mip_gap is not None: self._solver_model.setOptionValue('mip_rel_gap', config.mip_gap) for key, option in options.items(): self._solver_model.setOptionValue(key, option) timer.start('optimize') self._solver_model.run() timer.stop('optimize') return self._postsolve(timer)
[docs] def solve(self, model, timer: HierarchicalTimer = None) -> Results: StaleFlagManager.mark_all_as_stale() if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() if timer is None: timer = HierarchicalTimer() if model is not self._model: timer.start('set_instance') self.set_instance(model) timer.stop('set_instance') else: timer.start('update') self.update(timer=timer) timer.stop('update') res = self._solve(timer) self._last_results_object = res if self.config.report_timing: logger.info('\n' + str(timer)) return res
def _process_domain_and_bounds(self, var_id): _v, _lb, _ub, _fixed, _domain_interval, _value = self._vars[var_id] lb, ub, step = _domain_interval if lb is None: lb = -highspy.kHighsInf if ub is None: ub = highspy.kHighsInf if step == 0: vtype = highspy.HighsVarType.kContinuous elif step == 1: vtype = highspy.HighsVarType.kInteger else: raise ValueError( f'Unrecognized domain step: {step} (should be either 0 or 1)' ) if _fixed: lb = _value ub = _value else: if _lb is not None or _ub is not None: if not is_constant(_lb) or not is_constant(_ub): if _lb is None: tmp_lb = -highspy.kHighsInf else: tmp_lb = _lb if _ub is None: tmp_ub = highspy.kHighsInf else: tmp_ub = _ub mutable_bound = _MutableVarBounds( lower_expr=NPV_MaxExpression((tmp_lb, lb)), upper_expr=NPV_MinExpression((tmp_ub, ub)), pyomo_var_id=var_id, var_map=self._pyomo_var_to_solver_var_map, highs=self._solver_model, ) self._mutable_bounds[var_id] = (_v, mutable_bound) if _lb is not None: lb = max(value(_lb), lb) if _ub is not None: ub = min(value(_ub), ub) return lb, ub, vtype def _add_variables(self, variables: List[_GeneralVarData]): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() lbs = list() ubs = list() indices = list() vtypes = list() current_num_vars = len(self._pyomo_var_to_solver_var_map) for v in variables: v_id = id(v) lb, ub, vtype = self._process_domain_and_bounds(v_id) lbs.append(lb) ubs.append(ub) vtypes.append(vtype) indices.append(current_num_vars) self._pyomo_var_to_solver_var_map[v_id] = current_num_vars current_num_vars += 1 self._solver_model.addVars( len(lbs), np.array(lbs, dtype=np.double), np.array(ubs, dtype=np.double) ) self._solver_model.changeColsIntegrality( len(vtypes), np.array(indices), np.array(vtypes) ) def _add_params(self, params: List[_ParamData]): pass def _reinit(self): saved_config = self.config saved_options = self.highs_options saved_update_config = self.update_config self.__init__(only_child_vars=self._only_child_vars) self.config = saved_config self.highs_options = saved_options self.update_config = saved_update_config
[docs] def set_instance(self, model): if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() if not self.available(): c = self.__class__ raise PyomoException( f'Solver {c.__module__}.{c.__qualname__} is not available ' f'({self.available()}).' ) ostreams = [ LogStream( level=self.config.log_level, logger=self.config.solver_output_logger ) ] if self.config.stream_solver: ostreams.append(sys.stdout) with TeeStream(*ostreams) as t: with capture_output(output=t.STDOUT, capture_fd=True): self._reinit() self._model = model if self.use_extensions and cmodel_available: self._expr_types = cmodel.PyomoExprTypes() self._solver_model = highspy.Highs() self.add_block(model) if self._objective is None: self.set_objective(None)
def _add_constraints(self, cons: List[_GeneralConstraintData]): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() current_num_cons = len(self._pyomo_con_to_solver_con_map) lbs = list() ubs = list() starts = list() var_indices = list() coef_values = list() for con in cons: repn = generate_standard_repn( con.body, quadratic=False, compute_values=False ) if repn.nonlinear_expr is not None: raise DegreeError( f'Highs interface does not support expressions of degree {repn.polynomial_degree()}' ) starts.append(len(coef_values)) for ndx, coef in enumerate(repn.linear_coefs): v = repn.linear_vars[ndx] v_id = id(v) coef_val = value(coef) if not is_constant(coef): mutable_linear_coefficient = _MutableLinearCoefficient( pyomo_con=con, pyomo_var_id=v_id, con_map=self._pyomo_con_to_solver_con_map, var_map=self._pyomo_var_to_solver_var_map, expr=coef, highs=self._solver_model, ) if con not in self._mutable_helpers: self._mutable_helpers[con] = list() self._mutable_helpers[con].append(mutable_linear_coefficient) if coef_val == 0: continue var_indices.append(self._pyomo_var_to_solver_var_map[v_id]) coef_values.append(coef_val) if con.has_lb(): lb = con.lower - repn.constant else: lb = -highspy.kHighsInf if con.has_ub(): ub = con.upper - repn.constant else: ub = highspy.kHighsInf if not is_constant(lb) or not is_constant(ub): mutable_con_bounds = _MutableConstraintBounds( lower_expr=lb, upper_expr=ub, pyomo_con=con, con_map=self._pyomo_con_to_solver_con_map, highs=self._solver_model, ) if con not in self._mutable_helpers: self._mutable_helpers[con] = [mutable_con_bounds] else: self._mutable_helpers[con].append(mutable_con_bounds) lbs.append(value(lb)) ubs.append(value(ub)) self._pyomo_con_to_solver_con_map[con] = current_num_cons self._solver_con_to_pyomo_con_map[current_num_cons] = con current_num_cons += 1 self._solver_model.addRows( len(lbs), np.array(lbs, dtype=np.double), np.array(ubs, dtype=np.double), len(coef_values), np.array(starts), np.array(var_indices), np.array(coef_values, dtype=np.double), ) def _add_sos_constraints(self, cons: List[_SOSConstraintData]): if cons: raise NotImplementedError( 'Highs interface does not support SOS constraints' ) def _remove_constraints(self, cons: List[_GeneralConstraintData]): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() indices_to_remove = list() for con in cons: con_ndx = self._pyomo_con_to_solver_con_map.pop(con) del self._solver_con_to_pyomo_con_map[con_ndx] indices_to_remove.append(con_ndx) self._mutable_helpers.pop(con, None) self._solver_model.deleteRows( len(indices_to_remove), np.array(indices_to_remove) ) con_ndx = 0 new_con_map = dict() for c in self._pyomo_con_to_solver_con_map.keys(): new_con_map[c] = con_ndx con_ndx += 1 self._pyomo_con_to_solver_con_map.clear() self._pyomo_con_to_solver_con_map.update(new_con_map) self._solver_con_to_pyomo_con_map.clear() self._solver_con_to_pyomo_con_map.update( {v: k for k, v in self._pyomo_con_to_solver_con_map.items()} ) def _remove_sos_constraints(self, cons: List[_SOSConstraintData]): if cons: raise NotImplementedError( 'Highs interface does not support SOS constraints' ) def _remove_variables(self, variables: List[_GeneralVarData]): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() indices_to_remove = list() for v in variables: v_id = id(v) v_ndx = self._pyomo_var_to_solver_var_map.pop(v_id) indices_to_remove.append(v_ndx) self._mutable_bounds.pop(v_id, None) indices_to_remove.sort() self._solver_model.deleteVars( len(indices_to_remove), np.array(indices_to_remove) ) v_ndx = 0 new_var_map = dict() for v_id in self._pyomo_var_to_solver_var_map.keys(): new_var_map[v_id] = v_ndx v_ndx += 1 self._pyomo_var_to_solver_var_map.clear() self._pyomo_var_to_solver_var_map.update(new_var_map) def _remove_params(self, params: List[_ParamData]): pass def _update_variables(self, variables: List[_GeneralVarData]): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() indices = list() lbs = list() ubs = list() vtypes = list() for v in variables: v_id = id(v) self._mutable_bounds.pop(v_id, None) v_ndx = self._pyomo_var_to_solver_var_map[v_id] lb, ub, vtype = self._process_domain_and_bounds(v_id) lbs.append(lb) ubs.append(ub) vtypes.append(vtype) indices.append(v_ndx) self._solver_model.changeColsBounds( len(indices), np.array(indices), np.array(lbs, dtype=np.double), np.array(ubs, dtype=np.double), ) self._solver_model.changeColsIntegrality( len(indices), np.array(indices), np.array(vtypes) )
[docs] def update_params(self): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() for con, helpers in self._mutable_helpers.items(): for helper in helpers: helper.update() for k, (v, helper) in self._mutable_bounds.items(): helper.update() for helper in self._objective_helpers: helper.update()
def _set_objective(self, obj): self._sol = None if self._last_results_object is not None: self._last_results_object.solution_loader.invalidate() n = len(self._pyomo_var_to_solver_var_map) indices = np.arange(n) costs = np.zeros(n, dtype=np.double) self._objective_helpers = list() if obj is None: sense = highspy.ObjSense.kMinimize self._solver_model.changeObjectiveOffset(0) else: if obj.sense == minimize: sense = highspy.ObjSense.kMinimize elif obj.sense == maximize: sense = highspy.ObjSense.kMaximize else: raise ValueError( 'Objective sense is not recognized: {0}'.format(obj.sense) ) repn = generate_standard_repn( obj.expr, quadratic=False, compute_values=False ) if repn.nonlinear_expr is not None: raise DegreeError( f'Highs interface does not support expressions of degree {repn.polynomial_degree()}' ) for coef, v in zip(repn.linear_coefs, repn.linear_vars): v_id = id(v) v_ndx = self._pyomo_var_to_solver_var_map[v_id] costs[v_ndx] = value(coef) if not is_constant(coef): mutable_objective_coef = _MutableObjectiveCoefficient( pyomo_var_id=v_id, var_map=self._pyomo_var_to_solver_var_map, expr=coef, highs=self._solver_model, ) self._objective_helpers.append(mutable_objective_coef) self._solver_model.changeObjectiveOffset(value(repn.constant)) if not is_constant(repn.constant): mutable_objective_offset = _MutableObjectiveOffset( expr=repn.constant, highs=self._solver_model ) self._objective_helpers.append(mutable_objective_offset) self._solver_model.changeObjectiveSense(sense) self._solver_model.changeColsCost(n, indices, costs) def _postsolve(self, timer: HierarchicalTimer): config = self.config highs = self._solver_model status = highs.getModelStatus() results = HighsResults(self) results.wallclock_time = highs.getRunTime() if status == highspy.HighsModelStatus.kNotset: results.termination_condition = TerminationCondition.unknown elif status == highspy.HighsModelStatus.kLoadError: results.termination_condition = TerminationCondition.error elif status == highspy.HighsModelStatus.kModelError: results.termination_condition = TerminationCondition.error elif status == highspy.HighsModelStatus.kPresolveError: results.termination_condition = TerminationCondition.error elif status == highspy.HighsModelStatus.kSolveError: results.termination_condition = TerminationCondition.error elif status == highspy.HighsModelStatus.kPostsolveError: results.termination_condition = TerminationCondition.error elif status == highspy.HighsModelStatus.kModelEmpty: results.termination_condition = TerminationCondition.unknown elif status == highspy.HighsModelStatus.kOptimal: results.termination_condition = TerminationCondition.optimal elif status == highspy.HighsModelStatus.kInfeasible: results.termination_condition = TerminationCondition.infeasible elif status == highspy.HighsModelStatus.kUnboundedOrInfeasible: results.termination_condition = TerminationCondition.infeasibleOrUnbounded elif status == highspy.HighsModelStatus.kUnbounded: results.termination_condition = TerminationCondition.unbounded elif status == highspy.HighsModelStatus.kObjectiveBound: results.termination_condition = TerminationCondition.objectiveLimit elif status == highspy.HighsModelStatus.kObjectiveTarget: results.termination_condition = TerminationCondition.objectiveLimit elif status == highspy.HighsModelStatus.kTimeLimit: results.termination_condition = TerminationCondition.maxTimeLimit elif status == highspy.HighsModelStatus.kIterationLimit: results.termination_condition = TerminationCondition.maxIterations elif status == highspy.HighsModelStatus.kUnknown: results.termination_condition = TerminationCondition.unknown else: results.termination_condition = TerminationCondition.unknown timer.start('load solution') self._sol = highs.getSolution() has_feasible_solution = False if results.termination_condition == TerminationCondition.optimal: has_feasible_solution = True elif results.termination_condition in { TerminationCondition.objectiveLimit, TerminationCondition.maxIterations, TerminationCondition.maxTimeLimit, }: if self._sol.value_valid: has_feasible_solution = True if config.load_solution: if has_feasible_solution: if results.termination_condition != TerminationCondition.optimal: logger.warning( 'Loading a feasible but suboptimal solution. ' 'Please set load_solution=False and check ' 'results.termination_condition and ' 'results.found_feasible_solution() before loading a solution.' ) self.load_vars() else: raise RuntimeError( 'A feasible solution was not found, so no solution can be loaded. ' 'If using the appsi.solvers.Highs interface, you can ' 'set opt.config.load_solution=False. If using the environ.SolverFactory ' 'interface, you can set opt.solve(model, load_solutions = False). ' 'Then you can check results.termination_condition and ' 'results.best_feasible_objective before loading a solution.' ) timer.stop('load solution') info = highs.getInfo() results.best_objective_bound = None results.best_feasible_objective = None if self._objective is not None: if has_feasible_solution: results.best_feasible_objective = info.objective_function_value if info.mip_node_count == -1: if has_feasible_solution: results.best_objective_bound = info.objective_function_value else: results.best_objective_bound = None else: results.best_objective_bound = info.mip_dual_bound return results
[docs] def load_vars(self, vars_to_load=None): for v, val in self.get_primals(vars_to_load=vars_to_load).items(): v.set_value(val, skip_validation=True) StaleFlagManager.mark_all_as_stale(delayed=True)
[docs] def get_primals(self, vars_to_load=None, solution_number=0): if self._sol is None or not self._sol.value_valid: raise RuntimeError( 'Solver does not currently have a valid solution. Please ' 'check the termination condition.' ) res = ComponentMap() if vars_to_load is None: var_ids_to_load = list() for v, ref_info in self._referenced_variables.items(): using_cons, using_sos, using_obj = ref_info if using_cons or using_sos or (using_obj is not None): var_ids_to_load.append(v) else: var_ids_to_load = [id(v) for v in vars_to_load] var_vals = self._sol.col_value for v_id in var_ids_to_load: v = self._vars[v_id][0] v_ndx = self._pyomo_var_to_solver_var_map[v_id] res[v] = var_vals[v_ndx] return res
[docs] def get_reduced_costs(self, vars_to_load=None): if self._sol is None or not self._sol.dual_valid: raise RuntimeError( 'Solver does not currently have valid reduced costs. Please ' 'check the termination condition.' ) res = ComponentMap() if vars_to_load is None: var_ids_to_load = list(self._vars.keys()) else: var_ids_to_load = [id(v) for v in vars_to_load] var_vals = self._sol.col_dual for v_id in var_ids_to_load: v = self._vars[v_id][0] v_ndx = self._pyomo_var_to_solver_var_map[v_id] res[v] = var_vals[v_ndx] return res
[docs] def get_duals(self, cons_to_load=None): if self._sol is None or not self._sol.dual_valid: raise RuntimeError( 'Solver does not currently have valid duals. Please ' 'check the termination condition.' ) res = dict() if cons_to_load is None: cons_to_load = list(self._pyomo_con_to_solver_con_map.keys()) duals = self._sol.row_dual for c in cons_to_load: c_ndx = self._pyomo_con_to_solver_con_map[c] res[c] = duals[c_ndx] return res
[docs] def get_slacks(self, cons_to_load=None): if self._sol is None or not self._sol.value_valid: raise RuntimeError( 'Solver does not currently have valid slacks. Please ' 'check the termination condition.' ) res = dict() if cons_to_load is None: cons_to_load = list(self._pyomo_con_to_solver_con_map.keys()) slacks = self._sol.row_value for c in cons_to_load: c_ndx = self._pyomo_con_to_solver_con_map[c] res[c] = slacks[c_ndx] return res