# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
"""A module for "flattening" the components in a block-hierarchical model
with respect to common indexing sets
"""
from pyomo.core.base import Block, Reference
from pyomo.common.collections import ComponentSet, ComponentMap
from pyomo.core.base.block import SubclassOf
from pyomo.core.base.set import SetProduct
from pyomo.core.base.indexed_component import UnindexedComponent_set, normalize_index
from pyomo.core.base.component import ActiveComponent
from pyomo.core.base.indexed_component_slice import IndexedComponent_slice
from collections import OrderedDict
def get_slice_for_set(s):
    """Return the slice (or index) of the proper dimension for a set.

    For a zero-dimensional set (e.g. ``UnindexedComponent_set``) this is
    ``None``; otherwise it is a wildcard suitable for use as (part of) an
    index into a component defined over ``s``.
    """
    dim = s.dimen
    if dim == 0:
        # Case for e.g. UnindexedComponent_set
        # Should this be None or tuple()? RBP 202110
        return None
    if not normalize_index.flatten:
        # Unflattened indices use a single slot per set regardless of dimen.
        return slice(None)
    if dim is None:
        # Dimension unknown at slicing time; match any number of coordinates.
        return (Ellipsis,)
    # We will arrive here and fail for sets of dimension UnknownSetDimen.
    return (slice(None),) * dim
class _NotAnIndex(object):
"""
`None` is a valid index, so we use a dummy class to
denote a slot that needs to get filled with indices
from our product.
"""
pass
def _fill_indices(filled_index, index):
    """Fill the ``_NotAnIndex`` slots of ``filled_index`` with the entries
    of ``index`` (always a tuple), in order, and return the completed index.

    A length-one result is unpacked to its single entry, matching how Pyomo
    treats single-coordinate indices.
    """
    pos = 0
    for slot, value in enumerate(filled_index):
        if value is _NotAnIndex:
            # `index` is always a tuple, so positional access is valid
            filled_index[slot] = index[pos]
            pos += 1
    # The number of vacancies must match the number of values supplied
    # (i.e. the number of factors in the original SetProduct).
    # Not _strictly_ necessary.
    assert pos == len(index)
    if len(filled_index) == 1:
        return filled_index[0]
    return tuple(filled_index)
def _fill_indices_from_product(partial_index_list, product):
    """
    `partial_index_list` is a list of indices, each corresponding to a
    set. If an entry in `partial_index_list` is `_NotAnIndex`, that
    slot will get filled in by an entry from `product`.
    `product` is a `SetProduct` with as many "factors" as there are
    missing indices in `partial_index_list`.

    Yields
    ------
    tuple
        The first entry is the (unflattened) index from `product`; the
        second is `partial_index_list` with its vacancies filled by that
        index's coordinates (via `_fill_indices`).

    Note
    ----
    This generator toggles the global `normalize_index.flatten` flag:
    it must be False while `product` is advanced (so each factor set
    contributes exactly one entry of `index`) but must be restored to the
    caller's value while control is yielded back to the caller.
    """
    # We will manipulate `normalize_index.flatten`.
    # Store its original value so we can reset it when we're done.
    _normalize_index_flatten = normalize_index.flatten
    try:
        normalize_index.flatten = False
        for index in product:
            # Since `normalize_index.flatten` is False, `index` is a
            # scalar or (tuple of (scalars or tuples)). Conveniently,
            # each entry in the tuple belongs to a single factor set.
            # I do not have to worry about having a product-of-products
            # here because I created the product from "unfactorable sets"
            # We need to generate a new index for every entry of `product`,
            # and want to reuse `partial_index_list` as a starting point,
            # so we copy it here.
            filled_index = list(partial_index_list)
            # Restore the caller's flatten setting before yielding so the
            # yielded index behaves as the caller expects.
            normalize_index.flatten = _normalize_index_flatten
            # filled_index can now be used in the user's intended way
            # This determines how we will access the component's data
            # with our new index, which is currently _completely_ unflattened
            # (i.e. a tuple-of-tuples, no further nesting).
            #
            # This will not work well if the user's component is indexed by
            # a nested product of sets AND normalize_index.flatten is False.
            # In this case we may try to access a component's data with
            # >>> comp[(1,'a',1)]
            # (each coordinate belongs to its own set) when it expects
            # >>> comp[((1,'a'),1)]
            # because `comp` was created with two set arguments, the first
            # of which was already a product.
            yield (index, _fill_indices(filled_index, index))
            normalize_index.flatten = False
            # Want to get the unflattened factors when we advance the
            # iterator of `product`
    finally:
        # Reset `normalize_index.flatten`
        normalize_index.flatten = _normalize_index_flatten
