# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2025
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
# Note: the self.mcpp.* functions are all C-style functions implemented
# in the compiled MC++ wrapper library
# Note: argument to pow must be an integer
import ctypes
import logging
import os
from pyomo.common.fileutils import Library
from pyomo.core import value, Expression
from pyomo.core.base.block import SubclassOf
from pyomo.core.base.expression import NamedExpressionData
from pyomo.core.expr.numvalue import nonpyomo_leaf_types
from pyomo.core.expr.numeric_expr import (
AbsExpression,
LinearExpression,
NegationExpression,
NPV_AbsExpression,
NPV_ExternalFunctionExpression,
NPV_NegationExpression,
NPV_PowExpression,
NPV_ProductExpression,
NPV_SumExpression,
NPV_UnaryFunctionExpression,
PowExpression,
ProductExpression,
SumExpression,
UnaryFunctionExpression,
NPV_DivisionExpression,
DivisionExpression,
)
from pyomo.core.expr.visitor import StreamBasedExpressionVisitor, identify_variables
from pyomo.common.collections import ComponentMap
logger = logging.getLogger('pyomo.contrib.mcpp')
path = os.path.dirname(__file__)
__version__ = "19.11.12"
[docs]
def mcpp_available():
"""True if the MC++ shared object file exists. False otherwise."""
return Library('mcppInterface').path() is not None
NPV_expressions = (
NPV_AbsExpression,
NPV_ExternalFunctionExpression,
NPV_NegationExpression,
NPV_PowExpression,
NPV_ProductExpression,
NPV_SumExpression,
NPV_UnaryFunctionExpression,
NPV_DivisionExpression,
)
def _MCPP_lib():
"""A singleton interface to the MC++ library"""
if _MCPP_lib._mcpp is not None:
return _MCPP_lib._mcpp
_MCPP_lib._mcpp = mcpp = ctypes.CDLL(Library('mcppInterface').path())
# Version number
mcpp.get_version.restype = ctypes.c_char_p
mcpp.toString.argtypes = [ctypes.c_void_p]
mcpp.toString.restype = ctypes.c_char_p
mcpp.lower.argtypes = [ctypes.c_void_p]
mcpp.lower.restype = ctypes.c_double
mcpp.upper.argtypes = [ctypes.c_void_p]
mcpp.upper.restype = ctypes.c_double
mcpp.concave.argtypes = [ctypes.c_void_p]
mcpp.concave.restype = ctypes.c_double
mcpp.convex.argtypes = [ctypes.c_void_p]
mcpp.convex.restype = ctypes.c_double
mcpp.subcc.argtypes = [ctypes.c_void_p, ctypes.c_int]
mcpp.subcc.restype = ctypes.c_double
mcpp.subcv.argtypes = [ctypes.c_void_p, ctypes.c_int]
mcpp.subcv.restype = ctypes.c_double
# Create MC type variable
mcpp.newVar.argtypes = [
ctypes.c_double,
ctypes.c_double,
ctypes.c_double,
ctypes.c_int,
ctypes.c_int,
]
mcpp.newVar.restype = ctypes.c_void_p
# Create MC type constant
mcpp.newConstant.argtypes = [ctypes.c_double]
mcpp.newConstant.restype = ctypes.c_void_p
# Multiply MC objects
mcpp.multiply.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.multiply.restype = ctypes.c_void_p
# Add MC objects
mcpp.add.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.add.restype = ctypes.c_void_p
# pow(x, y) functions
# y is integer
mcpp.power.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.power.restype = ctypes.c_void_p
# y is fractional
mcpp.powerf.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.powerf.restype = ctypes.c_void_p
# y is an expression
mcpp.powerx.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.powerx.restype = ctypes.c_void_p
# sqrt function
mcpp.mc_sqrt.argtypes = [ctypes.c_void_p]
mcpp.mc_sqrt.restype = ctypes.c_void_p
# - MC Variable
mcpp.negation.argtypes = [ctypes.c_void_p]
mcpp.negation.restype = ctypes.c_void_p
# fabs(MC Variable)
mcpp.mc_abs.argtypes = [ctypes.c_void_p]
mcpp.mc_abs.restype = ctypes.c_void_p
# sin(MC Variable)
mcpp.trigSin.argtypes = [ctypes.c_void_p]
mcpp.trigSin.restype = ctypes.c_void_p
# cos(MC Variable)
mcpp.trigCos.argtypes = [ctypes.c_void_p]
mcpp.trigCos.restype = ctypes.c_void_p
# tan(MC Variable)
mcpp.trigTan.argtypes = [ctypes.c_void_p]
mcpp.trigTan.restype = ctypes.c_void_p
# asin(MC Variable)
mcpp.atrigSin.argtypes = [ctypes.c_void_p]
mcpp.atrigSin.restype = ctypes.c_void_p
# acos(MC Variable)
mcpp.atrigCos.argtypes = [ctypes.c_void_p]
mcpp.atrigCos.restype = ctypes.c_void_p
# atan(MC Variable)
mcpp.atrigTan.argtypes = [ctypes.c_void_p]
mcpp.atrigTan.restype = ctypes.c_void_p
# exp(MC Variable)
mcpp.exponential.argtypes = [ctypes.c_void_p]
mcpp.exponential.restype = ctypes.c_void_p
# log(MC Variable)
mcpp.logarithm.argtypes = [ctypes.c_void_p]
mcpp.logarithm.restype = ctypes.c_void_p
# Releases object from memory (prevent memory leaks)
mcpp.release.argtypes = [ctypes.c_void_p]
# Unary function exception wrapper
mcpp.try_unary_fcn.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
mcpp.try_unary_fcn.restype = ctypes.c_void_p
# Binary function exception wrapper
mcpp.try_binary_fcn.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]
mcpp.try_binary_fcn.restype = ctypes.c_void_p
# Error message retrieval
mcpp.get_last_exception_message.restype = ctypes.c_char_p
return mcpp
# Initialize the singleton to None
_MCPP_lib._mcpp = None
[docs]
class MCPP_Error(Exception):
pass
[docs]
class MCPP_visitor(StreamBasedExpressionVisitor):
"""Creates an MC++ expression from the corresponding Pyomo expression.
This class walks a pyomo expression tree and builds up the corresponding
expression of type McCormick.
Note on memory management: The MCPP_visitor will return a pointer to
an MC++ interval object that was dynamically allocated within the C
interface. It is the caller's responsibility to call
`mcpp_lib.release()` on that object to prevent a memory leak
"""
[docs]
def __init__(self, expression, improved_var_bounds=None):
super(MCPP_visitor, self).__init__()
self.mcpp = _MCPP_lib()
so_file_version = self.mcpp.get_version()
so_file_version = so_file_version.decode("utf-8")
if not so_file_version == __version__:
raise MCPP_Error(
"Shared object file version %s is out of date with MC++ interface version %s. "
"Please rebuild the library." % (so_file_version, __version__)
)
self.missing_value_warnings = []
self.expr = expression
vars = list(identify_variables(expression, include_fixed=False))
self.num_vars = len(vars)
# Map expression variables to MC variables
self.known_vars = ComponentMap()
# Map expression variables to their index
self.var_to_idx = ComponentMap()
# Pre-register all variables
inf = float('inf')
for i, var in enumerate(vars):
self.var_to_idx[var] = i
# check if improved variable bound is provided
if improved_var_bounds is not None:
lb, ub = improved_var_bounds.get(var, (-inf, inf))
else:
lb, ub = -inf, inf
self.known_vars[var] = self.register_var(var, lb, ub)
self.refs = None
[docs]
def walk_expression(self):
self.refs = set()
ans = super(MCPP_visitor, self).walk_expression(self.expr)
self.refs = None
return ans
def exitNode(self, node, data):
if isinstance(node, ProductExpression):
ans = self.mcpp.multiply(data[0], data[1])
elif isinstance(node, SumExpression):
ans = data[0]
for arg in data[1:]:
ans = self.mcpp.add(ans, arg)
elif isinstance(node, PowExpression):
if type(node.arg(1)) == int:
ans = self.mcpp.try_binary_fcn(self.mcpp.power, data[0], data[1])
elif type(node.arg(1)) == float:
ans = self.mcpp.try_binary_fcn(self.mcpp.powerf, data[0], data[1])
else:
ans = self.mcpp.try_binary_fcn(self.mcpp.powerx, data[0], data[1])
elif isinstance(node, DivisionExpression):
ans = self.mcpp.try_binary_fcn(self.mcpp.divide, data[0], data[1])
elif isinstance(node, NegationExpression):
ans = self.mcpp.negation(data[0])
elif isinstance(node, AbsExpression):
ans = self.mcpp.try_unary_fcn(self.mcpp.mc_abs, data[0])
elif isinstance(node, LinearExpression):
raise NotImplementedError(
'Quicksum has bugs that prevent proper usage of MC++.'
)
# ans = self.mcpp.newConstant(node.constant)
# for coef, var in zip(node.linear_coefs, node.linear_vars):
# ans = self.mcpp.add(
# ans,
# self.mcpp.multiply(
# self.mcpp.newConstant(coef),
# self.register_num(var)))
elif isinstance(node, UnaryFunctionExpression):
if node.name == "exp":
ans = self.mcpp.try_unary_fcn(self.mcpp.exponential, data[0])
elif node.name == "log":
ans = self.mcpp.try_unary_fcn(self.mcpp.logarithm, data[0])
elif node.name == "sin":
ans = self.mcpp.try_unary_fcn(self.mcpp.trigSin, data[0])
elif node.name == "cos":
ans = self.mcpp.try_unary_fcn(self.mcpp.trigCos, data[0])
elif node.name == "tan":
ans = self.mcpp.try_unary_fcn(self.mcpp.trigTan, data[0])
elif node.name == "asin":
ans = self.mcpp.try_unary_fcn(self.mcpp.atrigSin, data[0])
elif node.name == "acos":
ans = self.mcpp.try_unary_fcn(self.mcpp.atrigCos, data[0])
elif node.name == "atan":
ans = self.mcpp.try_unary_fcn(self.mcpp.atrigTan, data[0])
elif node.name == "sqrt":
ans = self.mcpp.try_unary_fcn(self.mcpp.mc_sqrt, data[0])
else:
raise NotImplementedError("Unknown unary function: %s" % (node.name,))
elif isinstance(node, NPV_expressions):
ans = self.mcpp.newConstant(value(data[0]))
elif type(node) in nonpyomo_leaf_types:
ans = self.mcpp.newConstant(node)
elif not node.is_expression_type():
ans = self.register_num(node)
elif type(node) in SubclassOf(Expression) or isinstance(
node, NamedExpressionData
):
ans = data[0]
else:
raise RuntimeError("Unhandled expression type: %s" % (type(node)))
if ans is None:
msg = self.mcpp.get_last_exception_message()
msg = msg.decode("utf-8")
raise MCPP_Error(msg)
return ans
def beforeChild(self, node, child, child_idx):
if type(child) in nonpyomo_leaf_types:
# This means the child is POD
# i.e., int, float, string
return False, self.mcpp.newConstant(child)
elif not child.is_expression_type():
# node is either a Param, Var, or NumericConstant
return False, self.register_num(child)
else:
# this is an expression node
return True, None
def acceptChildResult(self, node, data, child_result, child_idx):
self.refs.add(child_result)
data.append(child_result)
return data
[docs]
def register_num(self, num):
"""Registers a new number: Param, Var, or NumericConstant."""
if num.is_fixed():
return self.mcpp.newConstant(value(num))
else:
return self.known_vars[num]
[docs]
def register_var(self, var, lb, ub):
"""Registers a new variable."""
var_idx = self.var_to_idx[var]
inf = float('inf')
# Guard against errant None values in lb and ub
lb = -inf if lb is None else lb
ub = inf if ub is None else ub
lb = max(var.lb if var.has_lb() else -inf, lb)
ub = min(var.ub if var.has_ub() else inf, ub)
var_val = value(var, exception=False)
if lb == -inf:
lb = -500000
logger.warning(
'Var %s missing lower bound. Assuming LB of %s' % (var.name, lb)
)
if ub == inf:
ub = 500000
logger.warning(
'Var %s missing upper bound. Assuming UB of %s' % (var.name, ub)
)
if var_val is None:
var_val = (lb + ub) / 2
self.missing_value_warnings.append(
'Var %s missing value. Assuming midpoint value of %s'
% (var.name, var_val)
)
return self.mcpp.newVar(lb, var_val, ub, self.num_vars, var_idx)
def finalizeResult(self, node_result):
# Note, the node_result should NOT be in self.refs
# self.refs.remove(node_result)
assert node_result not in self.refs
for r in self.refs:
self.mcpp.release(r)
return node_result
[docs]
class McCormick(object):
"""
This class takes the constructed expression from MCPP_Visitor and
allows for MC methods to be performed on pyomo expressions.
__repn__(self): returns a display of an MC expression in the form:
F: [lower interval : upper interval ] [convex underestimator :
concave overestimator ] [ (convex subgradient) : (concave subgradient]
lower(self): returns a float of the lower interval bound that is valid
across the entire domain
upper(self): returns a float of the upper interval bound that is valid
across the entire domain
concave(self): returns a float of the concave overestimator at the
current value() of each variable.
convex(self): returns a float of the convex underestimator at the
current value() of each variable.
##Note: In order to describe the concave and convex relaxations over
the entire domain, it is necessary to use changePoint() to repeat the
calculation at different points.
subcc(self): returns a ComponentMap() that maps the pyomo variables
to the subgradients of the McCormick concave overestimators at the
current value() of each variable.
subcv(self): returns a ComponentMap() that maps the pyomo variables
to the subgradients of the McCormick convex underestimators at the
current value() of each variable.
def changePoint(self, var, point): updates the current value() on the
pyomo side and the current point on the MC++ side.
"""
[docs]
def __init__(self, expression, improved_var_bounds=None):
# Guarantee that McCormick objects have mc_expr defined
self.mc_expr = None
self.mcpp = _MCPP_lib()
self.pyomo_expr = expression
self.visitor = MCPP_visitor(expression, improved_var_bounds)
self.mc_expr = self.visitor.walk_expression()
def __del__(self):
if self.mc_expr is not None:
self.mcpp.release(self.mc_expr)
self.mc_expr = None
def __repn__(self):
repn = self.mcpp.toString(self.mc_expr)
repn = repn.decode("utf-8")
return repn
def __str__(self):
return self.__repn__()
def lower(self):
return self.mcpp.lower(self.mc_expr)
def upper(self):
return self.mcpp.upper(self.mc_expr)
def concave(self):
self.warn_if_var_missing_value()
return self.mcpp.concave(self.mc_expr)
def convex(self):
self.warn_if_var_missing_value()
return self.mcpp.convex(self.mc_expr)
def subcc(self):
self.warn_if_var_missing_value()
ans = ComponentMap()
for key in self.visitor.var_to_idx:
i = self.visitor.var_to_idx[key]
ans[key] = self.mcpp.subcc(self.mc_expr, i)
return ans
def subcv(self):
self.warn_if_var_missing_value()
ans = ComponentMap()
for key in self.visitor.var_to_idx:
i = self.visitor.var_to_idx[key]
ans[key] = self.mcpp.subcv(self.mc_expr, i)
return ans
def changePoint(self, var, point):
var.set_value(point)
# WARNING: TODO: this has side effects. If we do not use a
# fresh MCPP_visitor, we get segfaults and different results.
self.visitor = MCPP_visitor(self.visitor.expr)
self.mcpp.release(self.mc_expr)
self.mc_expr = self.visitor.walk_expression()
def warn_if_var_missing_value(self):
if self.visitor.missing_value_warnings:
for message in self.visitor.missing_value_warnings:
logger.warning(message)
self.visitor.missing_value_warnings = []