# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import logging
from typing import List, Dict, Optional
from pyomo.common.collections import ComponentMap
from pyomo.common.dependencies import attempt_import
from pyomo.common.errors import PyomoException
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.config import ConfigValue, NonNegativeInt
from pyomo.common.tee import TeeStream, capture_output
from pyomo.common.log import LogStream
from pyomo.core.kernel.objective import minimize, maximize
from pyomo.core.base import SymbolMap
from pyomo.core.base.var import _GeneralVarData
from pyomo.core.base.constraint import _GeneralConstraintData
from pyomo.core.base.sos import _SOSConstraintData
from pyomo.core.base.param import _ParamData
from pyomo.core.expr.numvalue import value, is_constant
from pyomo.repn import generate_standard_repn
from pyomo.core.expr.numeric_expr import NPV_MaxExpression, NPV_MinExpression
from pyomo.contrib.appsi.base import (
PersistentSolver,
Results,
TerminationCondition,
MIPSolverConfig,
PersistentBase,
PersistentSolutionLoader,
)
from pyomo.contrib.appsi.cmodel import cmodel, cmodel_available
from pyomo.common.dependencies import numpy as np
from pyomo.core.staleflag import StaleFlagManager
import sys
logger = logging.getLogger(__name__)
highspy, highspy_available = attempt_import('highspy')
class DegreeError(PyomoException):
pass
class HighsConfig(MIPSolverConfig):
def __init__(
self,
description=None,
doc=None,
implicit=False,
implicit_domain=None,
visibility=0,
):
super(HighsConfig, self).__init__(
description=description,
doc=doc,
implicit=implicit,
implicit_domain=implicit_domain,
visibility=visibility,
)
self.declare('logfile', ConfigValue(domain=str))
self.declare('solver_output_logger', ConfigValue())
self.declare('log_level', ConfigValue(domain=NonNegativeInt))
self.logfile = ''
self.solver_output_logger = logger
self.log_level = logging.INFO
[docs]class HighsResults(Results):
def __init__(self, solver):
super().__init__()
self.wallclock_time = None
self.solution_loader = PersistentSolutionLoader(solver=solver)
class _MutableVarBounds(object):
def __init__(self, lower_expr, upper_expr, pyomo_var_id, var_map, highs):
self.pyomo_var_id = pyomo_var_id
self.lower_expr = lower_expr
self.upper_expr = upper_expr
self.var_map = var_map
self.highs = highs
def update(self):
col_ndx = self.var_map[self.pyomo_var_id]
lb = value(self.lower_expr)
ub = value(self.upper_expr)
self.highs.changeColBounds(col_ndx, lb, ub)
class _MutableLinearCoefficient(object):
def __init__(self, pyomo_con, pyomo_var_id, con_map, var_map, expr, highs):
self.expr = expr
self.highs = highs
self.pyomo_var_id = pyomo_var_id
self.pyomo_con = pyomo_con
self.con_map = con_map
self.var_map = var_map
def update(self):
row_ndx = self.con_map[self.pyomo_con]
col_ndx = self.var_map[self.pyomo_var_id]
self.highs.changeCoeff(row_ndx, col_ndx, value(self.expr))
class _MutableObjectiveCoefficient(object):
def __init__(self, pyomo_var_id, var_map, expr, highs):
self.expr = expr
self.highs = highs
self.pyomo_var_id = pyomo_var_id
self.var_map = var_map
def update(self):
col_ndx = self.var_map[self.pyomo_var_id]
self.highs.changeColCost(col_ndx, value(self.expr))
class _MutableObjectiveOffset(object):
def __init__(self, expr, highs):
self.expr = expr
self.highs = highs
def update(self):
self.highs.changeObjectiveOffset(value(self.expr))
class _MutableConstraintBounds(object):
def __init__(self, lower_expr, upper_expr, pyomo_con, con_map, highs):
self.lower_expr = lower_expr
self.upper_expr = upper_expr
self.con = pyomo_con
self.con_map = con_map
self.highs = highs
def update(self):
row_ndx = self.con_map[self.con]
lb = value(self.lower_expr)
ub = value(self.upper_expr)
self.highs.changeRowBounds(row_ndx, lb, ub)
[docs]class Highs(PersistentBase, PersistentSolver):
"""
Interface to HiGHS
"""
_available = None
def __init__(self, only_child_vars=False):
super().__init__(only_child_vars=only_child_vars)
self._config = HighsConfig()
self._solver_options = dict()
self._solver_model = None
self._pyomo_var_to_solver_var_map = dict()
self._pyomo_con_to_solver_con_map = dict()
self._solver_con_to_pyomo_con_map = dict()
self._mutable_helpers = dict()
self._mutable_bounds = dict()
self._objective_helpers = list()
self._last_results_object: Optional[HighsResults] = None
self._sol = None
[docs] def available(self):
if highspy_available:
return self.Availability.FullLicense
else:
return self.Availability.NotFound
[docs] def version(self):
version = (
highspy.HIGHS_VERSION_MAJOR,
highspy.HIGHS_VERSION_MINOR,
highspy.HIGHS_VERSION_PATCH,
)
return version
@property
def config(self) -> HighsConfig:
return self._config
@config.setter
def config(self, val: HighsConfig):
self._config = val
@property
def highs_options(self):
"""
A dictionary mapping solver options to values for those options. These
are solver specific.
Returns
-------
dict
A dictionary mapping solver options to values for those options
"""
return self._solver_options
@highs_options.setter
def highs_options(self, val: Dict):
self._solver_options = val
@property
def symbol_map(self):
return SymbolMap()
# raise RuntimeError('Highs interface does not have a symbol map')
def _solve(self, timer: HierarchicalTimer):
config = self.config
options = self.highs_options
ostreams = [
LogStream(
level=self.config.log_level, logger=self.config.solver_output_logger
)
]
if self.config.stream_solver:
ostreams.append(sys.stdout)
with TeeStream(*ostreams) as t:
with capture_output(output=t.STDOUT, capture_fd=True):
self._solver_model.setOptionValue('log_to_console', True)
if config.logfile != '':
self._solver_model.setOptionValue('log_file', config.logfile)
if config.time_limit is not None:
self._solver_model.setOptionValue('time_limit', config.time_limit)
if config.mip_gap is not None:
self._solver_model.setOptionValue('mip_rel_gap', config.mip_gap)
for key, option in options.items():
self._solver_model.setOptionValue(key, option)
timer.start('optimize')
self._solver_model.run()
timer.stop('optimize')
return self._postsolve(timer)
[docs] def solve(self, model, timer: HierarchicalTimer = None) -> Results:
StaleFlagManager.mark_all_as_stale()
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
if timer is None:
timer = HierarchicalTimer()
if model is not self._model:
timer.start('set_instance')
self.set_instance(model)
timer.stop('set_instance')
else:
timer.start('update')
self.update(timer=timer)
timer.stop('update')
res = self._solve(timer)
self._last_results_object = res
if self.config.report_timing:
logger.info('\n' + str(timer))
return res
def _process_domain_and_bounds(self, var_id):
_v, _lb, _ub, _fixed, _domain_interval, _value = self._vars[var_id]
lb, ub, step = _domain_interval
if lb is None:
lb = -highspy.kHighsInf
if ub is None:
ub = highspy.kHighsInf
if step == 0:
vtype = highspy.HighsVarType.kContinuous
elif step == 1:
vtype = highspy.HighsVarType.kInteger
else:
raise ValueError(
f'Unrecognized domain step: {step} (should be either 0 or 1)'
)
if _fixed:
lb = _value
ub = _value
else:
if _lb is not None or _ub is not None:
if not is_constant(_lb) or not is_constant(_ub):
if _lb is None:
tmp_lb = -highspy.kHighsInf
else:
tmp_lb = _lb
if _ub is None:
tmp_ub = highspy.kHighsInf
else:
tmp_ub = _ub
mutable_bound = _MutableVarBounds(
lower_expr=NPV_MaxExpression((tmp_lb, lb)),
upper_expr=NPV_MinExpression((tmp_ub, ub)),
pyomo_var_id=var_id,
var_map=self._pyomo_var_to_solver_var_map,
highs=self._solver_model,
)
self._mutable_bounds[var_id] = (_v, mutable_bound)
if _lb is not None:
lb = max(value(_lb), lb)
if _ub is not None:
ub = min(value(_ub), ub)
return lb, ub, vtype
def _add_variables(self, variables: List[_GeneralVarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
lbs = list()
ubs = list()
indices = list()
vtypes = list()
current_num_vars = len(self._pyomo_var_to_solver_var_map)
for v in variables:
v_id = id(v)
lb, ub, vtype = self._process_domain_and_bounds(v_id)
lbs.append(lb)
ubs.append(ub)
vtypes.append(vtype)
indices.append(current_num_vars)
self._pyomo_var_to_solver_var_map[v_id] = current_num_vars
current_num_vars += 1
self._solver_model.addVars(
len(lbs), np.array(lbs, dtype=np.double), np.array(ubs, dtype=np.double)
)
self._solver_model.changeColsIntegrality(
len(vtypes), np.array(indices), np.array(vtypes)
)
def _add_params(self, params: List[_ParamData]):
pass
def _reinit(self):
saved_config = self.config
saved_options = self.highs_options
saved_update_config = self.update_config
self.__init__(only_child_vars=self._only_child_vars)
self.config = saved_config
self.highs_options = saved_options
self.update_config = saved_update_config
[docs] def set_instance(self, model):
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
if not self.available():
c = self.__class__
raise PyomoException(
f'Solver {c.__module__}.{c.__qualname__} is not available '
f'({self.available()}).'
)
ostreams = [
LogStream(
level=self.config.log_level, logger=self.config.solver_output_logger
)
]
if self.config.stream_solver:
ostreams.append(sys.stdout)
with TeeStream(*ostreams) as t:
with capture_output(output=t.STDOUT, capture_fd=True):
self._reinit()
self._model = model
if self.use_extensions and cmodel_available:
self._expr_types = cmodel.PyomoExprTypes()
self._solver_model = highspy.Highs()
self.add_block(model)
if self._objective is None:
self.set_objective(None)
def _add_constraints(self, cons: List[_GeneralConstraintData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
current_num_cons = len(self._pyomo_con_to_solver_con_map)
lbs = list()
ubs = list()
starts = list()
var_indices = list()
coef_values = list()
for con in cons:
repn = generate_standard_repn(
con.body, quadratic=False, compute_values=False
)
if repn.nonlinear_expr is not None:
raise DegreeError(
f'Highs interface does not support expressions of degree {repn.polynomial_degree()}'
)
starts.append(len(coef_values))
for ndx, coef in enumerate(repn.linear_coefs):
v = repn.linear_vars[ndx]
v_id = id(v)
coef_val = value(coef)
if not is_constant(coef):
mutable_linear_coefficient = _MutableLinearCoefficient(
pyomo_con=con,
pyomo_var_id=v_id,
con_map=self._pyomo_con_to_solver_con_map,
var_map=self._pyomo_var_to_solver_var_map,
expr=coef,
highs=self._solver_model,
)
if con not in self._mutable_helpers:
self._mutable_helpers[con] = list()
self._mutable_helpers[con].append(mutable_linear_coefficient)
if coef_val == 0:
continue
var_indices.append(self._pyomo_var_to_solver_var_map[v_id])
coef_values.append(coef_val)
if con.has_lb():
lb = con.lower - repn.constant
else:
lb = -highspy.kHighsInf
if con.has_ub():
ub = con.upper - repn.constant
else:
ub = highspy.kHighsInf
if not is_constant(lb) or not is_constant(ub):
mutable_con_bounds = _MutableConstraintBounds(
lower_expr=lb,
upper_expr=ub,
pyomo_con=con,
con_map=self._pyomo_con_to_solver_con_map,
highs=self._solver_model,
)
if con not in self._mutable_helpers:
self._mutable_helpers[con] = [mutable_con_bounds]
else:
self._mutable_helpers[con].append(mutable_con_bounds)
lbs.append(value(lb))
ubs.append(value(ub))
self._pyomo_con_to_solver_con_map[con] = current_num_cons
self._solver_con_to_pyomo_con_map[current_num_cons] = con
current_num_cons += 1
self._solver_model.addRows(
len(lbs),
np.array(lbs, dtype=np.double),
np.array(ubs, dtype=np.double),
len(coef_values),
np.array(starts),
np.array(var_indices),
np.array(coef_values, dtype=np.double),
)
def _add_sos_constraints(self, cons: List[_SOSConstraintData]):
if cons:
raise NotImplementedError(
'Highs interface does not support SOS constraints'
)
def _remove_constraints(self, cons: List[_GeneralConstraintData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices_to_remove = list()
for con in cons:
con_ndx = self._pyomo_con_to_solver_con_map.pop(con)
del self._solver_con_to_pyomo_con_map[con_ndx]
indices_to_remove.append(con_ndx)
self._mutable_helpers.pop(con, None)
self._solver_model.deleteRows(
len(indices_to_remove), np.array(indices_to_remove)
)
con_ndx = 0
new_con_map = dict()
for c in self._pyomo_con_to_solver_con_map.keys():
new_con_map[c] = con_ndx
con_ndx += 1
self._pyomo_con_to_solver_con_map.clear()
self._pyomo_con_to_solver_con_map.update(new_con_map)
self._solver_con_to_pyomo_con_map.clear()
self._solver_con_to_pyomo_con_map.update(
{v: k for k, v in self._pyomo_con_to_solver_con_map.items()}
)
def _remove_sos_constraints(self, cons: List[_SOSConstraintData]):
if cons:
raise NotImplementedError(
'Highs interface does not support SOS constraints'
)
def _remove_variables(self, variables: List[_GeneralVarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices_to_remove = list()
for v in variables:
v_id = id(v)
v_ndx = self._pyomo_var_to_solver_var_map.pop(v_id)
indices_to_remove.append(v_ndx)
self._mutable_bounds.pop(v_id, None)
indices_to_remove.sort()
self._solver_model.deleteVars(
len(indices_to_remove), np.array(indices_to_remove)
)
v_ndx = 0
new_var_map = dict()
for v_id in self._pyomo_var_to_solver_var_map.keys():
new_var_map[v_id] = v_ndx
v_ndx += 1
self._pyomo_var_to_solver_var_map.clear()
self._pyomo_var_to_solver_var_map.update(new_var_map)
def _remove_params(self, params: List[_ParamData]):
pass
def _update_variables(self, variables: List[_GeneralVarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices = list()
lbs = list()
ubs = list()
vtypes = list()
for v in variables:
v_id = id(v)
self._mutable_bounds.pop(v_id, None)
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
lb, ub, vtype = self._process_domain_and_bounds(v_id)
lbs.append(lb)
ubs.append(ub)
vtypes.append(vtype)
indices.append(v_ndx)
self._solver_model.changeColsBounds(
len(indices),
np.array(indices),
np.array(lbs, dtype=np.double),
np.array(ubs, dtype=np.double),
)
self._solver_model.changeColsIntegrality(
len(indices), np.array(indices), np.array(vtypes)
)
[docs] def update_params(self):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
for con, helpers in self._mutable_helpers.items():
for helper in helpers:
helper.update()
for k, (v, helper) in self._mutable_bounds.items():
helper.update()
for helper in self._objective_helpers:
helper.update()
def _set_objective(self, obj):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
n = len(self._pyomo_var_to_solver_var_map)
indices = np.arange(n)
costs = np.zeros(n, dtype=np.double)
self._objective_helpers = list()
if obj is None:
sense = highspy.ObjSense.kMinimize
self._solver_model.changeObjectiveOffset(0)
else:
if obj.sense == minimize:
sense = highspy.ObjSense.kMinimize
elif obj.sense == maximize:
sense = highspy.ObjSense.kMaximize
else:
raise ValueError(
'Objective sense is not recognized: {0}'.format(obj.sense)
)
repn = generate_standard_repn(
obj.expr, quadratic=False, compute_values=False
)
if repn.nonlinear_expr is not None:
raise DegreeError(
f'Highs interface does not support expressions of degree {repn.polynomial_degree()}'
)
for coef, v in zip(repn.linear_coefs, repn.linear_vars):
v_id = id(v)
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
costs[v_ndx] = value(coef)
if not is_constant(coef):
mutable_objective_coef = _MutableObjectiveCoefficient(
pyomo_var_id=v_id,
var_map=self._pyomo_var_to_solver_var_map,
expr=coef,
highs=self._solver_model,
)
self._objective_helpers.append(mutable_objective_coef)
self._solver_model.changeObjectiveOffset(value(repn.constant))
if not is_constant(repn.constant):
mutable_objective_offset = _MutableObjectiveOffset(
expr=repn.constant, highs=self._solver_model
)
self._objective_helpers.append(mutable_objective_offset)
self._solver_model.changeObjectiveSense(sense)
self._solver_model.changeColsCost(n, indices, costs)
def _postsolve(self, timer: HierarchicalTimer):
config = self.config
highs = self._solver_model
status = highs.getModelStatus()
results = HighsResults(self)
results.wallclock_time = highs.getRunTime()
if status == highspy.HighsModelStatus.kNotset:
results.termination_condition = TerminationCondition.unknown
elif status == highspy.HighsModelStatus.kLoadError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kModelError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kPresolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kSolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kPostsolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kModelEmpty:
results.termination_condition = TerminationCondition.unknown
elif status == highspy.HighsModelStatus.kOptimal:
results.termination_condition = TerminationCondition.optimal
elif status == highspy.HighsModelStatus.kInfeasible:
results.termination_condition = TerminationCondition.infeasible
elif status == highspy.HighsModelStatus.kUnboundedOrInfeasible:
results.termination_condition = TerminationCondition.infeasibleOrUnbounded
elif status == highspy.HighsModelStatus.kUnbounded:
results.termination_condition = TerminationCondition.unbounded
elif status == highspy.HighsModelStatus.kObjectiveBound:
results.termination_condition = TerminationCondition.objectiveLimit
elif status == highspy.HighsModelStatus.kObjectiveTarget:
results.termination_condition = TerminationCondition.objectiveLimit
elif status == highspy.HighsModelStatus.kTimeLimit:
results.termination_condition = TerminationCondition.maxTimeLimit
elif status == highspy.HighsModelStatus.kIterationLimit:
results.termination_condition = TerminationCondition.maxIterations
elif status == highspy.HighsModelStatus.kUnknown:
results.termination_condition = TerminationCondition.unknown
else:
results.termination_condition = TerminationCondition.unknown
timer.start('load solution')
self._sol = highs.getSolution()
has_feasible_solution = False
if results.termination_condition == TerminationCondition.optimal:
has_feasible_solution = True
elif results.termination_condition in {
TerminationCondition.objectiveLimit,
TerminationCondition.maxIterations,
TerminationCondition.maxTimeLimit,
}:
if self._sol.value_valid:
has_feasible_solution = True
if config.load_solution:
if has_feasible_solution:
if results.termination_condition != TerminationCondition.optimal:
logger.warning(
'Loading a feasible but suboptimal solution. '
'Please set load_solution=False and check '
'results.termination_condition and '
'results.found_feasible_solution() before loading a solution.'
)
self.load_vars()
else:
raise RuntimeError(
'A feasible solution was not found, so no solution can be loaded. '
'If using the appsi.solvers.Highs interface, you can '
'set opt.config.load_solution=False. If using the environ.SolverFactory '
'interface, you can set opt.solve(model, load_solutions = False). '
'Then you can check results.termination_condition and '
'results.best_feasible_objective before loading a solution.'
)
timer.stop('load solution')
info = highs.getInfo()
results.best_objective_bound = None
results.best_feasible_objective = None
if self._objective is not None:
if has_feasible_solution:
results.best_feasible_objective = info.objective_function_value
if info.mip_node_count == -1:
if has_feasible_solution:
results.best_objective_bound = info.objective_function_value
else:
results.best_objective_bound = None
else:
results.best_objective_bound = info.mip_dual_bound
return results
[docs] def load_vars(self, vars_to_load=None):
for v, val in self.get_primals(vars_to_load=vars_to_load).items():
v.set_value(val, skip_validation=True)
StaleFlagManager.mark_all_as_stale(delayed=True)
[docs] def get_primals(self, vars_to_load=None, solution_number=0):
if self._sol is None or not self._sol.value_valid:
raise RuntimeError(
'Solver does not currently have a valid solution. Please '
'check the termination condition.'
)
res = ComponentMap()
if vars_to_load is None:
var_ids_to_load = list()
for v, ref_info in self._referenced_variables.items():
using_cons, using_sos, using_obj = ref_info
if using_cons or using_sos or (using_obj is not None):
var_ids_to_load.append(v)
else:
var_ids_to_load = [id(v) for v in vars_to_load]
var_vals = self._sol.col_value
for v_id in var_ids_to_load:
v = self._vars[v_id][0]
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
res[v] = var_vals[v_ndx]
return res
[docs] def get_reduced_costs(self, vars_to_load=None):
if self._sol is None or not self._sol.dual_valid:
raise RuntimeError(
'Solver does not currently have valid reduced costs. Please '
'check the termination condition.'
)
res = ComponentMap()
if vars_to_load is None:
var_ids_to_load = list(self._vars.keys())
else:
var_ids_to_load = [id(v) for v in vars_to_load]
var_vals = self._sol.col_dual
for v_id in var_ids_to_load:
v = self._vars[v_id][0]
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
res[v] = var_vals[v_ndx]
return res
[docs] def get_duals(self, cons_to_load=None):
if self._sol is None or not self._sol.dual_valid:
raise RuntimeError(
'Solver does not currently have valid duals. Please '
'check the termination condition.'
)
res = dict()
if cons_to_load is None:
cons_to_load = list(self._pyomo_con_to_solver_con_map.keys())
duals = self._sol.row_dual
for c in cons_to_load:
c_ndx = self._pyomo_con_to_solver_con_map[c]
res[c] = duals[c_ndx]
return res
[docs] def get_slacks(self, cons_to_load=None):
if self._sol is None or not self._sol.value_valid:
raise RuntimeError(
'Solver does not currently have valid slacks. Please '
'check the termination condition.'
)
res = dict()
if cons_to_load is None:
cons_to_load = list(self._pyomo_con_to_solver_con_map.keys())
slacks = self._sol.row_value
for c in cons_to_load:
c_ndx = self._pyomo_con_to_solver_con_map[c]
res[c] = slacks[c_ndx]
return res