# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import collections
import functools
import inspect
from collections.abc import Sequence
from collections.abc import Mapping
from pyomo.common.autoslots import AutoSlots
from pyomo.common.dependencies import numpy, numpy_available, pandas, pandas_available
from pyomo.common.modeling import NOTSET
from pyomo.core.pyomoobject import PyomoObject
initializer_map = {}
sequence_types = set()
# initialize with function, method, and method-wrapper types.
function_types = set(
[
type(PyomoObject.is_expression_type),
type(PyomoObject().is_expression_type),
type(PyomoObject.is_expression_type.__call__),
]
)
[docs]
def Initializer(
arg,
allow_generators=False,
treat_sequences_as_mappings=True,
arg_not_specified=None,
additional_args=0,
):
"""Standardized processing of Component keyword arguments
Component keyword arguments accept a number of possible inputs, from
scalars to dictionaries, to functions (rules) and generators. This
function standardizes the processing of keyword arguments and
returns "initializer classes" that are specialized to the specific
data type provided.
Parameters
----------
arg:
The argument passed to the component constructor. This could
be almost any type, including a scalar, dict, list, function,
generator, or None.
allow_generators: bool
If False, then we will raise an exception if ``arg`` is a generator
treat_sequences_as_mappings: bool
If True, then if ``arg`` is a sequence, we will treat it as if
it were a mapping (i.e., ``dict(enumerate(arg))``). Otherwise
sequences will be returned back as the value of the initializer.
arg_not_specified:
If ``arg`` is ``arg_not_specified``, then the function will
return None (and not an InitializerBase object).
additional_args: int
The number of additional arguments that will be passed to any
function calls (provided *before* the index value).
"""
if arg is arg_not_specified:
return None
if additional_args:
if arg.__class__ in function_types:
if allow_generators or inspect.isgeneratorfunction(arg):
raise ValueError(
"Generator functions are not allowed when passing additional args"
)
_args = inspect.getfullargspec(arg)
_nargs = len(_args.args)
if inspect.ismethod(arg) and arg.__self__ is not None:
# Ignore 'self' for bound instance methods and 'cls' for
# @classmethods
_nargs -= 1
if _nargs == 1 + additional_args and _args.varargs is None:
return ParameterizedScalarCallInitializer(arg, constant=True)
else:
return ParameterizedIndexedCallInitializer(arg)
else:
base_initializer = Initializer(
arg=arg,
allow_generators=allow_generators,
treat_sequences_as_mappings=treat_sequences_as_mappings,
arg_not_specified=arg_not_specified,
)
if type(base_initializer) in (
ScalarCallInitializer,
IndexedCallInitializer,
):
# This is an edge case: if we are providing additional
# args, but this is the first time we are seeing a
# callable type, we will (potentially) incorrectly
# categorize this as an IndexedCallInitializer. Re-try
# now that we know this is a function_type.
return Initializer(
arg=base_initializer._fcn,
allow_generators=allow_generators,
treat_sequences_as_mappings=treat_sequences_as_mappings,
arg_not_specified=arg_not_specified,
additional_args=additional_args,
)
return ParameterizedInitializer(base_initializer)
if arg.__class__ in initializer_map:
return initializer_map[arg.__class__](arg)
if arg.__class__ in sequence_types:
if treat_sequences_as_mappings:
return ItemInitializer(arg)
else:
return ConstantInitializer(arg)
if arg.__class__ in function_types:
# Note: we do not use "inspect.isfunction or inspect.ismethod"
# because some function-like things (notably cythonized
# functions) return False
if not allow_generators and inspect.isgeneratorfunction(arg):
raise ValueError("Generator functions are not allowed")
# Historically pyomo.core.base.misc.apply_indexed_rule
# accepted rules that took only the parent block (even for
# indexed components). We will preserve that functionality
# here.
#
# I was concerned that some builtins aren't compatible with
# getfullargspec (and would need the same try-except logic as in
# the partial handling), but I have been unable to come up with
# an example. The closest was getattr(), but that falls back on
# getattr.__call__, which does support getfullargspec.
_args = inspect.getfullargspec(arg)
_nargs = len(_args.args)
if inspect.ismethod(arg) and arg.__self__ is not None:
# Ignore 'self' for bound instance methods and 'cls' for
# @classmethods
_nargs -= 1
if _nargs == 1 and _args.varargs is None:
return ScalarCallInitializer(
arg, constant=not inspect.isgeneratorfunction(arg)
)
else:
return IndexedCallInitializer(arg)
if hasattr(arg, '__len__'):
if isinstance(arg, Mapping):
initializer_map[arg.__class__] = ItemInitializer
elif isinstance(arg, Sequence) and not isinstance(arg, str):
sequence_types.add(arg.__class__)
elif isinstance(arg, PyomoObject):
# TODO: Should IndexedComponent inherit from
# collections.abc.Mapping?
if arg.is_component_type() and arg.is_indexed():
initializer_map[arg.__class__] = ItemInitializer
else:
initializer_map[arg.__class__] = ConstantInitializer
elif any(c.__name__ == 'ndarray' for c in arg.__class__.__mro__):
if numpy_available and isinstance(arg, numpy.ndarray):
sequence_types.add(arg.__class__)
elif any(c.__name__ == 'Series' for c in arg.__class__.__mro__):
if pandas_available and isinstance(arg, pandas.Series):
sequence_types.add(arg.__class__)
elif any(c.__name__ == 'DataFrame' for c in arg.__class__.__mro__):
if pandas_available and isinstance(arg, pandas.DataFrame):
initializer_map[arg.__class__] = DataFrameInitializer
else:
# Note: this picks up (among other things) all string instances
initializer_map[arg.__class__] = ConstantInitializer
# recursively call Initializer to pick up the new registration
return Initializer(
arg,
allow_generators=allow_generators,
treat_sequences_as_mappings=treat_sequences_as_mappings,
arg_not_specified=arg_not_specified,
)
if inspect.isgenerator(arg) or hasattr(arg, 'next') or hasattr(arg, '__next__'):
# This catches generators and iterators (like enumerate()), but
# skips "reusable" iterators like range() as well as Pyomo
# (finite) Set objects [they were both caught by the
# "hasattr('__len__')" above]
if not allow_generators:
raise ValueError("Generators are not allowed")
# Deepcopying generators is problematic (e.g., it generates a
# segfault in pypy3 7.3.0). We will immediately expand the
# generator into a tuple and then store it as a constant.
return ConstantInitializer(tuple(arg))
if type(arg) is functools.partial:
try:
_args = inspect.getfullargspec(arg.func)
except:
# Inspect doesn't work for some built-in callables (notably
# 'int'). We will just have to assume this is a "normal"
# IndexedCallInitializer
return IndexedCallInitializer(arg)
_positional_args = set(_args.args)
for key in arg.keywords:
_positional_args.discard(key)
if len(_positional_args) - len(arg.args) == 1 and _args.varargs is None:
return ScalarCallInitializer(arg)
else:
return IndexedCallInitializer(arg)
if isinstance(arg, InitializerBase):
return arg
if isinstance(arg, PyomoObject):
# We re-check for PyomoObject here, as that picks up / caches
# non-components like component data objects and expressions
initializer_map[arg.__class__] = ConstantInitializer
return ConstantInitializer(arg)
if callable(arg) and not isinstance(arg, type):
# We assume any callable thing could be a functor; but, we must
# filter out types, as we use types as special identifiers that
# should not be called (e.g., UnknownSetDimen)
if inspect.isfunction(arg) or inspect.ismethod(arg):
# Add this to the set of known function types and try again
function_types.add(type(arg))
else:
# Try again, but use the __call__ method (for supporting
# things like functors and cythonized functions). __call__
# is almost certainly going to be a method-wrapper
arg = arg.__call__
return Initializer(
arg,
allow_generators=allow_generators,
treat_sequences_as_mappings=treat_sequences_as_mappings,
arg_not_specified=arg_not_specified,
)
initializer_map[arg.__class__] = ConstantInitializer
return ConstantInitializer(arg)
[docs]
class InitializerBase(AutoSlots.Mixin, object):
"""Base class for all Initializer objects"""
__slots__ = ()
verified = False
[docs]
def constant(self):
"""Return True if this initializer is constant across all indices"""
return False
[docs]
def contains_indices(self):
"""Return True if this initializer contains embedded indices"""
return False
[docs]
def indices(self):
"""Return a generator over the embedded indices
This will raise a RuntimeError if this initializer does not
contain embedded indices
"""
raise RuntimeError(
"Initializer %s does not contain embedded indices" % (type(self).__name__,)
)
[docs]
class ConstantInitializer(InitializerBase):
"""Initializer for constant values"""
__slots__ = ('val', 'verified')
[docs]
def __init__(self, val):
self.val = val
self.verified = False
def __call__(self, parent, idx):
return self.val
[docs]
def constant(self):
return True
[docs]
class ItemInitializer(InitializerBase):
"""Initializer for dict-like values supporting __getitem__()"""
__slots__ = ('_dict',)
[docs]
def __init__(self, _dict):
self._dict = _dict
def __call__(self, parent, idx):
return self._dict[idx]
[docs]
def contains_indices(self):
return True
[docs]
def indices(self):
try:
return self._dict.keys()
except AttributeError:
return range(len(self._dict))
[docs]
class DataFrameInitializer(InitializerBase):
"""Initializer for pandas DataFrame values"""
__slots__ = ('_df', '_column')
[docs]
def __init__(self, dataframe, column=None):
self._df = dataframe
if column is not None:
self._column = column
elif len(dataframe.columns) == 1:
self._column = dataframe.columns[0]
else:
raise ValueError(
"Cannot construct DataFrameInitializer for DataFrame with "
"multiple columns without also specifying the data column"
)
def __call__(self, parent, idx):
return self._df.at[idx, self._column]
[docs]
def contains_indices(self):
return True
[docs]
def indices(self):
return self._df.index
[docs]
class IndexedCallInitializer(InitializerBase):
"""Initializer for functions and callable objects"""
__slots__ = ('_fcn',)
[docs]
def __init__(self, _fcn):
self._fcn = _fcn
def __call__(self, parent, idx):
# Note: this is called by a component using data from a Set (so
# any tuple-like type should have already been checked and
# converted to a tuple; or flattening is turned off and it is
# the user's responsibility to sort things out.
if idx.__class__ is tuple:
return self._fcn(parent, *idx)
else:
return self._fcn(parent, idx)
[docs]
class ParameterizedIndexedCallInitializer(IndexedCallInitializer):
"""IndexedCallInitializer that accepts additional arguments"""
__slots__ = ()
def __call__(self, parent, idx, *args):
if idx.__class__ is tuple:
return self._fcn(parent, *args, *idx)
else:
return self._fcn(parent, *args, idx)
[docs]
class CountedCallGenerator(object):
"""Generator implementing the "counted call" initialization scheme
This generator implements the older "counted call" scheme, where the
first argument past the parent block is a monotonically-increasing
integer beginning at `start_at`.
"""
[docs]
def __init__(self, ctype, fcn, scalar, parent, idx, start_at):
# Note: this is called by a component using data from a Set (so
# any tuple-like type should have already been checked and
# converted to a tuple; or flattening is turned off and it is
# the user's responsibility to sort things out.
self._count = start_at - 1
if scalar:
self._fcn = lambda c: self._filter(ctype, fcn(parent, c))
elif idx.__class__ is tuple:
self._fcn = lambda c: self._filter(ctype, fcn(parent, c, *idx))
else:
self._fcn = lambda c: self._filter(ctype, fcn(parent, c, idx))
def __iter__(self):
return self
def __next__(self):
self._count += 1
return self._fcn(self._count)
next = __next__
@staticmethod
def _filter(ctype, x):
if x is None:
raise ValueError(
"""Counted %s rule returned None instead of %s.End.
Counted %s rules of the form fcn(model, count, *idx) will be called
repeatedly with an increasing count parameter until the rule returns
%s.End. None is not a valid return value in this case due to the
likelihood that an error in the rule can incorrectly return None."""
% ((ctype.__name__,) * 4)
)
return x
[docs]
class CountedCallInitializer(InitializerBase):
"""Initializer for functions implementing the "counted call" API."""
# Pyomo has a historical feature for some rules, where the number of
# times[*1] the rule was called could be passed as an additional
# argument between the block and the index. This was primarily
# supported by Set and ConstraintList. There were many issues with
# the syntax, including inconsistent support for jagged (dimen=None)
# indexing sets, inconsistent support for *args rules, and a likely
# infinite loop if the rule returned Constraint.Skip.
#
# As a slight departure from previous implementations, we will ONLY
# allow the counted rule syntax when the rule does NOT use *args
#
# [*1] The extra argument was one-based, and was only incremented
# when a valid value was returned by the rule and added to the
# _data. This was fragile, as returning something like
# {Component}.Skip could result in an infinite loop. This
# implementation deviates from that behavior and increments the
# counter every time the rule is called.
#
# [JDS 6/2019] We will support a slightly restricted but more
# consistent form of the original implementation for backwards
# compatibility, but I believe that we should deprecate this syntax
# entirely.
__slots__ = ('_fcn', '_is_counted_rule', '_scalar', '_ctype', '_start')
[docs]
def __init__(self, obj, _indexed_init, starting_index=1):
self._fcn = _indexed_init._fcn
self._is_counted_rule = None
self._scalar = not obj.is_indexed()
self._ctype = obj.ctype
self._start = starting_index
if self._scalar:
self._is_counted_rule = True
def __call__(self, parent, idx):
# Note: this is called by a component using data from a Set (so
# any tuple-like type should have already been checked and
# converted to a tuple; or flattening is turned off and it is
# the user's responsibility to sort things out.
if self._is_counted_rule == False:
if idx.__class__ is tuple:
return self._fcn(parent, *idx)
else:
return self._fcn(parent, idx)
if self._is_counted_rule == True:
return CountedCallGenerator(
self._ctype, self._fcn, self._scalar, parent, idx, self._start
)
# Note that this code will only be called once, and only if
# the object is not a scalar.
_args = inspect.getfullargspec(self._fcn)
_nargs = len(_args.args)
if inspect.ismethod(self._fcn) and self._fcn.__self__ is not None:
_nargs -= 1
_len = len(idx) if idx.__class__ is tuple else 1
if _len + 2 == _nargs:
self._is_counted_rule = True
else:
self._is_counted_rule = False
return self.__call__(parent, idx)
[docs]
class ScalarCallInitializer(InitializerBase):
"""Initializer for functions taking only the parent block argument."""
__slots__ = ('_fcn', '_constant')
[docs]
def __init__(self, _fcn, constant=True):
self._fcn = _fcn
self._constant = constant
def __call__(self, parent, idx):
return self._fcn(parent)
[docs]
def constant(self):
"""Return True if this initializer is constant across all indices"""
return self._constant
[docs]
class ParameterizedScalarCallInitializer(ScalarCallInitializer):
"""ScalarCallInitializer that accepts additional arguments"""
__slots__ = ()
def __call__(self, parent, idx, *args):
return self._fcn(parent, *args)
[docs]
class DefaultInitializer(InitializerBase):
"""Initializer wrapper that maps exceptions to default values.
Parameters
----------
initializer: :py:class`InitializerBase`
the Initializer instance to wrap
default:
the value to return inlieu of the caught exception(s)
exceptions: Exception or tuple
the single Exception or tuple of Exceptions to catch and return
the default value.
"""
__slots__ = ('_initializer', '_default', '_exceptions')
[docs]
def __init__(self, initializer, default, exceptions):
self._initializer = initializer
self._default = default
self._exceptions = exceptions
def __call__(self, parent, index):
try:
return self._initializer(parent, index)
except self._exceptions:
return self._default
[docs]
def constant(self):
"""Return True if this initializer is constant across all indices"""
return self._initializer.constant()
[docs]
def contains_indices(self):
"""Return True if this initializer contains embedded indices"""
return self._initializer.contains_indices()
[docs]
def indices(self):
return self._initializer.indices()
[docs]
class ParameterizedInitializer(InitializerBase):
"""Base class for all Initializer objects"""
__slots__ = ('_base_initializer',)
[docs]
def __init__(self, base):
self._base_initializer = base
[docs]
def constant(self):
"""Return True if this initializer is constant across all indices"""
return self._base_initializer.constant()
[docs]
def contains_indices(self):
"""Return True if this initializer contains embedded indices"""
return self._base_initializer.contains_indices()
[docs]
def indices(self):
"""Return a generator over the embedded indices
This will raise a RuntimeError if this initializer does not
contain embedded indices
"""
return self._base_initializer.indices()
def __call__(self, parent, idx, *args):
return self._base_initializer(parent, idx)(parent, *args)
_bound_sequence_types = collections.defaultdict(None.__class__)
[docs]
class BoundInitializer(InitializerBase):
"""Initializer wrapper for processing bounds (mapping scalars to 2-tuples)
Note that this class is meant to mimic the behavior of
:py:func:`Initializer` and will return ``None`` if the initializer
that it is wrapping is ``None``.
Parameters
----------
arg:
As with :py:func:`Initializer`, this is the raw argument passed
to the component constructor.
obj: :py:class:`Component`
The component that "owns" the initializer. This initializer
will treat sequences as mappings only if the owning component is
indexed and the sequence passed to the initializer is not of
length 2
"""
__slots__ = ('_initializer',)
def __new__(cls, arg=None, obj=NOTSET):
# The Initializer() function returns None if the initializer is
# None. We will mock that behavior by commandeering __new__()
if arg is None and obj is not NOTSET:
return None
else:
return super().__new__(cls)
[docs]
def __init__(self, arg, obj=NOTSET):
if obj is NOTSET or obj.is_indexed():
treat_sequences_as_mappings = not (
isinstance(arg, Sequence)
and len(arg) == 2
and not isinstance(arg[0], Sequence)
)
else:
treat_sequences_as_mappings = False
self._initializer = Initializer(
arg, treat_sequences_as_mappings=treat_sequences_as_mappings
)
def __call__(self, parent, index):
val = self._initializer(parent, index)
if _bound_sequence_types[val.__class__]:
return val
if _bound_sequence_types[val.__class__] is None:
_bound_sequence_types[val.__class__] = isinstance(
val, Sequence
) and not isinstance(val, str)
if _bound_sequence_types[val.__class__]:
return val
return (val, val)
[docs]
def constant(self):
"""Return True if this initializer is constant across all indices"""
return self._initializer.constant()
[docs]
def contains_indices(self):
"""Return True if this initializer contains embedded indices"""
return self._initializer.contains_indices()
[docs]
def indices(self):
return self._initializer.indices()