def slice_component_along_sets(component, sets, context_slice=None, normalize=None):
    """This function generates all possible slices of the provided component
    along the provided sets. That is, it will iterate over the component's
    other indexing sets and, for each index, yield a slice along the
    sets specified in the call signature.

    Arguments
    ---------
    component: Component
        The component whose slices will be yielded
    sets: ComponentSet
        ComponentSet of Pyomo sets that will be sliced along
    context_slice: IndexedComponent_slice
        If provided, instead of creating a new slice, we will extend this
        one with appropriate getattr and getitem calls.
    normalize: Bool
        If False, the returned index (from the product of "other sets")
        is not normalized, regardless of the value of normalize_index.flatten.
        This is necessary to use this index with _fill_indices.

    Yields
    ------
    tuple
        The first entry is the index in the product of "other sets"
        corresponding to the slice, and the second entry is the slice
        at that index.

    """
    set_set = ComponentSet(sets)
    subsets = list(component.index_set().subsets())
    temp_idx = [get_slice_for_set(s) if s in set_set else _NotAnIndex for s in subsets]
    other_sets = [s for s in subsets if s not in set_set]

    if context_slice is None:
        base_component = component
    else:
        base_component = getattr(context_slice, component.local_name)

    if component.is_indexed():
        # We need to iterate over sets that aren't sliced
        # `c.is_indexed()` covers the case when UnindexedComponent_set
        # is in `other_sets`.
        if other_sets:
            cross_prod = other_sets[0].cross(*other_sets[1:])
        else:
            # If we are only indexed by sets we need to slice, we
            # should just use tuple(temp_idx) as our index. We spoof
            # a cross_prod here so we don't have to repeat the try/except
            # logic below in a separate branch. An empty tuple is the right
            # singleton to work in the embedded call to _fill_indices.
            cross_prod = [tuple()]

        for prod_index, new_index in _fill_indices_from_product(temp_idx, cross_prod):
            try:
                if normalize_index.flatten:
                    # This index is always normalized if normalize_index.flatten
                    # is True. I have not encountered a situation where
                    # "denormalization" makes sense here.
                    # As normalization is also done in the IndexedComponent,
                    # normalizing here primarily just affects what the resulting
                    # slice "looks like." E.g. slice(None) vs (slice(None),).
                    # This has implications for generating CUIDs from these
                    # slices, where we would like consistency in the string
                    # representation.
                    # TODO: Should CUID normalize (slice(None),)?
                    new_index = normalize_index(new_index)
                c_slice = base_component[new_index]
                if type(c_slice) is IndexedComponent_slice:
                    # This is just to make sure we do not have an
                    # empty slice.
                    #
                    # Note that c_slice is not necessarily a slice.
                    # We enter this loop even if no sets need slicing.
                    try:
                        next(iter(c_slice.duplicate()))
                    except IndexError:
                        if normalize_index.flatten:
                            raise
                        # There is an edge case where when we are not
                        # flattening indices the dimensionality of an
                        # index can change between a SetProduct and the
                        # member Sets: the member set can have dimen>1
                        # (or even None!), but the dimen of that portion
                        # of the SetProduct is always 1. Since we are
                        # just checking that the c_slice isn't
                        # completely empty, we will allow matching with
                        # an Ellipsis
                        _empty = True
                        try:
                            next(iter(base_component[...]))
                            _empty = False
                        except StopIteration:
                            # Only an exhausted iterator means "empty";
                            # a bare `except:` here would also swallow
                            # KeyboardInterrupt/SystemExit and unrelated
                            # errors from the getitem call.
                            pass
                        if _empty:
                            raise
                if (normalize is None and normalize_index.flatten) or normalize:
                    # Most users probably want this index to be normalized,
                    # so they can more conveniently use it as a key in a
                    # mapping. (E.g. they will get "a" as opposed to ("a",).)
                    # However, to use it in the calling routine
                    # generate_sliced_components, we need this index to not
                    # have been normalized, so that indices are tuples,
                    # partitioned according to their "factor sets."
                    # This is why we allow the argument normalize=False to
                    # override normalize_index.flatten.
                    prod_index = normalize_index(prod_index)
                yield prod_index, c_slice
            except StopIteration:
                # We have an empty slice for some reason, e.g.
                # a coordinate of `new_index` from the cross
                # product was skipped in the original component.
                pass
            except KeyError:
                # We are creating scalar components from a product of
                # sets. Components may be undefined for certain indices.
                # We want to simply skip that index and move on.
                pass
    else:
        # Component is a data object
        c_slice = base_component
        yield (), c_slice
