# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import datetime
import io
import math
import os
import logging
from typing import Mapping, Sequence
from pyomo.common.collections import ComponentMap
from pyomo.common.config import ConfigValue
from pyomo.common.dependencies import attempt_import
from pyomo.common.enums import ObjectiveSense
from pyomo.common.errors import ApplicationError, InfeasibleConstraintException
from pyomo.common.shutdown import python_is_shutting_down
from pyomo.common.tee import capture_output, TeeStream
from pyomo.common.timing import HierarchicalTimer
from pyomo.core.staleflag import StaleFlagManager
from pyomo.core.base import VarData, ConstraintData
from pyomo.contrib.solver.common.base import SolverBase, Availability
from pyomo.contrib.solver.common.config import BranchAndBoundConfig
from pyomo.contrib.solver.common.util import (
NoFeasibleSolutionError,
NoOptimalSolutionError,
NoDualsError,
NoReducedCostsError,
NoSolutionError,
)
from pyomo.contrib.solver.common.solution_loader import NoSolutionSolutionLoader
from pyomo.contrib.solver.common.results import (
Results,
SolutionStatus,
TerminationCondition,
get_infeasible_results,
)
from pyomo.contrib.solver.common.solution_loader import SolutionLoader
import time
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# ``attempt_import`` yields the gurobipy module handle together with an
# availability indicator.  Per the note in GurobiDirectBase.available(),
# the import is deferred and is triggered when the indicator is evaluated.
gurobipy, gurobipy_available = attempt_import('gurobipy')
class GurobiConfig(BranchAndBoundConfig):
    """Branch-and-bound solver configuration with Gurobi-specific options.

    In addition to everything declared by ``BranchAndBoundConfig``, this
    declares the boolean option ``warmstart_discrete_vars`` (default
    ``False``).
    """

    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        """Forward the standard config arguments and declare the extra option."""
        super().__init__(
            description=description,
            doc=doc,
            implicit=implicit,
            implicit_domain=implicit_domain,
            visibility=visibility,
        )

        warmstart_option = ConfigValue(
            default=False,
            domain=bool,
            description="If True, the current values of the integer variables "
            "will be passed to Gurobi.",
        )
        self.warmstart_discrete_vars: bool = self.declare(
            'warmstart_discrete_vars', warmstart_option
        )
