Source code for pyomo.contrib.gdpopt.branch_and_bound

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

from collections import namedtuple
from heapq import heappush, heappop
import traceback

from pyomo.common.collections import ComponentMap
from pyomo.common.config import document_kwargs_from_configdict
from pyomo.common.errors import InfeasibleConstraintException
from pyomo.contrib.fbbt.fbbt import fbbt
from pyomo.contrib.gdpopt.algorithm_base_class import _GDPoptAlgorithm
from pyomo.contrib.gdpopt.create_oa_subproblems import (
    add_util_block,
    add_disjunction_list,
    add_disjunct_list,
    add_algebraic_variable_list,
    add_boolean_variable_lists,
    add_transformed_boolean_variable_list,
)
from pyomo.contrib.gdpopt.config_options import (
    _add_nlp_solver_configs,
    _add_BB_configs,
    _add_mip_solver_configs,
    _add_tolerance_configs,
    _add_nlp_solve_configs,
)
from pyomo.contrib.gdpopt.nlp_initialization import restore_vars_to_original_values
from pyomo.contrib.gdpopt.util import (
    copy_var_list_values,
    SuppressInfeasibleWarning,
    get_main_elapsed_time,
)
from pyomo.contrib.satsolver.satsolver import satisfiable
from pyomo.core import minimize, Suffix, Constraint, TransformationFactory
from pyomo.opt import SolverFactory, SolverStatus
from pyomo.opt import TerminationCondition as tc

_linear_degrees = {1, 0}

# Data tuple for each node that also functions as the sort key.
# Therefore, ordering of the arguments below matters.
BBNodeData = namedtuple(
    'BBNodeData',
    [
        'obj_lb',  # lower bound on objective value, sign corrected to minimize
        'obj_ub',  # upper bound on objective value, sign corrected to minimize
        'is_screened',  # True if the node has been screened; False if not.
        'is_evaluated',  # True if node has been evaluated; False if not.
        'num_unbranched_disjunctions',  # number of unbranched disjunctions
        'node_count',  # cumulative node counter
        'unbranched_disjunction_indices',  # list of unbranched disjunction indices
    ],
)