def generate_sliced_components(
    b, index_stack, slice_, sets, ctype, index_map, active=None
):
    """Recursively generate slices of the specified ctype along the
    specified sets

    Parameters
    ----------
    b: BlockData
        Block whose components will be sliced
    index_stack: list
        Sets above ``b`` in the block hierarchy, including on its parent
        component, that have been sliced. This is necessary to return the
        sets that have been sliced.
    slice_: IndexedComponent_slice or BlockData
        Slice generated so far. This function will yield extensions to
        this slice at the current level of the block hierarchy.
    sets: ComponentSet of Pyomo sets
        Sets that will be sliced
    ctype: Subclass of Component
        Type of components to generate
    index_map: ComponentMap
        Map from (some of) the specified sets to a "representative index"
        to use when descending into subblocks. While this map does not need
        to contain every set in the sliced sets, it must not contain any
        sets that will not be sliced.
    active: Bool or None
        If not None, this is a boolean flag used to filter component objects
        by their active status.

    Yields
    ------
    Tuple of Sets and an IndexedComponent_slice or ComponentData
        The sets indexing the returned component or slice. If the component
        is indexed, an IndexedComponent_slice is returned. Otherwise, a
        ComponentData is returned.

    """
    if type(slice_) is IndexedComponent_slice:
        context_slice = slice_.duplicate()
    else:
        context_slice = None

    # If active argument is specified and does not match the block's
    # active flag, we return immediately. This matches the behavior of
    # component_objects. We only need this check as we may modify the
    # active argument sent to component_objects if ctype is not an
    # ActiveComponent type.
    if active is not None and active != b.active:
        return

    # Cache this flag so we don't have to call issubclass again later.
    # (Compare to None with `is not`, per PEP 8.)
    check_active = issubclass(ctype, ActiveComponent) and (active is not None)
    # If active=False and ctype is not an ActiveComponent (e.g. it is Var)
    # we will not generate any components. To prevent this, only pass the
    # active argument if we are looking for active components.
    c_active = active if check_active else None

    # Look for components indexed by specified sets immediately in our block.
    for c in b.component_objects(ctype, descend_into=False, active=c_active):
        subsets = list(c.index_set().subsets())
        new_sets = [s for s in subsets if s in sets]
        # Extend our "index stack"
        sliced_sets = index_stack + new_sets

        # Extend our slice with this component
        for idx, new_slice in slice_component_along_sets(
            c, sets, context_slice=context_slice, normalize=False
        ):
            # If we have to check activity, check data objects defined by
            # slice. If any match, we yield the slice. This is done for
            # compatibility with the behavior when slicing blocks, where
            # we can only descend into a block that matches our active flag.
            #
            # Note that new_slice can be a data object. This happens if the
            # component doesn't contain any sets we are slicing, i.e. new_sets
            # is empty.
            if (
                # Yield if (a) we're not checking activity
                not check_active
                # or (b) we have not sliced and data object activity matches
                or (not sliced_sets and new_slice.active == c_active)
                # or (c) we did slice and *any* data object activity matches
                or (
                    sliced_sets
                    and any(data.active == c_active for data in new_slice.duplicate())
                )
            ):
                yield sliced_sets, new_slice

    # We now descend into subblocks
    for sub in b.component_objects(Block, descend_into=False, active=active):
        subsets = list(sub.index_set().subsets())
        new_sets = [s for s in subsets if s in sets]
        # Extend stack with new matched indices.
        index_stack.extend(new_sets)

        # Need to construct an index to descend into for each slice-of-block
        # we are about to generate.
        # Note that any remaining _NotAnIndex placeholders after this loop
        # will be replaced with the corresponding indices of the non-sliced
        # sets.
        given_descend_idx = [_NotAnIndex for _ in subsets]
        for i, s in enumerate(subsets):
            # NOTE: index_map better only contain sets that we are slicing.
            if s in index_map:
                # Use a user-given index if available.
                given_descend_idx[i] = index_map[s]
                if s not in sets:
                    raise RuntimeError(
                        "Encountered a specified index for a set %s that we"
                        " are not slicing. This is not supported" % s
                    )
            elif s in sets:
                # Otherwise use a slice. We will advance the slice iterator
                # to try to get a concrete component from this slice.
                given_descend_idx[i] = get_slice_for_set(s)

        # Generate slices from this sub-block
        for idx, new_slice in slice_component_along_sets(
            sub, sets, context_slice=context_slice, normalize=False
        ):
            # TODO: Can this branch happen outside of the loop?
            # If it's not indexed, we don't need to slice...
            if sub.is_indexed():
                # fill any remaining placeholders with the "index" of our slice
                descend_idx = _fill_indices(list(given_descend_idx), idx)
                # create a slice-or-data object
                descend_data = sub[descend_idx]
                if type(descend_data) is IndexedComponent_slice:
                    try:
                        slice_iter = iter(descend_data)
                        # Try to find a data object defined by the slice
                        # that matches the active argument. In doing so,
                        # we treat a slice as inactive if all of its data
                        # objects are inactive. We need to find a data obj
                        # with the correct active flag, otherwise we run into
                        # problems when we descend (component_objects will
                        # not yield anything).
                        _data = next(slice_iter)
                        while active is not None and _data.active != active:
                            _data = next(slice_iter)
                        descend_data = _data
                    except StopIteration:
                        # For this particular idx, we have no BlockData
                        # to descend into.
                        continue
                elif active is not None and descend_data.active != active:
                    # descend_data is a BlockData object. This particular
                    # BlockData was specified by the index map. In this case,
                    # we want to respect "activity".
                    continue
            else:
                # Have encountered a ScalarBlock. Do not need to check the
                # active flag as this came straight from component_objects.
                descend_data = sub

            # Recursively generate sliced components from this data object
            for st, v in generate_sliced_components(
                descend_data,
                index_stack,
                new_slice,
                sets,
                ctype,
                index_map,
                active=active,
            ):
                yield tuple(st), v

        # pop the index sets of the block whose sub-components
        # we just finished iterating over.
        for _ in new_sets:
            index_stack.pop()
