# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
from pyomo.core.expr.visitor import StreamBasedExpressionVisitor
from pyomo.common.collections import ComponentMap, ComponentSet
from pyomo.common.numeric_types import native_numeric_types
from pyomo.core.expr.numeric_expr import (
NegationExpression,
PowExpression,
ProductExpression,
MonomialTermExpression,
DivisionExpression,
SumExpression,
LinearExpression,
UnaryFunctionExpression,
NPV_NegationExpression,
NPV_PowExpression,
NPV_ProductExpression,
NPV_DivisionExpression,
NPV_SumExpression,
NPV_UnaryFunctionExpression,
)
from pyomo.core.base.units_container import _PyomoUnit
from pyomo.repn.util import ExitNodeDispatcher
from pyomo.core.base import (
VarData,
ParamData,
ExpressionData,
VarList,
ConstraintList,
Block,
Constraint,
Objective,
)
from pyomo.core.base.var import ScalarVar
from pyomo.core.base.param import ScalarParam
from pyomo.contrib.fbbt import interval
from pyomo.contrib.fbbt.fbbt import (
compute_bounds_on_expr,
fbbt,
ExpressionBoundsVisitor,
)
from pyomo.core.base.expression import ScalarExpression
from pyomo.core.base.transformation import Transformation, TransformationFactory
from pyomo.common.modeling import unique_component_name
from pyomo.core.base.component import ActiveComponent
from pyomo.core.base.suffix import Suffix
from pyomo.common.config import ConfigDict, ConfigValue
from pyomo.repn.util import categorize_valid_components
def _handle_var(node, data, visitor):
if node.fixed:
return _handle_float(node.value, data, visitor)
visitor.node_to_var_map[node] = (node,)
visitor.degree_map[node] = 1
visitor.substitution_map[node] = node
return node
def _handle_unit(node, data, visitor):
return _handle_float(node.value, data, visitor)
def _handle_param(node, data, visitor):
return _handle_float(node.value, data, visitor)
def _handle_float(node, data, visitor):
visitor.node_to_var_map[node] = tuple()
visitor.degree_map[node] = 0
visitor.substitution_map[node] = node
return node
def _update_node_to_var_map(res, arg1_vars, arg2_vars, visitor):
arg1_nvars = len(arg1_vars)
arg2_nvars = len(arg2_vars)
if arg1_nvars == 0:
visitor.node_to_var_map[res] = arg2_vars
elif arg2_nvars == 0:
visitor.node_to_var_map[res] = arg1_vars
else:
x = arg1_vars[0]
y = arg2_vars[0]
if x is y:
visitor.node_to_var_map[res] = (x,)
else:
visitor.node_to_var_map[res] = (x, y)
def _handle_product(node, data, visitor):
arg1, arg2 = data
arg1_vars = visitor.node_to_var_map[arg1]
arg2_vars = visitor.node_to_var_map[arg2]
arg1_nvars = len(arg1_vars)
arg2_nvars = len(arg2_vars)
arg1_degree = visitor.degree_map[arg1]
arg2_degree = visitor.degree_map[arg2]
if arg1_degree == 0:
res = arg1 * arg2
visitor.node_to_var_map[res] = arg2_vars
visitor.degree_map[res] = arg2_degree
visitor.substitution_map[node] = res
return res
if arg2_degree == 0:
res = arg1 * arg2
visitor.node_to_var_map[res] = arg1_vars
visitor.degree_map[res] = arg1_degree
visitor.substitution_map[node] = res
return res
if arg1_nvars > 1 or visitor.aggressive_substitution:
arg1 = visitor.create_aux_var(arg1)
arg1_vars = (arg1,)
arg1_nvars = 1
arg1_degree = 1
if arg2_nvars > 1 or visitor.aggressive_substitution:
arg2 = visitor.create_aux_var(arg2)
arg2_vars = (arg2,)
arg2_nvars = 1
arg2_degree = 1
res = arg1 * arg2
# at this point arg1 should have exactly 1 variable
# and arg2 should have exactly 1 variable
_update_node_to_var_map(res, arg1_vars, arg2_vars, visitor)
visitor.degree_map[res] = -1
visitor.substitution_map[node] = res
return res
def _handle_sum(node, data, visitor):
arg_list = []
new_degree = 0
vset = ComponentSet()
for arg in data:
arg_vars = visitor.node_to_var_map[arg]
arg_degree = visitor.degree_map[arg]
if arg_degree == -1:
arg = visitor.create_aux_var(arg)
arg_vars = (arg,)
arg_degree = 1
arg_list.append(arg)
if arg_degree != 0:
new_degree = 1
vset.update(arg_vars)
res = sum(arg_list)
visitor.node_to_var_map[res] = tuple(vset)
visitor.degree_map[res] = new_degree
visitor.substitution_map[node] = res
return res
def _handle_division(node, data, visitor):
"""
This one is a bit tricky. If we encounter both x/z and y/z
at different places in the model, we only want one auxiliary
variable for 1/z.
"""
arg1, arg2 = data
arg1_vars = visitor.node_to_var_map[arg1]
arg2_vars = visitor.node_to_var_map[arg2]
arg1_nvars = len(arg1_vars)
arg2_nvars = len(arg2_vars)
arg1_degree = visitor.degree_map[arg1]
arg2_degree = visitor.degree_map[arg2]
if arg2_degree == 0:
res = arg1 / arg2
visitor.node_to_var_map[res] = arg1_vars
visitor.degree_map[res] = arg1_degree
visitor.substitution_map[node] = res
return res
if arg1_nvars > 1 or visitor.aggressive_substitution:
arg1 = visitor.create_aux_var(arg1)
arg1_vars = (arg1,)
arg1_nvars = 1
arg1_degree = 1
if arg2_nvars > 1 or visitor.aggressive_substitution:
arg2 = visitor.create_aux_var(arg2)
arg2_vars = (arg2,)
arg2_nvars = 1
arg2_degree = 1
if "div" not in visitor.substitution_map:
visitor.substitution_map["div"] = ComponentMap()
# now we need to figure out if we have seen 1/arg2 before
if arg2 in visitor.substitution_map["div"]:
aux = visitor.substitution_map["div"][arg2]
else:
aux = visitor.block.x.add()
visitor.substitution_map["div"][arg2] = aux
"""
we can only create a piecewise linear function of 1/arg2 if arg2 is either
strictly greater than 0 or strictly less than 0
otherwise, we do
aux = 1 / arg2
aux * arg2 = 1
"""
arg2_lb, arg2_ub = compute_bounds_on_expr(arg2)
if (arg2_lb is not None and arg2_lb > 0) or (
arg2_ub is not None and arg2_ub < 0
):
c = visitor.block.c.add(aux == 1 / arg2) # keep it univariate if we can
else:
c = visitor.block.c.add(aux * arg2 == 1)
fbbt(c)
arg2 = aux
arg2_vars = (arg2,)
arg2_nvars = 1
arg2_degree = 1
res = arg1 * arg2
# at this point arg1 should have at most 1 variable
# and arg2 should have exactly 1 variable
_update_node_to_var_map(res, arg1_vars, arg2_vars, visitor)
if arg1_degree == 0:
visitor.degree_map[res] = arg2_degree
else:
visitor.degree_map[res] = -1
visitor.substitution_map[node] = res
return res
def _handle_pow(node, data, visitor):
# arg1 ** arg2
# exp(arg2 * log(arg1))
arg1, arg2 = data
arg1_vars = visitor.node_to_var_map[arg1]
arg2_vars = visitor.node_to_var_map[arg2]
arg1_nvars = len(arg1_vars)
arg2_nvars = len(arg2_vars)
arg1_degree = visitor.degree_map[arg1]
arg2_degree = visitor.degree_map[arg2]
if arg1_nvars > 1 or visitor.aggressive_substitution:
arg1 = visitor.create_aux_var(arg1)
arg1_vars = (arg1,)
arg1_nvars = 1
arg1_degree = 1
if arg2_nvars > 1 or visitor.aggressive_substitution:
arg2 = visitor.create_aux_var(arg2)
arg2_vars = (arg2,)
arg2_nvars = 1
arg2_degree = 1
res = arg1**arg2
# at this point arg1 should have at most 1 variable
# and arg2 should have at most 1 variable
_update_node_to_var_map(res, arg1_vars, arg2_vars, visitor)
if arg1_degree == 0 and arg2_degree == 0:
visitor.degree_map[res] = 0
else:
visitor.degree_map[res] = -1
visitor.substitution_map[node] = res
return res
def _handle_named_expression(node, data, visitor):
assert len(data) == 1
node.expr = data[0]
visitor.substitution_map[node] = node
return node
def _handle_negation(node, data, visitor):
arg = data[0]
res = -arg
visitor.node_to_var_map[res] = visitor.node_to_var_map[arg]
visitor.degree_map[res] = visitor.degree_map[arg]
visitor.substitution_map[node] = res
return res
def _handle_unary(node, data, visitor):
arg = data[0]
arg_vars = visitor.node_to_var_map[arg]
arg_nvars = len(arg_vars)
arg_degree = visitor.degree_map[arg]
if arg_nvars > 1 or visitor.aggressive_substitution:
arg = visitor.create_aux_var(arg)
arg_vars = (arg,)
arg_nvars = 1
arg_degree = 1
res = node.create_node_with_local_data((arg,))
visitor.node_to_var_map[res] = arg_vars
if arg_degree == 0:
visitor.degree_map[res] = 0
else:
visitor.degree_map[res] = -1
visitor.substitution_map[node] = res
return res
handlers = ExitNodeDispatcher()
handlers[VarData] = _handle_var
handlers[ScalarVar] = _handle_var
handlers[ParamData] = _handle_param
handlers[ScalarParam] = _handle_param
handlers[ProductExpression] = _handle_product
handlers[SumExpression] = _handle_sum
handlers[DivisionExpression] = _handle_division
handlers[PowExpression] = _handle_pow
handlers[MonomialTermExpression] = _handle_product
handlers[LinearExpression] = _handle_sum
handlers[ExpressionData] = _handle_named_expression
handlers[ScalarExpression] = _handle_named_expression
handlers[NegationExpression] = _handle_negation
handlers[UnaryFunctionExpression] = _handle_unary
handlers[int] = _handle_float
handlers[float] = _handle_float
handlers[NPV_NegationExpression] = _handle_negation
handlers[NPV_PowExpression] = _handle_pow
handlers[NPV_ProductExpression] = _handle_product
handlers[NPV_DivisionExpression] = _handle_division
handlers[NPV_SumExpression] = _handle_sum
handlers[NPV_UnaryFunctionExpression] = _handle_unary
handlers[_PyomoUnit] = _handle_unit
class _UnivariateNonlinearDecompositionVisitor(StreamBasedExpressionVisitor):
def __init__(self, **kwds):
self.block = kwds.pop('aux_block')
self.aggressive_substitution = kwds.pop('aggressive_substitution')
super().__init__(**kwds)
self.node_to_var_map = ComponentMap()
self.degree_map = (
ComponentMap()
) # values will be 0 (constant), 1 (linear), or -1 (nonlinear)
self.substitution_map = ComponentMap()
self.block.x = VarList()
self.block.c = ConstraintList()
self._leaf_types = {VarData, ScalarVar, ParamData, ScalarVar, float, int}
self._interval_visitor = ExpressionBoundsVisitor(
use_fixed_var_values_as_bounds=True
)
def initializeWalker(self, expr):
if expr in self.substitution_map:
return False, self.substitution_map[expr]
return True, None
def beforeChild(self, node, child, child_idx):
if child in self.substitution_map:
return False, self.substitution_map[child]
return True, None
def exitNode(self, node, data):
nt = type(node)
if nt in handlers:
return handlers[type(node)](node, data, self)
elif nt in native_numeric_types:
handlers[nt] = _handle_float
return _handle_float(node, data, self)
else:
raise NotImplementedError(f'unrecognized expression type: {nt}')
def _is_leaf(self, x):
if type(x) in self._leaf_types or type(x) in native_numeric_types:
return True
return False
def create_aux_var(self, expr):
if expr in self.substitution_map:
x = self.substitution_map[expr]
elif self._is_leaf(expr):
self.substitution_map[expr] = expr
return expr
else:
x = self.block.x.add()
self.substitution_map[expr] = x
c = self.block.c.add(x == expr)
# we need to compute bounds on x now because some of the
# handlers depend on variable bounds (e.g., division)
xl, xu = self._interval_visitor.walk_expression(expr)
if xl == -interval.inf:
xl = None
if xu == interval.inf:
xu = None
x.setlb(xl)
x.setub(xu)
return x