[docs]@SolverFactory.register( 'gdpopt.lbb', doc="The LBB (logic-based branch and bound) Generalized Disjunctive " "Programming (GDP) solver", ) class GDP_LBB_Solver(_GDPoptAlgorithm): """The GDPopt (Generalized Disjunctive Programming optimizer) logic-based branch and bound (LBB) solver. Accepts models that can include nonlinear, continuous variables and constraints, as well as logical conditions. """ CONFIG = _GDPoptAlgorithm.CONFIG() _add_mip_solver_configs(CONFIG) _add_nlp_solver_configs(CONFIG, default_solver='ipopt') _add_nlp_solve_configs( CONFIG, default_nlp_init_method=restore_vars_to_original_values ) _add_tolerance_configs(CONFIG) _add_BB_configs(CONFIG) algorithm = 'LBB' # Override solve() to customize the docstring for this solver
[docs] @document_kwargs_from_configdict(CONFIG, doc=_GDPoptAlgorithm.solve.__doc__) def solve(self, model, **kwds): return super().solve(model, **kwds)
def _log_citation(self, config): config.logger.info( "\n" + """- LBB algorithm: Lee, S; Grossmann, IE. New algorithms for nonlinear generalized disjunctive programming. Comp. and Chem. Eng. 2000, 24, 2125-2141. DOI: 10.1016/S0098-1354(00)00581-0. """.strip() ) def _solve_gdp(self, model, config): self.explored_nodes = 0 # Create utility block on the original model so that we will be able to # copy solutions between util_block = self.original_util_block = add_util_block(model) add_disjunct_list(util_block) add_algebraic_variable_list(util_block) add_boolean_variable_lists(util_block) root_node = TransformationFactory( 'contrib.logical_to_disjunctive' ).create_using(model) root_util_blk = root_node.component(self.original_util_block.name) # Add to root utility block what we will need during the algorithm add_disjunction_list(root_util_blk) # Now that logical_to_disjunctive has been called. add_transformed_boolean_variable_list(root_util_blk) # Map unfixed disjunct -> list of deactivated constraints root_util_blk.disjunct_to_nonlinear_constraints = ComponentMap() # Map relaxed disjunctions -> list of unfixed disjuncts root_util_blk.disjunction_to_unfixed_disjuncts = ComponentMap() # Preprocess the active disjunctions for disjunction in root_util_blk.disjunction_list: assert disjunction.active disjuncts_fixed_True = [] disjuncts_fixed_False = [] unfixed_disjuncts = [] # categorize the disjuncts in the disjunction for disjunct in disjunction.disjuncts: if disjunct.indicator_var.fixed: if disjunct.indicator_var.value: disjuncts_fixed_True.append(disjunct) elif disjunct.indicator_var.value == False: disjuncts_fixed_False.append(disjunct) else: unfixed_disjuncts.append(disjunct) # update disjunct lists for predetermined disjunctions if len(disjuncts_fixed_False) == len(disjunction.disjuncts) - 1: # all but one disjunct in the disjunction is fixed to False. # Remaining one must be true. If not already fixed to True, do # so. if not disjuncts_fixed_True: disjuncts_fixed_True = unfixed_disjuncts[0] unfixed_disjuncts = [] disjuncts_fixed_True.indicator_var.fix(True) elif disjuncts_fixed_True and disjunction.xor: assert len(disjuncts_fixed_True) == 1, ( "XOR (only one True) violated: %s" % disjunction.name ) disjuncts_fixed_False.extend(unfixed_disjuncts) unfixed_disjuncts = [] # Make sure disjuncts fixed to False are properly deactivated. for disjunct in disjuncts_fixed_False: disjunct.deactivate() # Deactivate nonlinear constraints in unfixed disjuncts for disjunct in unfixed_disjuncts: nonlinear_constraints_in_disjunct = [ constr for constr in disjunct.component_data_objects( Constraint, active=True ) if constr.body.polynomial_degree() not in _linear_degrees ] for constraint in nonlinear_constraints_in_disjunct: constraint.deactivate() if nonlinear_constraints_in_disjunct: # TODO might be worthwhile to log number of nonlinear # constraints in each disjunction for later branching # purposes root_util_blk.disjunct_to_nonlinear_constraints[disjunct] = ( nonlinear_constraints_in_disjunct ) root_util_blk.disjunction_to_unfixed_disjuncts[disjunction] = ( unfixed_disjuncts ) pass # Add the BigM suffix if it does not already exist. Used later during # nonlinear constraint activation. if not hasattr(root_node, 'BigM'): root_node.BigM = Suffix() # Set up the priority queue queue = self.bb_queue = [] self.created_nodes = 0 unbranched_disjunction_indices = [ i for i, disjunction in enumerate(root_util_blk.disjunction_list) if disjunction in root_util_blk.disjunction_to_unfixed_disjuncts ] sort_tuple = BBNodeData( obj_lb=float('-inf'), obj_ub=float('inf'), is_screened=False, is_evaluated=False, num_unbranched_disjunctions=len(unbranched_disjunction_indices), node_count=0, unbranched_disjunction_indices=unbranched_disjunction_indices, ) heappush(queue, (sort_tuple, root_node)) # Do the branch and bound while len(queue) > 0: # visit the top node on the heap node_data, node_model = heappop(queue) config.logger.info( "Nodes: %s LB %.10g Unbranched %s" % ( self.explored_nodes, node_data.obj_lb, node_data.num_unbranched_disjunctions, ) ) # Check time limit if self.reached_time_limit(config): no_feasible_soln = float('inf') self.LB = ( node_data.obj_lb if self.objective_sense == minimize else -no_feasible_soln ) self.UB = ( no_feasible_soln if self.objective_sense == minimize else -node_data.obj_lb ) config.logger.info( 'Final bound values: LB: {} UB: {}'.format(self.LB, self.UB) ) return self._get_final_results_object() # Handle current node if not node_data.is_screened: # Node has not been evaluated. self.explored_nodes += 1 new_node_data = self._prescreen_node(node_data, node_model, config) # replace with updated node data heappush(queue, (new_node_data, node_model)) elif ( node_data.obj_lb < node_data.obj_ub - config.bound_tolerance and not node_data.is_evaluated ): # Node has not been fully evaluated. # Note: infeasible and unbounded nodes will skip this condition, # because of strict inequality new_node_data = self._evaluate_node(node_data, node_model, config) # replace with updated node data heappush(queue, (new_node_data, node_model)) elif ( node_data.num_unbranched_disjunctions == 0 or node_data.obj_lb == float('inf') ): # We have reached a leaf node, or the best available node is # infeasible. # Update the incumbent and put it in the original model self.update_incumbent( node_model.component(self.original_util_block.name) ) self._transfer_incumbent_to_original_model(config.logger) self.LB = ( node_data.obj_lb if self.objective_sense == minimize else -node_data.obj_ub ) self.UB = ( node_data.obj_ub if self.objective_sense == minimize else -node_data.obj_lb ) self.iteration = self.explored_nodes if node_data.obj_lb == float('inf'): self.pyomo_results.solver.termination_condition = tc.infeasible elif node_data.obj_ub == float('-inf'): self.pyomo_results.solver.termination_condition = tc.unbounded else: self.pyomo_results.solver.termination_condition = tc.optimal return self._get_final_pyomo_results_object() else: self._branch_on_node(node_data, node_model, config) def _branch_on_node(self, node_data, node_model, config): node_utils = node_model.component(self.original_util_block.name) # Keeping the naive branch selection disjunction_to_branch_idx = node_data.unbranched_disjunction_indices[0] disjunction_to_branch = node_utils.disjunction_list[disjunction_to_branch_idx] num_unfixed_disjuncts = len( node_utils.disjunction_to_unfixed_disjuncts[disjunction_to_branch] ) config.logger.info("Branching on disjunction %s" % disjunction_to_branch.name) node_count = self.created_nodes newly_created_nodes = 0 for disjunct_index_to_fix_True in range(num_unfixed_disjuncts): # Create a new branch for each unfixed disjunct child_model = node_model.clone() child_utils = child_model.component(node_utils.name) child_disjunction_to_branch = child_utils.disjunction_list[ disjunction_to_branch_idx ] child_unfixed_disjuncts = child_utils.disjunction_to_unfixed_disjuncts[ child_disjunction_to_branch ] for idx, child_disjunct in enumerate(child_unfixed_disjuncts): if idx == disjunct_index_to_fix_True: child_disjunct.indicator_var.fix(True) else: child_disjunct.deactivate() if not child_disjunction_to_branch.xor: raise NotImplementedError( "We still need to add support for non-XOR disjunctions." ) # This requires adding all combinations of activation status among # unfixed_disjuncts Reactivate nonlinear constraints in the # newly-fixed child disjunct fixed_True_disjunct = child_unfixed_disjuncts[disjunct_index_to_fix_True] for constr in child_utils.disjunct_to_nonlinear_constraints.get( fixed_True_disjunct, () ): constr.activate() child_model.BigM[constr] = 1 # set arbitrary BigM (ok, because # we fix corresponding Y=True) del child_utils.disjunction_to_unfixed_disjuncts[ child_disjunction_to_branch ] for child_disjunct in child_unfixed_disjuncts: child_utils.disjunct_to_nonlinear_constraints.pop(child_disjunct, None) newly_created_nodes += 1 child_node_data = node_data._replace( is_screened=False, is_evaluated=False, num_unbranched_disjunctions=node_data.num_unbranched_disjunctions - 1, node_count=node_count + newly_created_nodes, unbranched_disjunction_indices=node_data.unbranched_disjunction_indices[ 1: ], obj_ub=float('inf'), ) heappush(self.bb_queue, (child_node_data, child_model)) self.created_nodes += newly_created_nodes config.logger.info( "Added %s new nodes with %s relaxed disjunctions to " "the heap. Size now %s." % ( num_unfixed_disjuncts, node_data.num_unbranched_disjunctions - 1, len(self.bb_queue), ) ) def _prescreen_node(self, node_data, node_model, config): # Check node for satisfiability if sat-solver is enabled if config.check_sat and satisfiable(node_model, config.logger) is False: if node_data.node_count == 0: config.logger.info( "Root node is not satisfiable. Problem is infeasible." ) else: config.logger.info("SAT solver pruned node %s" % node_data.node_count) new_lb = new_ub = float('inf') else: # Solve model subproblem if config.solve_local_rnGDP: config.logger.debug( "Screening node %s with LB %.10g and %s inactive " "disjunctions." % ( node_data.node_count, node_data.obj_lb, node_data.num_unbranched_disjunctions, ) ) new_lb, new_ub = self._solve_local_rnGDP_subproblem(node_model, config) else: new_lb, new_ub = float('-inf'), float('inf') new_lb = max(node_data.obj_lb, new_lb) new_node_data = node_data._replace( obj_lb=new_lb, obj_ub=new_ub, is_screened=True ) return new_node_data def _evaluate_node(self, node_data, node_model, config): # Solve model subproblem config.logger.info( "Exploring node %s with LB %.10g UB %.10g and %s inactive " "disjunctions." % ( node_data.node_count, node_data.obj_lb, node_data.obj_ub, node_data.num_unbranched_disjunctions, ) ) new_lb, new_ub = self._solve_rnGDP_subproblem(node_model, config) new_node_data = node_data._replace( obj_lb=new_lb, obj_ub=new_ub, is_evaluated=True ) return new_node_data def _solve_rnGDP_subproblem(self, model, config): subproblem = TransformationFactory('gdp.bigm').create_using(model) obj_sense_correction = self.objective_sense != minimize model_utils = model.component(self.original_util_block.name) subprob_utils = subproblem.component(self.original_util_block.name) try: with SuppressInfeasibleWarning(): try: fbbt(subproblem, integer_tol=config.integer_tolerance) except InfeasibleConstraintException: # copy variable values, even if errored copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('inf'), float('inf') minlp_args = dict(config.minlp_solver_args) if config.time_limit is not None and config.minlp_solver == 'gams': elapsed = get_main_elapsed_time(self.timing) remaining = max(config.time_limit - elapsed, 1) minlp_args['add_options'] = minlp_args.get('add_options', []) minlp_args['add_options'].append('option reslim=%s;' % remaining) result = SolverFactory(config.minlp_solver).solve( subproblem, **minlp_args ) except RuntimeError as e: config.logger.warning( "Solver encountered RuntimeError. Treating as infeasible. " "Msg: %s\n%s" % (str(e), traceback.format_exc()) ) copy_var_list_values( # copy variable values, even if errored from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('inf'), float('inf') term_cond = result.solver.termination_condition if term_cond == tc.optimal: assert result.solver.status is SolverStatus.ok lb = ( result.problem.lower_bound if not obj_sense_correction else -result.problem.upper_bound ) ub = ( result.problem.upper_bound if not obj_sense_correction else -result.problem.lower_bound ) copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ) return lb, ub elif term_cond == tc.locallyOptimal or term_cond == tc.feasible: assert result.solver.status is SolverStatus.ok lb = ( result.problem.lower_bound if not obj_sense_correction else -result.problem.upper_bound ) ub = ( result.problem.upper_bound if not obj_sense_correction else -result.problem.lower_bound ) # TODO handle LB absent copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ) return lb, ub elif term_cond == tc.unbounded: copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('-inf'), float('-inf') elif term_cond == tc.infeasible: copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('inf'), float('inf') else: config.logger.warning( "Unknown termination condition of %s. " "Treating as infeasible." % term_cond ) copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('inf'), float('inf') def _solve_local_rnGDP_subproblem(self, model, config): # TODO: The returns of this method should be improved. Currently, it # returns trivial bounds (LB, UB) = (-inf, inf) if there is an error in # the solve, if the problem is infeasible, or if the problem is # unbounded. subproblem = TransformationFactory('gdp.bigm').create_using(model) obj_sense_correction = self.objective_sense != minimize subprob_utils = subproblem.component(self.original_util_block.name) model_utils = model.component(self.original_util_block.name) try: with SuppressInfeasibleWarning(): result = SolverFactory(config.local_minlp_solver).solve( subproblem, **config.local_minlp_solver_args ) except RuntimeError as e: config.logger.warning( "Solver encountered RuntimeError. Treating as infeasible. " "Msg: %s\n%s" % (str(e), traceback.format_exc()) ) copy_var_list_values( # copy variable values, even if errored from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('-inf'), float('inf') term_cond = result.solver.termination_condition if term_cond == tc.optimal: assert result.solver.status is SolverStatus.ok ub = ( result.problem.upper_bound if not obj_sense_correction else -result.problem.lower_bound ) copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ) return float('-inf'), ub elif term_cond == tc.locallyOptimal or term_cond == tc.feasible: assert result.solver.status is SolverStatus.ok ub = ( result.problem.upper_bound if not obj_sense_correction else -result.problem.lower_bound ) copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ) return float('-inf'), ub elif term_cond == tc.unbounded: copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('-inf'), float('-inf') elif term_cond == tc.infeasible: copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) # [ESJ 7/11/22]: Making this float('inf'), float('inf') doesn't # break tests, and makes more sense. But I'm not sure why it's not # that way already? return float('-inf'), float('inf') else: config.logger.warning( "Unknown termination condition of %s. " "Treating as infeasible." % term_cond ) copy_var_list_values( from_list=subprob_utils.algebraic_variable_list, to_list=model_utils.algebraic_variable_list, config=config, ignore_integrality=True, ) return float('-inf'), float('inf')