class GurobiDirectSolutionLoaderBase(SolutionLoader):
    """Shared solution-loading logic for the gurobipy-based interfaces.

    Reads primal values, reduced costs and duals from a gurobipy model.
    Derived classes supply the Pyomo <-> gurobipy mappings by implementing
    ``_get_var_lists``, ``_get_var_map`` and ``_get_con_map``.

    Every instance counts as one client of the shared Gurobi environment
    managed by ``GurobiDirectBase``: it registers itself on construction
    and releases the registration when it is garbage collected.
    """

    def __init__(self, solver_model, pyomo_model) -> None:
        """Store the gurobipy model and the Pyomo model it was built from."""
        # Register *before* anything that may raise: __del__ runs even for
        # partially constructed objects, so registering last could leave
        # the shared client counter unbalanced.
        GurobiDirectBase._register_env_client()
        super().__init__()
        self._solver_model = solver_model
        self._pyomo_model = pyomo_model  # needed for suffixes

    def _get_active_solution_id(self) -> int:
        """Return the current value of the ``SolutionNumber`` parameter."""
        # The third entry of the tuple returned by getParamInfo is used as
        # the current value of the parameter.
        return self._solver_model.getParamInfo('SolutionNumber')[2]

    def _set_solution_id(self, solution_id: int) -> int:
        """Select ``solution_id`` and return the previously active id."""
        previous_id = self._get_active_solution_id()
        self._solver_model.setParam('SolutionNumber', solution_id)
        return previous_id

    def get_number_of_solutions(self) -> int:
        """Return the model's ``SolCount`` attribute."""
        return self._solver_model.SolCount

    def get_solution_ids(self) -> list:
        """Return ``[0, 1, ..., get_number_of_solutions() - 1]``."""
        return list(range(self.get_number_of_solutions()))

    def _get_var_lists(self):
        """
        Should return a list of pyomo vars and a list of gurobipy vars
        """
        raise NotImplementedError('should be implemented by derived classes')

    def _get_var_map(self):
        """Should return a mapping from pyomo vars to gurobipy vars."""
        raise NotImplementedError('should be implemented by derived classes')

    def _get_con_map(self):
        """Should return a mapping from pyomo constraints to gurobipy constraints.

        ``get_duals`` expects each value to be either a single gurobipy
        constraint or a 2-tuple of them (used for linear range constraints).
        """
        raise NotImplementedError('should be implemented by derived classes')

    def __del__(self):
        # Mirror GurobiDirectBase.__del__: do nothing while the interpreter
        # is shutting down, when module globals may already be torn down.
        if python_is_shutting_down():
            return
        # Release the gurobi license if this is the last reference to
        # the environment (either through a results object or solver
        # interface)
        GurobiDirectBase._release_env_client()

    def _get_primals(
        self, vars_to_load: Sequence[VarData] | None = None
    ) -> tuple[list[VarData], list[float]]:
        """Return the Pyomo variables and their values in the active solution.

        Parameters
        ----------
        vars_to_load:
            Variables to query; ``None`` means every variable returned by
            ``_get_var_lists``.

        Raises
        ------
        NoSolutionError
            If the model reports ``SolCount == 0``.
        ValueError
            If a non-zero solution id is active but the model has neither
            integer nor binary variables.
        """
        grb_model = self._solver_model
        if grb_model.SolCount == 0:
            raise NoSolutionError()

        if vars_to_load is None:
            pvars, gvars = self._get_var_lists()
        else:
            pvars = vars_to_load
            var_map = self._get_var_map()
            gvars = [var_map[v] for v in vars_to_load]

        # Solution id 0 is read through 'X'; any other id is read through
        # the solution-pool attribute, whose name depends on the gurobipy
        # major version.
        if self._get_active_solution_id():
            if not grb_model.getAttr('NumIntVars') and not grb_model.getAttr(
                'NumBinVars'
            ):
                raise ValueError(
                    'Cannot obtain suboptimal solutions for a continuous model'
                )
            attr_name = 'Xn' if gurobipy.GRB.VERSION_MAJOR < 13 else 'PoolNX'
        else:
            attr_name = 'X'

        vals = grb_model.getAttr(attr_name, gvars)
        return pvars, vals

    def load_vars(self, vars_to_load: Sequence[VarData] | None = None) -> None:
        """Copy solution values into the Pyomo variables.

        Values are set with ``skip_validation=True``; afterwards
        ``StaleFlagManager.mark_all_as_stale(delayed=True)`` is called.
        """
        pvars, vals = self._get_primals(vars_to_load=vars_to_load)
        for pyomo_var, value in zip(pvars, vals):
            pyomo_var.set_value(value, skip_validation=True)
        StaleFlagManager.mark_all_as_stale(delayed=True)

    def get_vars(
        self, vars_to_load: Sequence[VarData] | None = None
    ) -> Mapping[VarData, float]:
        """Return a ``ComponentMap`` of variable -> value without loading."""
        pvars, vals = self._get_primals(vars_to_load=vars_to_load)
        return ComponentMap(zip(pvars, vals))

    def _get_rc_all_vars(self):
        """Return the ``Rc`` attribute for every variable as a ComponentMap."""
        pvars, gvars = self._get_var_lists()
        vals = self._solver_model.getAttr("Rc", gvars)
        return ComponentMap(zip(pvars, vals))

    def _get_rc_subset_vars(self, vars_to_load):
        """Return the ``Rc`` attribute for ``vars_to_load`` as a ComponentMap."""
        var_map = self._get_var_map()
        gvars = [var_map[v] for v in vars_to_load]
        vals = self._solver_model.getAttr("Rc", gvars)
        return ComponentMap(zip(vars_to_load, vals))

    def get_reduced_costs(
        self, vars_to_load: Sequence[VarData] | None = None
    ) -> Mapping[VarData, float]:
        """Return reduced costs for the requested (default: all) variables.

        Raises
        ------
        NoReducedCostsError
            If a non-zero solution id is active, the model status is not
            ``GRB.OPTIMAL``, or the model's ``IsMIP`` attribute is set.
        """
        if self._get_active_solution_id():
            raise NoReducedCostsError('Can only get reduced costs for solution_id = 0')
        if self._solver_model.Status != gurobipy.GRB.OPTIMAL:
            raise NoReducedCostsError()
        if self._solver_model.IsMIP:
            # this will also return True for continuous, nonconvex models
            raise NoReducedCostsError('Can only get reduced costs for convex problems')
        if vars_to_load is None:
            return self._get_rc_all_vars()
        return self._get_rc_subset_vars(vars_to_load=vars_to_load)

    def get_duals(
        self, cons_to_load: Sequence[ConstraintData] | None = None
    ) -> dict[ConstraintData, float]:
        """Return duals for the requested (default: all mapped) constraints.

        Raises
        ------
        NoDualsError
            If a non-zero solution id is active, the model status is not
            ``GRB.OPTIMAL``, or the model's ``IsMIP`` attribute is set.
        """
        if self._get_active_solution_id():
            raise NoDualsError('Can only get duals for solution_id = 0')
        if self._solver_model.Status != gurobipy.GRB.OPTIMAL:
            raise NoDualsError()
        if self._solver_model.IsMIP:
            # this will also return True for continuous, nonconvex models
            raise NoDualsError('Can only get duals for convex problems')

        # Quadratic constraints expose their dual through QCPi, not Pi.
        qcons = set(self._solver_model.getQConstrs())
        con_map = self._get_con_map()
        if cons_to_load is None:
            cons_to_load = con_map.keys()

        duals = {}
        for con in cons_to_load:
            gurobi_con = con_map[con]
            if type(gurobi_con) is tuple:
                # only linear range constraints are supported; the pair
                # holds two gurobipy constraints and the dual of larger
                # magnitude is reported (the second one wins ties).
                first, second = gurobi_con
                d1 = first.Pi
                d2 = second.Pi
                duals[con] = d1 if abs(d1) > abs(d2) else d2
            elif gurobi_con in qcons:
                duals[con] = gurobi_con.QCPi
            else:
                duals[con] = gurobi_con.Pi
        return duals
