# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import ctypes
import math
import operator
from collections import deque
from operator import itemgetter
from pyomo.common.deprecation import deprecation_warning
from pyomo.common.errors import DeveloperError, InfeasibleConstraintException, MouseTrap
from pyomo.common.numeric_types import (
native_complex_types,
native_numeric_types,
native_types,
value,
)
from pyomo.core.base import Expression
from pyomo.core.expr import (
NegationExpression,
ProductExpression,
DivisionExpression,
PowExpression,
AbsExpression,
UnaryFunctionExpression,
MonomialTermExpression,
LinearExpression,
SumExpression,
EqualityExpression,
InequalityExpression,
RangedExpression,
Expr_ifExpression,
ExternalFunctionExpression,
)
from pyomo.core.expr.visitor import StreamBasedExpressionVisitor, _EvaluationVisitor
from pyomo.repn.util import (
BeforeChildDispatcher,
ExitNodeDispatcher,
ExprType,
InvalidNumber,
apply_node_operation,
complex_number_error,
nan,
sum_like_expression_types,
)
_CONSTANT = ExprType.CONSTANT
_MONOMIAL = ExprType.MONOMIAL
_GENERAL = ExprType.GENERAL
# Feasibility tolerance for trivial (fixed) constraints
TOL = 1e-8
def _create_strict_inequality_map(vars_):
vars_['strict_inequality_map'] = {
True: vars_['less_than'],
False: vars_['less_equal'],
(True, True): (vars_['less_than'], vars_['less_than']),
(True, False): (vars_['less_than'], vars_['less_equal']),
(False, True): (vars_['less_equal'], vars_['less_than']),
(False, False): (vars_['less_equal'], vars_['less_equal']),
}
[docs]
class TextNLDebugTemplate(object):
unary = {
'log': 'o43\t#log\n',
'log10': 'o42\t#log10\n',
'sin': 'o41\t#sin\n',
'cos': 'o46\t#cos\n',
'tan': 'o38\t#tan\n',
'sinh': 'o40\t#sinh\n',
'cosh': 'o45\t#cosh\n',
'tanh': 'o37\t#tanh\n',
'asin': 'o51\t#asin\n',
'acos': 'o53\t#acos\n',
'atan': 'o49\t#atan\n',
'exp': 'o44\t#exp\n',
'sqrt': 'o39\t#sqrt\n',
'asinh': 'o50\t#asinh\n',
'acosh': 'o52\t#acosh\n',
'atanh': 'o47\t#atanh\n',
'ceil': 'o14\t#ceil\n',
'floor': 'o13\t#floor\n',
}
binary_sum = 'o0\t#+\n'
product = 'o2\t#*\n'
division = 'o3\t# /\n'
pow = 'o5\t#^\n'
abs = 'o15\t# abs\n'
negation = 'o16\t#-\n'
nary_sum = 'o54\t# sumlist\n%d\t# (n)\n'
exprif = 'o35\t# if\n'
and_expr = 'o21\t# and\n'
less_than = 'o22\t# lt\n'
less_equal = 'o23\t# le\n'
equality = 'o24\t# eq\n'
external_fcn = 'f%d %d%s\n'
# NOTE: to support scaling and substitutions, we do NOT include the
# 'v' or the EOL here:
var = '%s'
const = 'n%s\n'
string = 'h%d:%s\n'
monomial = product + const + var.replace('%', '%%')
multiplier = product + const
_create_strict_inequality_map(vars())
nl_operators = {
0: (2, operator.add),
2: (2, operator.mul),
3: (2, operator.truediv),
5: (2, operator.pow),
15: (1, operator.abs),
16: (1, operator.neg),
54: (None, lambda *x: sum(x)),
35: (3, lambda a, b, c: b if a else c),
21: (2, operator.and_),
22: (2, operator.lt),
23: (2, operator.le),
24: (2, operator.eq),
43: (1, math.log),
42: (1, math.log10),
41: (1, math.sin),
46: (1, math.cos),
38: (1, math.tan),
40: (1, math.sinh),
45: (1, math.cosh),
37: (1, math.tanh),
51: (1, math.asin),
53: (1, math.acos),
49: (1, math.atan),
44: (1, math.exp),
39: (1, math.sqrt),
50: (1, math.asinh),
52: (1, math.acosh),
47: (1, math.atanh),
14: (1, math.ceil),
13: (1, math.floor),
}
def _strip_template_comments(vars_, base_):
vars_['unary'] = {
k: v[: v.find('\t#')] + '\n' if v[-1] == '\n' else ''
for k, v in base_.unary.items()
}
for k, v in base_.__dict__.items():
if type(v) is str and '\t#' in v:
v_lines = v.split('\n')
for i, l in enumerate(v_lines):
comment_start = l.find('\t#')
if comment_start >= 0:
v_lines[i] = l[:comment_start]
vars_[k] = '\n'.join(v_lines)
def _inv2str(val):
return f"{val._str() if hasattr(val, '_str') else val}"
# The "standard" text mode template is the debugging template with the
# comments removed
[docs]
class TextNLTemplate(TextNLDebugTemplate):
_strip_template_comments(vars(), TextNLDebugTemplate)
_create_strict_inequality_map(vars())
[docs]
class NLFragment(object):
"""This is a mock "component" for the nl portion of a named Expression.
It is used internally in the writer when requesting symbolic solver
labels so that we can generate meaningful names for the nonlinear
portion of an Expression component.
"""
__slots__ = ('_repn', '_node')
[docs]
def __init__(self, repn, node):
self._repn = repn
self._node = node
@property
def name(self):
return 'nl(' + self._node.name + ')'
[docs]
class AMPLRepn(object):
"""The "compiled" representation of an expression in AMPL NL format.
This stores a compiled form of an expression in the AMPL "NL"
format. The data structure contains 6 fields:
Attributes
----------
mult : float
A constant multiplier applied to this expression. The
:py:class`AMPLRepn` returned by the :py:class`AMPLRepnVisitor`
should always have `mult` == 1.
const : float
The constant portion of this expression
linear : Dict[int, float] or None
Mapping of `id(VarData)` to linear coefficient
nonlinear : Tuple[str, List[int]] or List[Tuple[str, List[int]]] or None
The general nonlinear portion of the compiled expression as a
tuple of two parts:
- the nl template string: this is the NL string with
placeholders (``%s``) for all the variables that appear in
the expression.
- an iterable if the :class:`VarData` IDs that correspond to the
placeholders in the nl template string
This is `None` if there is no general nonlinear part of the
expression. Note that this can be a list of tuple fragments
within AMPLRepnVisitor, but that list is concatenated to a
single tuple when exiting the `AMPLRepnVisitor`.
named_exprs : Set[int]
A set of IDs point to named expressions (:py:class:`Expression`)
objects appearing in this expression.
nl : Tuple[str, Iterable[int]]
This holds the complete compiled representation of this
expression (including multiplier, constant, linear terms, and
nonlinear fragment) using the same format as the `nonlinear`
attribute. This field (if not None) should be considered
authoritative, as there are NL fragments that are not
representable by {mult, const, linear, nonlinear} (e.g., string
arguments).
"""
__slots__ = ('nl', 'mult', 'const', 'linear', 'nonlinear', 'named_exprs')
template = TextNLTemplate
[docs]
def __init__(self, const, linear, nonlinear):
self.nl = None
self.mult = 1
self.const = const
self.linear = linear
if nonlinear is None:
self.nonlinear = self.named_exprs = None
else:
nl, nl_args, self.named_exprs = nonlinear
self.nonlinear = nl, nl_args
def __str__(self):
return (
f'AMPLRepn(mult={self.mult}, const={self.const}, '
f'linear={self.linear}, nonlinear={self.nonlinear}, '
f'nl={self.nl}, named_exprs={self.named_exprs})'
)
def __repr__(self):
return str(self)
def __eq__(self, other):
return isinstance(other.__class__, AMPLRepn) and (
self.nl == other.nl
and self.mult == other.mult
and self.const == other.const
and self.linear == other.linear
and self.nonlinear == other.nonlinear
and self.named_exprs == other.named_exprs
)
def __hash__(self):
# Approximation of the Python default object hash
# (4 LSB are rolled to the MSB to reduce hash collisions)
return id(self) // 16 + (
(id(self) & 15) << 8 * ctypes.sizeof(ctypes.c_void_p) - 4
)
def duplicate(self):
ans = self.__class__.__new__(self.__class__)
ans.nl = self.nl
ans.mult = self.mult
ans.const = self.const
ans.linear = None if self.linear is None else dict(self.linear)
ans.nonlinear = self.nonlinear
ans.named_exprs = None if self.named_exprs is None else set(self.named_exprs)
return ans
def compile_repn(self, prefix='', args=None, named_exprs=None):
template = self.template
if self.mult != 1:
if self.mult == -1:
prefix += template.negation
else:
prefix += template.multiplier % self.mult
self.mult = 1
if self.named_exprs is not None:
if named_exprs is None:
named_exprs = set(self.named_exprs)
else:
named_exprs.update(self.named_exprs)
if self.nl is not None:
# This handles both named subexpressions and embedded
# non-numeric (e.g., string) arguments.
nl, nl_args = self.nl
if prefix:
nl = prefix + nl
if args is not None:
assert args is not nl_args
args.extend(nl_args)
else:
args = list(nl_args)
if nl_args:
# For string arguments, nl_args is an empty tuple and
# self.named_exprs is None. For named subexpressions,
# we are guaranteed that named_exprs is NOT None. We
# need to ensure that the named subexpression that we
# are returning is added to the named_exprs set.
named_exprs.update(nl_args)
return nl, args, named_exprs
if args is None:
args = []
if self.linear:
nterms = -len(args)
_v_template = template.var
_m_template = template.monomial
# Because we are compiling this expression (into a NL
# expression), we will go ahead and filter the 0*x terms
# from the expression. Note that the args are accumulated
# by side-effect, which prevents iterating over the linear
# terms twice.
nl_sum = ''.join(
args.append(v) or (_v_template if c == 1 else _m_template % c)
for v, c in self.linear.items()
if c
)
nterms += len(args)
else:
nterms = 0
nl_sum = ''
if self.nonlinear:
if self.nonlinear.__class__ is list:
nterms += len(self.nonlinear)
nl_sum += ''.join(map(itemgetter(0), self.nonlinear))
deque(map(args.extend, map(itemgetter(1), self.nonlinear)), maxlen=0)
else:
nterms += 1
nl_sum += self.nonlinear[0]
args.extend(self.nonlinear[1])
if self.const:
nterms += 1
nl_sum += template.const % self.const
if nterms > 2:
return (prefix + (template.nary_sum % nterms) + nl_sum, args, named_exprs)
elif nterms == 2:
return prefix + template.binary_sum + nl_sum, args, named_exprs
elif nterms == 1:
return prefix + nl_sum, args, named_exprs
else: # nterms == 0
return prefix + (template.const % 0), args, named_exprs
def compile_nonlinear_fragment(self):
if not self.nonlinear:
self.nonlinear = None
return
args = []
nterms = len(self.nonlinear)
nl_sum = ''.join(map(itemgetter(0), self.nonlinear))
deque(map(args.extend, map(itemgetter(1), self.nonlinear)), maxlen=0)
if nterms > 2:
self.nonlinear = (self.template.nary_sum % nterms) + nl_sum, args
elif nterms == 2:
self.nonlinear = self.template.binary_sum + nl_sum, args
else: # nterms == 1:
self.nonlinear = nl_sum, args
[docs]
def append(self, other):
"""Append a child result from acceptChildResult
Notes
-----
This method assumes that the operator was "+". It is implemented
so that we can directly use an AMPLRepn() as a data object in
the expression walker (thereby avoiding the function call for a
custom callback)
"""
# Note that self.mult will always be 1 (we only call append()
# within a sum, so there is no opportunity for self.mult to
# change). Omitting the assertion for efficiency.
# assert self.mult == 1
_type = other[0]
if _type is _MONOMIAL:
_, v, c = other
if v in self.linear:
self.linear[v] += c
else:
self.linear[v] = c
elif _type is _GENERAL:
_, other = other
if other.nl is not None and other.nl[1]:
if other.linear:
# This is a named expression with both a linear and
# nonlinear component. We want to merge it with
# this AMPLRepn, preserving the named expression for
# only the nonlinear component (merging the linear
# component with this AMPLRepn).
pass
else:
# This is a nonlinear-only named expression,
# possibly with a multiplier that is not 1. Compile
# it and append it (this both resolves the
# multiplier, and marks the named expression as
# having been used)
other = other.compile_repn('', None, self.named_exprs)
nl, nl_args, self.named_exprs = other
self.nonlinear.append((nl, nl_args))
return
if other.named_exprs is not None:
if self.named_exprs is None:
self.named_exprs = set(other.named_exprs)
else:
self.named_exprs.update(other.named_exprs)
if other.mult != 1:
mult = other.mult
self.const += mult * other.const
if other.linear:
linear = self.linear
for v, c in other.linear.items():
if v in linear:
linear[v] += c * mult
else:
linear[v] = c * mult
if other.nonlinear:
if other.nonlinear.__class__ is list:
other.compile_nonlinear_fragment()
if mult == -1:
prefix = self.template.negation
else:
prefix = self.template.multiplier % mult
self.nonlinear.append(
(prefix + other.nonlinear[0], other.nonlinear[1])
)
else:
self.const += other.const
if other.linear:
linear = self.linear
for v, c in other.linear.items():
if v in linear:
linear[v] += c
else:
linear[v] = c
if other.nonlinear:
if other.nonlinear.__class__ is list:
self.nonlinear.extend(other.nonlinear)
else:
self.nonlinear.append(other.nonlinear)
elif _type is _CONSTANT:
self.const += other[1]
def to_expr(self, var_map):
if self.nl is not None or self.nonlinear is not None:
# TODO: support converting general nonlinear expressions
# back to Pyomo expressions. This will require an AMPL
# parser.
raise MouseTrap("Cannot convert nonlinear AMPLRepn to Pyomo Expression")
if self.linear:
# Explicitly generate the LinearExpression. At time of
# writing, this is about 40% faster than standard operator
# overloading for O(1000) element sums
ans = LinearExpression(
[coef * var_map[vid] for vid, coef in self.linear.items()]
)
ans += self.const
else:
ans = self.const
return ans * self.mult
[docs]
class DebugAMPLRepn(AMPLRepn):
"""An `AMPLRepn` that uses the "debug" (annotated) NL format
This is identical to the :py:class:`AMPLRepn` class, except it is
built using the `TextNLDebugTemplate` formatting template. This
format includes descriptions of the operators and variable /
expression names in the NL text.
"""
__slots__ = ()
template = TextNLDebugTemplate
[docs]
def handle_negation_node(visitor, node, arg1):
if arg1[0] is _MONOMIAL:
return (_MONOMIAL, arg1[1], -1 * arg1[2])
elif arg1[0] is _GENERAL:
arg1[1].mult *= -1
return arg1
elif arg1[0] is _CONSTANT:
return (_CONSTANT, -1 * arg1[1])
else:
raise RuntimeError("%s: %s" % (type(arg1[0]), arg1))
[docs]
def handle_product_node(visitor, node, arg1, arg2):
if arg2[0] is _CONSTANT:
arg2, arg1 = arg1, arg2
if arg1[0] is _CONSTANT:
mult = arg1[1]
if not mult:
# simplify multiplication by 0 (if arg2 is zero, the
# simplification happens when we evaluate the constant
# below). Note that this is not IEEE-754 compliant, and
# will map 0*inf and 0*nan to 0 (and not to nan). We are
# including this for backwards compatibility with the NLv1
# writer, but arguably we should deprecate/remove this
# "feature" in the future.
if arg2[0] is _CONSTANT:
_prod = mult * arg2[1]
if _prod:
deprecation_warning(
f"Encountered {mult}*{_inv2str(arg2[1])} in expression tree. "
"Mapping the NaN result to 0 for compatibility "
"with the nl_v1 writer. In the future, this NaN "
"will be preserved/emitted to comply with IEEE-754.",
version='6.4.3',
)
_prod = 0
return (_CONSTANT, _prod)
return arg1
if mult == 1:
return arg2
elif arg2[0] is _MONOMIAL:
if mult != mult:
# This catches mult (i.e., arg1) == nan
return arg1
return (_MONOMIAL, arg2[1], mult * arg2[2])
elif arg2[0] is _GENERAL:
if mult != mult:
# This catches mult (i.e., arg1) == nan
return arg1
arg2[1].mult *= mult
return arg2
elif arg2[0] is _CONSTANT:
if not arg2[1]:
# Simplify multiplication by 0; see note above about
# IEEE-754 incompatibility.
_prod = mult * arg2[1]
if _prod:
deprecation_warning(
f"Encountered {_inv2str(mult)}*{arg2[1]} in expression tree. "
"Mapping the NaN result to 0 for compatibility "
"with the nl_v1 writer. In the future, this NaN "
"will be preserved/emitted to comply with IEEE-754.",
version='6.4.3',
)
_prod = 0
return (_CONSTANT, _prod)
return (_CONSTANT, mult * arg2[1])
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.product
)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_division_node(visitor, node, arg1, arg2):
if arg2[0] is _CONSTANT:
div = arg2[1]
if div == 1:
return arg1
if arg1[0] is _MONOMIAL:
tmp = apply_node_operation(node, (arg1[2], div))
if tmp != tmp:
# This catches if the coefficient division results in nan
return _CONSTANT, tmp
return (_MONOMIAL, arg1[1], tmp)
elif arg1[0] is _GENERAL:
tmp = apply_node_operation(node, (arg1[1].mult, div))
if tmp != tmp:
# This catches if the multiplier division results in nan
return _CONSTANT, tmp
arg1[1].mult = tmp
return arg1
elif arg1[0] is _CONSTANT:
return _CONSTANT, apply_node_operation(node, (arg1[1], div))
elif arg1[0] is _CONSTANT and not arg1[1]:
return _CONSTANT, 0
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.division
)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_pow_node(visitor, node, arg1, arg2):
if arg2[0] is _CONSTANT:
if arg1[0] is _CONSTANT:
ans = apply_node_operation(node, (arg1[1], arg2[1]))
if ans.__class__ in native_complex_types:
ans = complex_number_error(ans, visitor, node)
return _CONSTANT, ans
elif not arg2[1]:
return _CONSTANT, 1
elif arg2[1] == 1:
return arg1
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(visitor.template.pow)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_abs_node(visitor, node, arg1):
if arg1[0] is _CONSTANT:
return (_CONSTANT, abs(arg1[1]))
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(visitor.template.abs)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_unary_node(visitor, node, arg1):
if arg1[0] is _CONSTANT:
return _CONSTANT, apply_node_operation(node, (arg1[1],))
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.unary[node.name]
)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_exprif_node(visitor, node, arg1, arg2, arg3):
if arg1[0] is _CONSTANT:
if arg1[1]:
return arg2
else:
return arg3
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(visitor.template.exprif)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
nonlin = visitor.node_result_to_amplrepn(arg3).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_equality_node(visitor, node, arg1, arg2):
if arg1[0] is _CONSTANT and arg2[0] is _CONSTANT:
return (_CONSTANT, arg1[1] == arg2[1])
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.equality
)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_inequality_node(visitor, node, arg1, arg2):
if arg1[0] is _CONSTANT and arg2[0] is _CONSTANT:
return (_CONSTANT, node._apply_operation((arg1[1], arg2[1])))
nonlin = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.strict_inequality_map[node.strict]
)
nonlin = visitor.node_result_to_amplrepn(arg2).compile_repn(*nonlin)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_ranged_inequality_node(visitor, node, arg1, arg2, arg3):
if arg1[0] is _CONSTANT and arg2[0] is _CONSTANT and arg3[0] is _CONSTANT:
return (_CONSTANT, node._apply_operation((arg1[1], arg2[1], arg3[1])))
op = visitor.template.strict_inequality_map[node.strict]
nl, args, named = visitor.node_result_to_amplrepn(arg1).compile_repn(
visitor.template.and_expr + op[0]
)
nl2, args2, named = visitor.node_result_to_amplrepn(arg2).compile_repn(
'', None, named
)
nl += nl2 + op[1] + nl2
args.extend(args2)
args.extend(args2)
nonlin = visitor.node_result_to_amplrepn(arg3).compile_repn(nl, args, named)
return (_GENERAL, visitor.Result(0, None, nonlin))
[docs]
def handle_named_expression_node(visitor, node, arg1):
_id = id(node)
# Note that while named subexpressions ('defined variables' in the
# ASL NL file vernacular) look like variables, they are not allowed
# to appear in the 'linear' portion of a constraint / objective
# definition. We will return this as a "var" template, but
# wrapped in the nonlinear portion of the expression tree.
repn = visitor.node_result_to_amplrepn(arg1)
# A local copy of the expression source list. This will be updated
# later if the same Expression node is encountered in another
# expression tree.
#
# This is a 3-tuple [con_id, obj_id, substitute_expression]. If the
# expression is used by more than 1 constraint / objective, then the
# id is set to 0. If it is not used by any, then it is None.
# substitute_expression is a bool indicating if this named
# subexpression tree should be directly substituted into any
# expression tree that references this node (i.e., do NOT emit the V
# line).
expression_source = [None, None, False]
# Record this common expression
visitor.subexpression_cache[_id] = (
# 0: the "component" that generated this expression ID
node,
# 1: the common subexpression (to be written out)
repn,
# 2: the source usage information for this subexpression:
# [(con_id, obj_id, substitute); see above]
expression_source,
)
# As we will eventually need the compiled form of any nonlinear
# expression, we will go ahead and compile it here. We do not
# do the same for the linear component as we will only need the
# linear component compiled to a dict if we are emitting the
# original (linear + nonlinear) V line (which will not happen if
# the V line is part of a larger linear operator).
if repn.nonlinear.__class__ is list:
repn.compile_nonlinear_fragment()
if not visitor.use_named_exprs:
return _GENERAL, repn.duplicate()
mult, repn.mult = repn.mult, 1
if repn.named_exprs is None:
repn.named_exprs = set()
# When converting this shared subexpression to a (nonlinear)
# node, we want to just reference this subexpression:
repn.nl = (visitor.template.var, (_id,))
if repn.nonlinear:
if repn.linear:
# If this expression has both linear and nonlinear
# components, we will follow the ASL convention and break
# the named subexpression into two named subexpressions: one
# that is only the nonlinear component and one that has the
# const/linear component (and references the first). This
# will allow us to propagate linear coefficients up from
# named subexpressions when appropriate.
sub_node = NLFragment(repn, node)
sub_id = id(sub_node)
sub_repn = visitor.Result(0, None, None)
sub_repn.nonlinear = repn.nonlinear
sub_repn.nl = (visitor.template.var, (sub_id,))
sub_repn.named_exprs = set(repn.named_exprs)
repn.named_exprs.add(sub_id)
repn.nonlinear = sub_repn.nl
# See above for the meaning of this source information
nl_info = list(expression_source)
visitor.subexpression_cache[sub_id] = (sub_node, sub_repn, nl_info)
# It is important that the NL subexpression comes before the
# main named expression: re-insert the original named
# expression (so that the nonlinear sub_node comes first
# when iterating over subexpression_cache)
visitor.subexpression_cache[_id] = visitor.subexpression_cache.pop(_id)
else:
nl_info = expression_source
else:
repn.nonlinear = None
if repn.linear:
if (
not repn.const
and len(repn.linear) == 1
and next(iter(repn.linear.values())) == 1
):
# This Expression holds only a variable (multiplied by
# 1). Do not emit this as a named variable and instead
# just inject the variable where this expression is
# used.
repn.nl = None
expression_source[2] = True
else:
# This Expression holds only a constant. Do not emit this
# as a named variable and instead just inject the constant
# where this expression is used.
repn.nl = None
expression_source[2] = True
if mult != 1:
repn.const *= mult
if repn.linear:
_lin = repn.linear
for v in repn.linear:
_lin[v] *= mult
if repn.nonlinear:
if mult == -1:
prefix = visitor.template.negation
else:
prefix = visitor.template.multiplier % mult
repn.nonlinear = prefix + repn.nonlinear[0], repn.nonlinear[1]
if expression_source[2]:
if repn.linear:
assert len(repn.linear) == 1 and not repn.const
return (_MONOMIAL,) + next(iter(repn.linear.items()))
else:
return (_CONSTANT, repn.const)
return (_GENERAL, repn.duplicate())
[docs]
def handle_external_function_node(visitor, node, *args):
func = node._fcn._function
# There is a special case for external functions: these are the only
# expressions that can accept string arguments. As we currently pass
# these as 'precompiled' GENERAL AMPLRepns, the normal trap for
# constant subexpressions will miss string arguments. We will catch
# that case here by looking for NL fragments with no variable
# references. Note that the NL fragment is NOT the raw string
# argument that we want to evaluate: the raw string is in the
# `const` field.
if all(
arg[0] is _CONSTANT or (arg[0] is _GENERAL and arg[1].nl and not arg[1].nl[1])
for arg in args
):
arg_list = [arg[1] if arg[0] is _CONSTANT else arg[1].const for arg in args]
return _CONSTANT, apply_node_operation(node, arg_list)
if func in visitor.external_functions:
if node._fcn._library != visitor.external_functions[func][1]._library:
raise RuntimeError(
"The same external function name (%s) is associated "
"with two different libraries (%s through %s, and %s "
"through %s). The ASL solver will fail to link "
"correctly."
% (
func,
visitor.external_functions[func]._library,
visitor.external_functions[func]._library.name,
node._fcn._library,
node._fcn.name,
)
)
else:
visitor.external_functions[func] = (len(visitor.external_functions), node._fcn)
comment = f'\t#{node.local_name}' if visitor.symbolic_solver_labels else ''
nl = visitor.template.external_fcn % (
visitor.external_functions[func][0],
len(args),
comment,
)
arg_ids = []
named_exprs = set()
for arg in args:
_id = id(arg)
arg_ids.append(_id)
visitor.subexpression_cache[_id] = (
arg,
visitor.Result(
0,
None,
visitor.node_result_to_amplrepn(arg).compile_repn(
named_exprs=named_exprs
),
),
(None, None, True),
)
if not named_exprs:
named_exprs = None
return (
_GENERAL,
visitor.Result(0, None, (nl + '%s' * len(arg_ids), arg_ids, named_exprs)),
)
_operator_handles = ExitNodeDispatcher(
{
NegationExpression: handle_negation_node,
ProductExpression: handle_product_node,
DivisionExpression: handle_division_node,
PowExpression: handle_pow_node,
AbsExpression: handle_abs_node,
UnaryFunctionExpression: handle_unary_node,
Expr_ifExpression: handle_exprif_node,
EqualityExpression: handle_equality_node,
InequalityExpression: handle_inequality_node,
RangedExpression: handle_ranged_inequality_node,
Expression: handle_named_expression_node,
ExternalFunctionExpression: handle_external_function_node,
# These are handled explicitly in beforeChild():
# LinearExpression: handle_linear_expression,
# SumExpression: handle_sum_expression,
#
# Note: MonomialTermExpression is only hit when processing NPV
# subexpressions that raise errors (e.g., log(0) * m.x), so no
# special processing is needed [it is just a product expression]
MonomialTermExpression: handle_product_node,
}
)
[docs]
class AMPLBeforeChildDispatcher(BeforeChildDispatcher):
__slots__ = ()
[docs]
def __init__(self):
# Special linear / summation expressions
self[MonomialTermExpression] = self._before_monomial
self[LinearExpression] = self._before_linear
self[SumExpression] = self._before_general_expression
@staticmethod
def _record_var(visitor, var):
# We always add all indices to the var_map at once so that
# we can honor deterministic ordering of unordered sets
# (because the user could have iterated over an unordered
# set when constructing an expression, thereby altering the
# order in which we would see the variables)
vm = visitor.var_map
try:
_iter = var.parent_component().values(visitor.sorter)
except AttributeError:
# Note that this only works for the AML, as kernel does not
# provide a parent_component()
_iter = (var,)
for v in _iter:
if v.fixed:
continue
vm[id(v)] = v
@staticmethod
def _before_string(visitor, child):
visitor.encountered_string_arguments = True
ans = visitor.Result(child, None, None)
ans.nl = (visitor.template.string % (len(child), child), ())
return False, (_GENERAL, ans)
@staticmethod
def _before_var(visitor, child):
_id = id(child)
if _id not in visitor.var_map:
if child.fixed:
if _id not in visitor.fixed_vars:
visitor.cache_fixed_var(_id, child)
return False, (_CONSTANT, visitor.fixed_vars[_id])
_before_child_handlers._record_var(visitor, child)
return False, (_MONOMIAL, _id, 1)
@staticmethod
def _before_monomial(visitor, child):
#
# The following are performance optimizations for common
# situations (Monomial terms and Linear expressions)
#
arg1, arg2 = child._args_
if arg1.__class__ not in native_types:
try:
arg1 = visitor.check_constant(visitor.evaluate(arg1), arg1)
except (ValueError, ArithmeticError):
return True, None
# Trap multiplication by 0 and nan.
if not arg1:
if arg2.fixed:
_id = id(arg2)
if _id not in visitor.fixed_vars:
visitor.cache_fixed_var(id(arg2), arg2)
arg2 = visitor.fixed_vars[_id]
if arg2 != arg2:
deprecation_warning(
f"Encountered {arg1}*{_inv2str(arg2)} in expression tree. "
"Mapping the NaN result to 0 for compatibility "
"with the nl_v1 writer. In the future, this NaN "
"will be preserved/emitted to comply with IEEE-754.",
version='6.4.3',
)
return False, (_CONSTANT, arg1)
_id = id(arg2)
if _id not in visitor.var_map:
if arg2.fixed:
if _id not in visitor.fixed_vars:
visitor.cache_fixed_var(_id, arg2)
return False, (_CONSTANT, arg1 * visitor.fixed_vars[_id])
_before_child_handlers._record_var(visitor, arg2)
return False, (_MONOMIAL, _id, arg1)
@staticmethod
def _before_linear(visitor, child):
# Because we are going to modify the LinearExpression in this
# walker, we need to make a copy of the arg list from the original
# expression tree.
var_map = visitor.var_map
const = 0
linear = {}
for arg in child.args:
if arg.__class__ is MonomialTermExpression:
arg1, arg2 = arg._args_
if arg1.__class__ not in native_types:
try:
arg1 = visitor.check_constant(visitor.evaluate(arg1), arg1)
except (ValueError, ArithmeticError):
return True, None
# Trap multiplication by 0 and nan.
if not arg1:
if arg2.fixed:
arg2 = visitor.check_constant(arg2.value, arg2)
if arg2 != arg2:
deprecation_warning(
f"Encountered {arg1}*{_inv2str(arg2)} in expression "
"tree. Mapping the NaN result to 0 for compatibility "
"with the nl_v1 writer. In the future, this NaN "
"will be preserved/emitted to comply with IEEE-754.",
version='6.4.3',
)
continue
_id = id(arg2)
if _id not in var_map:
if arg2.fixed:
if _id not in visitor.fixed_vars:
visitor.cache_fixed_var(_id, arg2)
const += arg1 * visitor.fixed_vars[_id]
continue
_before_child_handlers._record_var(visitor, arg2)
linear[_id] = arg1
elif _id in linear:
linear[_id] += arg1
else:
linear[_id] = arg1
elif arg.__class__ in native_types:
const += arg
elif arg.is_variable_type():
_id = id(arg)
if _id not in var_map:
if arg.fixed:
if _id not in visitor.fixed_vars:
visitor.cache_fixed_var(_id, arg)
const += visitor.fixed_vars[_id]
continue
_before_child_handlers._record_var(visitor, arg)
linear[_id] = 1
elif _id in linear:
linear[_id] += 1
else:
linear[_id] = 1
else:
try:
const += visitor.check_constant(visitor.evaluate(arg), arg)
except (ValueError, ArithmeticError):
return True, None
if linear:
return False, (_GENERAL, visitor.Result(const, linear, None))
else:
return False, (_CONSTANT, const)
@staticmethod
def _before_named_expression(visitor, child):
_id = id(child)
if _id in visitor.subexpression_cache:
obj, repn, info = visitor.subexpression_cache[_id]
if info[2]:
if repn.linear:
return False, (_MONOMIAL, next(iter(repn.linear)), 1)
else:
return False, (_CONSTANT, repn.const)
return False, (_GENERAL, repn.duplicate())
else:
return True, None
_before_child_handlers = AMPLBeforeChildDispatcher()
[docs]
class AMPLRepnVisitor(StreamBasedExpressionVisitor):
[docs]
def __init__(
self,
subexpression_cache,
external_functions,
var_map,
used_named_expressions,
symbolic_solver_labels,
use_named_exprs,
sorter,
):
super().__init__()
self.subexpression_cache = subexpression_cache
self.external_functions = external_functions
self.active_expression_source = None
self.var_map = var_map
self.used_named_expressions = used_named_expressions
self.symbolic_solver_labels = symbolic_solver_labels
self.use_named_exprs = use_named_exprs
self.encountered_string_arguments = False
self.fixed_vars = {}
self._eval_expr_visitor = _EvaluationVisitor(True)
self.evaluate = self._eval_expr_visitor.dfs_postorder_stack
self.sorter = sorter
if symbolic_solver_labels:
self.Result = DebugAMPLRepn
else:
self.Result = AMPLRepn
self.template = self.Result.template
def check_constant(self, ans, obj):
if ans.__class__ not in native_numeric_types:
# None can be returned from uninitialized Var/Param objects
if ans is None:
return InvalidNumber(
None, f"'{obj}' evaluated to a nonnumeric value '{ans}'"
)
if ans.__class__ is InvalidNumber:
return ans
elif ans.__class__ in native_complex_types:
return complex_number_error(ans, self, obj)
else:
# It is possible to get other non-numeric types. Most
# common are bool and 1-element numpy.array(). We will
# attempt to convert the value to a float before
# proceeding.
#
# TODO: we should check bool and warn/error (while bool is
# convertible to float in Python, they have very
# different semantic meanings in Pyomo).
try:
ans = float(ans)
except:
return InvalidNumber(
ans, f"'{obj}' evaluated to a nonnumeric value '{ans}'"
)
if ans != ans:
return InvalidNumber(
nan, f"'{obj}' evaluated to a nonnumeric value '{ans}'"
)
return ans
def cache_fixed_var(self, _id, child):
val = self.check_constant(child.value, child)
lb, ub = child.bounds
if (lb is not None and lb - val > TOL) or (ub is not None and ub - val < -TOL):
raise InfeasibleConstraintException(
"model contains a trivially infeasible "
f"variable '{child.name}' (fixed value "
f"{val} outside bounds [{lb}, {ub}])."
)
self.fixed_vars[_id] = self.check_constant(child.value, child)
def node_result_to_amplrepn(self, data):
if data[0] is _GENERAL:
return data[1]
elif data[0] is _MONOMIAL:
_, v, c = data
if c:
return self.Result(0, {v: c}, None)
else:
return self.Result(0, None, None)
elif data[0] is _CONSTANT:
return self.Result(data[1], None, None)
else:
raise DeveloperError("unknown result type")
def initializeWalker(self, expr):
expr, src, src_idx, self.expression_scaling_factor = expr
self.active_expression_source = (src_idx, id(src))
walk, result = self.beforeChild(None, expr, 0)
if not walk:
return False, self.finalizeResult(result)
return True, expr
def beforeChild(self, node, child, child_idx):
return _before_child_handlers[child.__class__](self, child)
def enterNode(self, node):
# SumExpression are potentially large nary operators. Directly
# populate the result
if node.__class__ in sum_like_expression_types:
data = self.Result(0, {}, None)
data.nonlinear = []
return node.args, data
else:
return node.args, []
def exitNode(self, node, data):
if data.__class__ is self.Result:
# If the summation resulted in a constant, return the constant
if data.linear or data.nonlinear or data.nl:
return (_GENERAL, data)
else:
return (_CONSTANT, data.const)
#
# General expressions...
#
return _operator_handles[node.__class__](self, node, *data)
def finalizeResult(self, result):
ans = self.node_result_to_amplrepn(result)
# Multiply the expression by the scaling factor provided by the caller
ans.mult *= self.expression_scaling_factor
# If this was a nonlinear named expression, and that expression
# has no linear portion, then we will directly use this as a
# named expression. We need to mark that the expression was
# used and return it as a simple nonlinear expression pointing
# to this named expression. In all other cases, we will return
# the processed representation (which will reference the
# nonlinear-only named subexpression - if it exists - but not
# this outer named expression). This prevents accidentally
# recharacterizing variables that only appear linearly as
# nonlinear variables.
if ans.nl is not None:
if not ans.nl[1]:
raise ValueError("Numeric expression resolved to a string constant")
# This *is* a named subexpression. If there is no linear
# component, then replace this expression with the named
# expression. The mult will be handled later. We know that
# the const is built into the nonlinear expression, because
# it cannot be changed "in place" (only through addition,
# which would have "cleared" the nl attribute)
if not ans.linear:
ans.named_exprs.update(ans.nl[1])
ans.nonlinear = ans.nl
ans.const = 0
else:
# This named expression has both a linear and a
# nonlinear component, and possibly a multiplier and
# constant. We will not include this named expression
# and instead will expose the components so that linear
# variables are not accidentally re-characterized as
# nonlinear.
pass
ans.nl = None
if ans.nonlinear.__class__ is list:
ans.compile_nonlinear_fragment()
if not ans.linear:
ans.linear = {}
if ans.mult != 1:
linear = ans.linear
mult, ans.mult = ans.mult, 1
ans.const *= mult
if linear:
for k in linear:
linear[k] *= mult
if ans.nonlinear:
if mult == -1:
prefix = self.template.negation
else:
prefix = self.template.multiplier % mult
ans.nonlinear = prefix + ans.nonlinear[0], ans.nonlinear[1]
#
self.active_expression_source = None
return ans
[docs]
def evaluate_ampl_nl_expression(nl, external_functions):
expr = nl.splitlines()
stack = []
while expr:
line = expr.pop()
tokens = line.split()
# remove tokens after the first comment
for i, t in enumerate(tokens):
if t.startswith('#'):
tokens = tokens[:i]
break
if len(tokens) != 1:
# skip blank lines
if not tokens:
continue
if tokens[0][0] == 'f':
# external function
fid, nargs = tokens
fid = int(fid[1:])
nargs = int(nargs)
fcn_id, ef = external_functions[fid]
assert fid == fcn_id
stack.append(ef.evaluate(tuple(stack.pop() for i in range(nargs))))
continue
raise DeveloperError(
f"Unsupported line format _evaluate_constant_nl() "
f"(we expect each line to contain a single token): '{line}'"
)
term = tokens[0]
# the "command" can be determined by the first character on the line
cmd = term[0]
# Note that we will unpack the line into the expected number of
# explicit arguments as a form of error checking
if cmd == 'n':
# numeric constant
stack.append(float(term[1:]))
elif cmd == 'o':
# operator
nargs, fcn = nl_operators[int(term[1:])]
if nargs is None:
nargs = int(stack.pop())
stack.append(fcn(*(stack.pop() for i in range(nargs))))
elif cmd in '1234567890':
# this is either a single int (e.g., the nargs in a nary
# sum) or a string argument. Preserve it as-is until later
# when we know which we are expecting.
stack.append(term)
elif cmd == 'h':
stack.append(term.split(':', 1)[1])
else:
raise DeveloperError(
f"Unsupported NL operator in _evaluate_constant_nl(): '{line}'"
)
assert len(stack) == 1
return stack[0]