def flatten_components_along_sets(m, sets, ctype, indices=None, active=None):
    """This function iterates over components (recursively) contained
    in a block and partitions their data objects into components
    indexed only by the specified sets.

    Parameters
    ----------
    m: BlockData
        Block whose components (and their sub-components) will be
        partitioned
    sets: Tuple of Pyomo Sets
        Sets to be sliced. Returned components will be indexed by
        some combination of these sets, if at all.
    ctype: Subclass of Component
        Type of component to identify and partition
    indices: Iterable or ComponentMap
        Indices of sets to use when descending into subblocks. If an
        iterable is provided, the order corresponds to the order in
        ``sets``. If a ``ComponentMap`` is provided, the keys must be
        in ``sets``.
    active: Bool or None
        If not None, this is a boolean flag used to filter component objects
        by their active status. A reference-to-slice is returned if any data
        object defined by the slice matches this flag.

    Returns
    -------
    List of tuples of Sets, list of lists of Components
        The first entry is a list of tuples of Pyomo Sets. The second is a
        list of lists of Components, indexed by the corresponding sets in
        the first list. If the components are unindexed, ComponentData are
        returned and the tuple of sets contains only UnindexedComponent_set.
        If the components are indexed, they are references-to-slices.

    Raises
    ------
    ValueError
        If a provided index is not a member of its set.
    RuntimeError
        If an index is provided for a set that is not being sliced.

    """
    set_of_sets = ComponentSet(sets)
    # Normalize the `indices` argument into a ComponentMap and validate it.
    if indices is None:
        index_map = ComponentMap()
    elif type(indices) is ComponentMap:
        index_map = indices
    else:
        index_map = ComponentMap(zip(sets, indices))
    for s, idx in index_map.items():
        if idx not in s:
            raise ValueError(
                "%s is a bad index for set %s. \nPlease provide an index "
                "that is in the set." % (idx, s.name)
            )
        if s not in set_of_sets:
            raise RuntimeError(
                "Index specified for set %s that is not one of the sets"
                " that will be sliced. Indices should only be provided"
                " for sets that will be sliced." % s.name
            )
    index_stack = []

    # Using these two `OrderedDict`s is a workaround because I can't
    # reliably use tuples of components as keys in a `ComponentMap`.
    sets_dict = OrderedDict()
    comps_dict = OrderedDict()
    for index_sets, slice_ in generate_sliced_components(
        m, index_stack, m, set_of_sets, ctype, index_map, active=active
    ):
        # Note that index_sets should always be a tuple, never a scalar.

        # TODO: Potentially re-order sets at this point.
        # In this way (time, space) would have the same key as (space, time).
        # They we'd have to somehow "swap indexing sets" when we create
        # the reference below.
        key = tuple(id(c) for c in index_sets)
        if key not in sets_dict:
            if len(key) == 0:
                sets_dict[key] = (UnindexedComponent_set,)
            else:
                sets_dict[key] = index_sets
        if key not in comps_dict:
            comps_dict[key] = []
        if len(key) == 0:
            comps_dict[key].append(slice_)
        else:
            # If the user wants to change these flags, they can access the
            # slice via the `referent` attribute of each reference component.
            slice_.attribute_errors_generate_exceptions = False
            slice_.key_errors_generate_exceptions = False
            comps_dict[key].append(Reference(slice_))

    # Note: a plain `list(...)` avoids the redundant generator expressions
    # that previously shadowed the `sets` parameter.
    # list-of-tuples of Sets:
    sets_list = list(sets_dict.values())
    # list-of-lists of components:
    comps_list = list(comps_dict.values())
    # E.g. we return: (
    #     [(time, space), (time,)],
    #     [[comp_1, ...], [comp_2, ...]],
    # )
    # where each component in [comp_1, ...] is indexed by (time, space)
    # and each component in [comp_2, ...] is indexed by (time,).
    return sets_list, comps_list
