# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import logging
from pyomo.common.collections import ComponentMap
from pyomo.common.log import LoggingIntercept
from pyomo.core import Suffix, Var, Constraint, Piecewise, Block
from pyomo.core import Expression, Param
from pyomo.core.base.misc import apply_indexed_rule
from pyomo.core.base.block import IndexedBlock, SortComponents
from pyomo.dae import ContinuousSet, DAE_Error
from pyomo.common.formatting import tostr
from io import StringIO
logger = logging.getLogger('pyomo.dae')
[docs]
def generate_finite_elements(ds, nfe):
"""
This function first checks to see if the number of finite elements
in the differential set is equal to nfe. If the number of finite
elements is less than nfe, additional points will be generated. If
the number of finite elements is greater than or equal to nfe the
differential set will not be modified
"""
if (len(ds) - 1) >= nfe:
# In this case the differentialset already contains the
# desired number or more than the desired number of finite
# elements so no additional points are needed.
return
elif len(ds) == 2:
# If only bounds have been specified on the differentialset we
# generate the desired number of finite elements by
# spreading them evenly over the interval
step = (max(ds) - min(ds)) / float(nfe)
tmp = min(ds) + step
while round(tmp, 6) <= round((max(ds) - step), 6):
ds.add(round(tmp, 6))
tmp += step
ds.set_changed(True)
ds._fe = list(ds)
return
else:
# This is the case where some points have been specified
# inside of the bounds however the desired number of finite
# elements has not been met. We first look at the step sizes
# between the existing points. Then an additional point
# is placed at the midpoint of the largest step. This
# process is repeated until we have achieved the desired
# number of finite elements. If there are multiple "largest steps"
# the point will be placed at the first occurrence of the
# largest step
addpts = nfe - (len(ds) - 1)
while addpts > 0:
_add_point(ds)
addpts -= 1
ds.set_changed(True)
ds._fe = list(ds)
return
def _add_point(ds):
sortds = list(ds)
maxstep = sortds[1] - sortds[0]
maxloc = 0
for i in range(2, len(sortds)):
if (sortds[i] - sortds[i - 1]) > maxstep:
maxstep = sortds[i] - sortds[i - 1]
maxloc = i - 1
ds.add(round((sortds[maxloc] + maxstep / 2.0), 6))
[docs]
def generate_colloc_points(ds, tau):
"""
This function adds collocation points between the finite elements
in the differential set
"""
fes = list(ds)
for i in range(1, len(fes)):
h = fes[i] - fes[i - 1]
for j in range(len(tau)):
if tau[j] == 1 or tau[j] == 0:
continue
pt = fes[i - 1] + h * tau[j]
pt = round(pt, 6)
if pt not in ds:
ds.add(pt)
ds.set_changed(True)
[docs]
def expand_components(block):
"""
Loop over block components and try expanding them. If expansion fails
then save the component and try again later. This function has some
built-in robustness for block-hierarchical models with circular
references but will not work for all cases.
"""
# expansion_map is used to map components to the functions used to
# expand them so that the update_contset_indexed_component function
# logic only has to be called once even in the case where we have to
# re-try expanding components due to circular references
expansion_map = ComponentMap()
redo_expansion = list()
# Record the missing BlockData before expanding components. This is for
# the case where a ContinuousSet indexed Block is used in a Constraint.
# If the Constraint is expanded before the Block then the missing
# BlockData will be added to the indexed Block but will not be
# constructed correctly.
for blk in block.component_objects(Block, descend_into=True):
missing_idx = set(blk.index_set()) - set(blk._data.keys())
if missing_idx:
blk._dae_missing_idx = missing_idx
# Wrap this whole process in a try block in order to ensure that errors
# swallowed by the LoggingIntercept context below are re-raised if the
# discretization encounters an error it isn't expecting.
try:
# Intercept logging to suppress Error messages arising from failed
# constraint rules. These error messages get logged even though the
# AttributeError causing the error is caught and handled by this
# function when expanding discretized models. We maintain a stream
# of the intercepted logging messages which will be printed if an
# unexpected exception is raised.
buf = StringIO()
with LoggingIntercept(buf, 'pyomo.core', logging.ERROR):
# Identify components that need to be expanded and try expanding
# them
for c in block.component_objects(
descend_into=True, sort=SortComponents.declOrder
):
try:
update_contset_indexed_component(c, expansion_map)
except AttributeError:
redo_expansion.append(c)
# Re-try expansion on any components that failed the first time.
# This is indicative of circular component references and not
# expanding components in the correct order the first time
# through.
N = len(redo_expansion)
while N:
for i in range(N):
c = redo_expansion.pop()
try:
expansion_map[c](c)
except AttributeError:
redo_expansion.append(c)
if len(redo_expansion) == N:
raise DAE_Error(
"Unable to fully discretize %s. Possible "
"circular references detected between "
"components %s. Reformulate your model to"
" remove circular references or apply a "
"discretization transformation before "
"linking blocks together." % (block, tostr(redo_expansion))
)
N = len(redo_expansion)
except Exception:
logger.error(buf.getvalue())
raise
[docs]
def update_contset_indexed_component(comp, expansion_map):
"""
Update any model components which are indexed by a ContinuousSet that
has changed
"""
# This implementation will *NOT* check for or update
# components which use a ContinuousSet implicitly. ex) an
# objective function which iterates through a ContinuousSet and
# sums the squared error. If you use a ContinuousSet implicitly
# you must initialize it with every index you would like to have
# access to!
if comp.ctype is Suffix:
return
# Params indexed by a ContinuousSet should include an initialize
# and/or default rule which will be called automatically when the
# parameter value at a new point in the ContinuousSet is
# requested. Therefore, no special processing is required for
# Params.
if comp.ctype is Param:
return
# Integral components are handled after every ContinuousSet has been
# discretized. Import is deferred to here due to circular references.
from pyomo.dae import Integral
if comp.ctype is Integral:
return
# Skip components that do not have a 'dim' attribute. This assumes that
# all components that could be indexed by a ContinuousSet have the 'dim'
# attribute
if not hasattr(comp, 'dim'):
return
# Components indexed by a ContinuousSet must have a dimension of at
# least 1
if comp.dim() == 0:
return
# Extract the indexing sets. Must treat components with a single
# index separately from components with multiple indexing sets.
temp = comp.index_set()
indexset = list(comp.index_set().subsets())
for s in indexset:
if s.ctype == ContinuousSet and s.get_changed():
if isinstance(comp, Var): # Don't use the type() method here
# because we want to catch DerivativeVar components as well
# as Var components
expansion_map[comp] = _update_var
_update_var(comp)
elif comp.ctype == Constraint:
expansion_map[comp] = _update_constraint
_update_constraint(comp)
elif comp.ctype == Expression:
expansion_map[comp] = _update_expression
_update_expression(comp)
elif isinstance(comp, Piecewise):
expansion_map[comp] = _update_piecewise
_update_piecewise(comp)
elif comp.ctype == Block:
expansion_map[comp] = _update_block
_update_block(comp)
else:
raise TypeError(
"Found component %s of type %s indexed "
"by a ContinuousSet. Components of this type are "
"not currently supported by the automatic "
"discretization transformation in pyomo.dae. "
"Try adding the component to the model "
"after discretizing. Alert the pyomo developers "
"for more assistance." % (str(comp), comp.ctype)
)
def _update_var(v):
"""
This method will construct any additional indices in a variable
resulting from the discretization of a ContinuousSet.
"""
# Note: This is not required it is handled by the _default method on
# Var (which is now a IndexedComponent). However, it
# would be much slower to rely on that method to generate new
# VarData for a large number of new indices.
new_indices = set(v.index_set()) - set(v._data.keys())
for index in new_indices:
v.add(index)
def _update_constraint(con):
"""
This method will construct any additional indices in a constraint
resulting from the discretization of a ContinuousSet.
"""
con.to_dense_data()
def _update_expression(expre):
"""
This method will construct any additional indices in an expression
resulting from the discretization of a ContinuousSet.
"""
expre.to_dense_data()
def _update_block(blk):
"""
This method will construct any additional indices in a block
resulting from the discretization of a ContinuousSet. For
Block-derived components we check if the Block construct method has
been overridden. If not then we update it like a regular block. If
construct has been overridden then we try to call the component's
update_after_discretization method. If the component hasn't
implemented this method then we throw a warning and try to update it
like a normal block. The issue, when construct is overridden, is that
anything could be happening and we can't automatically assume that
treating the block-derived component like a normal block will be
sufficient to update it correctly.
"""
# Check if Block construct method is overridden
# getattr needed below for Python 2, 3 compatibility
if blk.construct.__func__ is not getattr(
IndexedBlock.construct, '__func__', IndexedBlock.construct
):
# check for custom update function
if hasattr(blk, 'update_after_discretization'):
blk.update_after_discretization()
return
else:
logger.warning(
'DAE(misc): Attempting to apply a discretization '
'transformation to the Block-derived component "%s". The '
'component overrides the Block construct method but no '
'update_after_discretization() function was found. Will '
'attempt to update as a standard Block but user should verify '
'that the component was expanded correctly. To suppress this '
'warning, please provide an update_after_discretization() '
'function on Block-derived components that override '
'construct()' % blk.name
)
missing_idx = getattr(blk, '_dae_missing_idx', set([]))
for idx in list(missing_idx):
# Trigger block creation (including calling the Block's rule)
blk[idx]
# Remove book-keeping data after Block is discretized
if hasattr(blk, '_dae_missing_idx'):
del blk._dae_missing_idx
def _update_piecewise(pw):
"""
This method will construct any additional indices in a Piecewise
object resulting from the discretization of a ContinuousSet.
"""
pw._constructed = False
pw.construct()
[docs]
def create_access_function(var):
"""
This method returns a function that returns a component by calling
it rather than indexing it
"""
def _fun(*args):
return var[args]
return _fun
[docs]
def create_partial_expression(scheme, expr, ind, loc):
"""
This method returns a function which applies a discretization scheme
to an expression along a particular indexing set. This is admittedly a
convoluted looking implementation. The idea is that we only apply a
discretization scheme to one indexing set at a time but we also want
the function to be expanded over any other indexing sets.
"""
def _fun(*args):
return scheme(lambda i: expr(*(args[0:loc] + (i,) + args[loc + 1 :])), ind)
return lambda *args: _fun(*args)(args[loc])
[docs]
def add_discretization_equations(block, d):
"""
Adds the discretization equations for DerivativeVar d to the Block block.
Because certain indices will be valid for some discretization schemes and
not others, we skip any constraints which raise an IndexError.
"""
def _disc_eq(m, *args):
try:
return d[args] == d._expr(*args)
except IndexError:
return Constraint.Skip
block.add_component(
d.local_name + '_disc_eq', Constraint(d.index_set(), rule=_disc_eq)
)
[docs]
def add_continuity_equations(block, d, i, loc):
"""
Adds continuity equations in the case that the polynomial basis function
does not have a root at the finite element boundary
"""
svar = d.get_state_var()
nme = svar.local_name + '_' + i.local_name + '_cont_eq'
if block.find_component(nme) is not None:
return
def _cont_exp(v, s):
ncp = s.get_discretization_info()['ncp']
afinal = s.get_discretization_info()['afinal']
def _fun(i):
tmp = list(s)
idx = s.ord(i) - 1
low = s.get_lower_element_boundary(i)
if i != low or idx == 0:
raise IndexError("list index out of range")
low = s.get_lower_element_boundary(tmp[idx - 1])
lowidx = s.ord(low) - 1
return sum(v(tmp[lowidx + j]) * afinal[j] for j in range(ncp + 1))
return _fun
expr = create_partial_expression(_cont_exp, create_access_function(svar), i, loc)
def _cont_eq(m, *args):
try:
return svar[args] == expr(*args)
except IndexError:
return Constraint.Skip
block.add_component(nme, Constraint(d.index_set(), rule=_cont_eq))
[docs]
def block_fully_discretized(b):
"""
Checks to see if all ContinuousSets in a block have been discretized
"""
for i in b.component_map(ContinuousSet).values():
if 'scheme' not in i.get_discretization_info():
return False
return True
def _get_idx(l, ds, n, i, k):
"""
This function returns the appropriate index for a variable
indexed by a differential set. It's needed because the collocation
constraints are indexed by finite element and collocation point
however a ContinuousSet contains a list of all the discretization
points and is not separated into finite elements and collocation
points.
"""
t = list(ds)
tmp = ds.ord(ds._fe[i]) - 1
tik = t[tmp + k]
if n is None:
return tik
else:
tmpn = n
if not isinstance(n, tuple):
tmpn = (n,)
return tmpn[0:l] + (tik,) + tmpn[l:]