class GurobiDirectBase(SolverBase):
    """Common base for solver interfaces that drive Gurobi through gurobipy.

    The class owns a single, lazily created ``gurobipy.Env`` shared by all
    solver instances and solution loaders.  Clients are reference counted
    through ``_register_env_client`` / ``_release_env_client``; when the
    count drops to zero the environment is closed (releasing the license).

    Derived classes must implement ``_create_solver_model`` and
    ``_pyomo_gurobi_var_iter``.
    """

    # Number of live objects (solvers and solution loaders) using the env.
    _num_gurobipy_env_clients = 0
    # The shared gurobipy.Env, or None when no environment is open.
    _gurobipy_env = None
    # Cached availability; stored on the most derived class by available().
    _available = None
    _gurobipy_available = gurobipy_available
    # Lazily built map from Gurobi status codes to TerminationCondition.
    _tc_map = None
    _minimum_version = (0, 0, 0)

    CONFIG = GurobiConfig()

    def __init__(self, **kwds):
        """Create the interface; ``kwds`` are forwarded to ``SolverBase``."""
        # Register *before* calling the base initializer: __del__ runs even
        # when initialization fails part-way, and would otherwise decrement
        # the shared counter without a matching increment (possibly closing
        # an environment that other clients are still using).
        self._register_env_client()
        super().__init__(**kwds)
        # Passed to gurobipy's Model.optimize() in solve().
        self._callback = None

    def __del__(self):
        if not python_is_shutting_down():
            self._release_env_client()

    def available(self):
        """Return (and cache) the availability of this interface."""
        if self._available is None:
            # this triggers the deferred import, and for the persistent
            # interface, may update the _available flag
            #
            # Note that we set the _available flag on the *most derived
            # class* and not on the instance, or on the base class.  That
            # allows different derived interfaces to have different
            # availability (e.g., persistent has a minimum version
            # requirement that the direct interface doesn't - is that true?)
            if not self._gurobipy_available:
                # Re-check: resolving the import above may have set the flag.
                if self._available is None:
                    self.__class__._available = Availability.NotFound
            else:
                self.__class__._available = self._check_license()
                if self.version() < self._minimum_version:
                    self.__class__._available = Availability.BadVersion
        return self._available

    @staticmethod
    def release_license():
        """Close the shared Gurobi environment, if one is open.

        Logs a warning when the client counter is non-zero at the time of
        the call.
        """
        if GurobiDirectBase._gurobipy_env is None:
            return
        if GurobiDirectBase._num_gurobipy_env_clients:
            logger.warning(
                "Call to GurobiDirectBase.release_license() with %s remaining "
                "environment clients.",
                GurobiDirectBase._num_gurobipy_env_clients,
            )
        GurobiDirectBase._gurobipy_env.close()
        GurobiDirectBase._gurobipy_env = None

    @staticmethod
    def env():
        """Return the shared ``gurobipy.Env``, creating it on first use."""
        if GurobiDirectBase._gurobipy_env is None:
            # Capture anything written while the environment is created.
            with capture_output(capture_fd=True):
                GurobiDirectBase._gurobipy_env = gurobipy.Env()
        return GurobiDirectBase._gurobipy_env

    @staticmethod
    def _register_env_client():
        """Record one more user of the shared environment."""
        GurobiDirectBase._num_gurobipy_env_clients += 1

    @staticmethod
    def _release_env_client():
        """Drop one user of the shared environment; close it after the last."""
        GurobiDirectBase._num_gurobipy_env_clients -= 1
        if GurobiDirectBase._num_gurobipy_env_clients <= 0:
            # Note that _num_gurobipy_env_clients should never be <0,
            # but if it is, release_license will issue a warning (that
            # we want to know about)
            GurobiDirectBase.release_license()

    def _check_license(self):
        """Classify the license by building and solving a trial model.

        Returns ``BadLicense`` if a model cannot be created at all,
        ``LimitedLicense`` if adding and optimizing 2001 variables raises a
        ``GurobiError``, and ``FullLicense`` otherwise.
        """
        try:
            model = gurobipy.Model(env=self.env())
        except gurobipy.GurobiError:
            return Availability.BadLicense

        model.setParam('OutputFlag', 0)
        try:
            model.addVars(range(2001))
            model.optimize()
            return Availability.FullLicense
        except gurobipy.GurobiError:
            return Availability.LimitedLicense
        finally:
            model.dispose()

    def version(self):
        """Return ``(major, minor, technical)`` or ``None`` without gurobipy."""
        if not gurobipy_available:
            return None
        return (
            gurobipy.GRB.VERSION_MAJOR,
            gurobipy.GRB.VERSION_MINOR,
            gurobipy.GRB.VERSION_TECHNICAL,
        )

    def _create_solver_model(self, pyomo_model, config):
        # should return gurobi_model, solution_loader, has_objective
        raise NotImplementedError('should be implemented by derived classes')

    def _pyomo_gurobi_var_iter(self):
        # generator of tuples (pyomo_var, gurobi_var)
        raise NotImplementedError('should be implemented by derived classes')

    def _mipstart(self):
        """Set ``Start`` on each integer variable that currently has a value."""
        for pyomo_var, gurobi_var in self._pyomo_gurobi_var_iter():
            if pyomo_var.is_integer() and pyomo_var.value is not None:
                gurobi_var.setAttr('Start', pyomo_var.value)

    def solve(self, model, **kwds) -> Results:
        """Solve ``model`` with Gurobi and return a ``Results`` object.

        Parameters
        ----------
        model:
            The Pyomo model handed to ``_create_solver_model``.
        **kwds:
            Per-call configuration overrides, applied through
            ``self.config(value=kwds, preserve_implicit=True)``.

        Notes
        -----
        An ``InfeasibleConstraintException`` raised inside the ``try``
        block is converted into an infeasible ``Results`` object rather
        than propagated.  The working directory is always restored.
        """
        start_timestamp = datetime.datetime.now(datetime.timezone.utc)
        tick = time.perf_counter()
        orig_cwd = os.getcwd()
        try:
            config = self.config(value=kwds, preserve_implicit=True)
            if config.timer is None:
                config.timer = HierarchicalTimer()
            timer = config.timer

            StaleFlagManager.mark_all_as_stale()
            # The first stream collects the text stored as solver_log.
            ostreams = [io.StringIO()] + config.tee

            if config.working_dir:
                os.chdir(config.working_dir)
            with capture_output(TeeStream(*ostreams), capture_fd=False):
                gurobi_model, solution_loader, has_obj = self._create_solver_model(
                    model, config
                )
                options = config.solver_options

                gurobi_model.setParam('LogToConsole', 1)

                if config.threads is not None:
                    gurobi_model.setParam('Threads', config.threads)
                if config.time_limit is not None:
                    gurobi_model.setParam('TimeLimit', config.time_limit)
                if config.rel_gap is not None:
                    gurobi_model.setParam('MIPGap', config.rel_gap)
                if config.abs_gap is not None:
                    gurobi_model.setParam('MIPGapAbs', config.abs_gap)

                if config.warmstart_discrete_vars:
                    self._mipstart()

                # Applied last, so explicit solver options take precedence
                # over the generic settings above.
                for key, option in options.items():
                    gurobi_model.setParam(key, option)

                timer.start('optimize')
                try:
                    gurobi_model.optimize(self._callback)
                finally:
                    # Do not leave the (possibly user-supplied) timer
                    # running if the optimizer raises.
                    timer.stop('optimize')

            res = self._populate_results(
                grb_model=gurobi_model,
                solution_loader=solution_loader,
                has_obj=has_obj,
                config=config,
            )
        except InfeasibleConstraintException as err:
            err_msg = (
                'The problem was proven to be infeasible during compilation:\n'
                f'\t{str(err)}'
            )
            res = get_infeasible_results(
                model=model, solver=self, config=config, err_msg=err_msg
            )
        finally:
            os.chdir(orig_cwd)

        res.solver_log = ostreams[0].getvalue()
        tock = time.perf_counter()
        res.timing_info.start_timestamp = start_timestamp
        res.timing_info.wall_time = tock - tick
        res.timing_info.timer = timer
        return res

    def _get_tc_map(self):
        """Return the (lazily built) Gurobi status -> TerminationCondition map."""
        if GurobiDirectBase._tc_map is None:
            grb = gurobipy.GRB
            tc = TerminationCondition
            GurobiDirectBase._tc_map = {
                grb.LOADED: tc.unknown,  # problem is loaded, but no solution
                grb.OPTIMAL: tc.convergenceCriteriaSatisfied,
                grb.INFEASIBLE: tc.provenInfeasible,
                grb.INF_OR_UNBD: tc.infeasibleOrUnbounded,
                grb.UNBOUNDED: tc.unbounded,
                grb.CUTOFF: tc.objectiveLimit,
                grb.ITERATION_LIMIT: tc.iterationLimit,
                grb.NODE_LIMIT: tc.iterationLimit,
                grb.TIME_LIMIT: tc.maxTimeLimit,
                grb.SOLUTION_LIMIT: tc.unknown,
                grb.INTERRUPTED: tc.interrupted,
                grb.NUMERIC: tc.unknown,
                grb.SUBOPTIMAL: tc.unknown,
                grb.USER_OBJ_LIMIT: tc.objectiveLimit,
            }
        return GurobiDirectBase._tc_map

    def _populate_results(self, grb_model, solution_loader, has_obj, config):
        """Build a ``Results`` object from a solved gurobipy model.

        Raises
        ------
        NoOptimalSolutionError
            If the termination condition is not
            ``convergenceCriteriaSatisfied`` and
            ``config.raise_exception_on_nonoptimal_result`` is set.
        NoFeasibleSolutionError
            If ``config.load_solutions`` is set but the model holds no
            solution.
        """
        status = grb_model.Status

        results = Results()
        results.solution_loader = solution_loader
        results.timing_info.gurobi_time = grb_model.Runtime

        if grb_model.SolCount > 0:
            if status == gurobipy.GRB.OPTIMAL:
                results.solution_status = SolutionStatus.optimal
            else:
                results.solution_status = SolutionStatus.feasible
        else:
            results.solution_status = SolutionStatus.noSolution

        # Status codes missing from the map are reported as unknown.
        results.termination_condition = self._get_tc_map().get(
            status, TerminationCondition.unknown
        )

        if (
            results.termination_condition
            != TerminationCondition.convergenceCriteriaSatisfied
            and config.raise_exception_on_nonoptimal_result
        ):
            raise NoOptimalSolutionError()

        if has_obj:
            # Either attribute query may fail; fall back to "no incumbent"
            # and to the trivial bound for the model's sense.
            try:
                if math.isfinite(grb_model.ObjVal):
                    results.incumbent_objective = grb_model.ObjVal
                else:
                    results.incumbent_objective = None
            except (gurobipy.GurobiError, AttributeError):
                results.incumbent_objective = None
            try:
                results.objective_bound = grb_model.ObjBound
            except (gurobipy.GurobiError, AttributeError):
                if grb_model.ModelSense == ObjectiveSense.minimize:
                    results.objective_bound = -math.inf
                else:
                    results.objective_bound = math.inf
        else:
            results.incumbent_objective = None
            results.objective_bound = None

        results.extra_info.IterCount = grb_model.getAttr('IterCount')
        results.extra_info.BarIterCount = grb_model.getAttr('BarIterCount')
        results.extra_info.NodeCount = grb_model.getAttr('NodeCount')

        config.timer.start('load solution')
        try:
            if config.load_solutions:
                if grb_model.SolCount > 0:
                    results.solution_loader.load_solution()
                else:
                    raise NoFeasibleSolutionError()
        finally:
            # Stop the timer even when raising, so that a caller-supplied
            # timer is not left in a started state.
            config.timer.stop('load solution')

        results.solver_config = config
        results.solver_name = self.name
        results.solver_version = self.version()

        return results