def flatten_dae_components(model, time, ctype, indices=None, active=None):
    """Partitions components into ComponentData and Components indexed only
    by the provided set.

    Parameters
    ----------
    model: BlockData
        Block whose components are partitioned
    time: Set
        Indexing by this set (and only this set) will be preserved in the
        returned components.
    ctype: Subclass of Component
        Type of component to identify, partition, and return
    indices: Tuple or ComponentMap
        Contains the index of the specified set to be used when descending
        into blocks
    active: Bool or None
        If provided, used as a filter to only return components with the
        specified active flag. A reference-to-slice is returned if any
        data object defined by the slice matches this flag.

    Returns
    -------
    List of ComponentData, list of Component
        The first list contains ComponentData for all components not
        indexed by the provided set. The second contains references-to
        -slices for all components indexed by the provided set.

    """
    sets_list, comps_list = flatten_components_along_sets(
        model, ComponentSet((time,)), ctype, indices=indices, active=active
    )
    # Default to empty lists so callers get sensible results even when
    # no components of one (or either) category exist.
    scalar_comps = []
    dae_comps = []
    for index_sets, components in zip(sets_list, comps_list):
        only_time = len(index_sets) == 1 and index_sets[0] is time
        unindexed = not index_sets or (
            len(index_sets) == 1 and index_sets[0] is UnindexedComponent_set
        )
        if only_time:
            dae_comps = components
        elif unindexed:
            scalar_comps = components
        else:
            # Any other combination of sets means something (e.g. a subblock)
            # was indexed by time more than once, which we cannot partition.
            raise RuntimeError(
                "Invalid model for `flatten_dae_components`.\n"
                "This can happen if your model has components that are\n"
                "indexed by time (explicitly or implicitly) multiple times."
            )
    return scalar_comps, dae_comps