Source code for pyomo.dae.plugins.colloc

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
import math

# If the user has numpy then the collocation points and the a matrix for
# the Runge-Kutta basis formulation will be calculated as needed.
# If the user does not have numpy then these values will be read from a
# stored dictionary for up to 10 collocation points.
from pyomo.common.dependencies import numpy, numpy_available

from pyomo.common.collections import ComponentSet

from pyomo.core.base import Transformation, TransformationFactory
from pyomo.core import Var, ConstraintList, Expression, Objective
from pyomo.dae import ContinuousSet, DerivativeVar, Integral

from pyomo.dae.misc import generate_finite_elements
from pyomo.dae.misc import generate_colloc_points
from pyomo.dae.misc import expand_components
from pyomo.dae.misc import create_partial_expression
from pyomo.dae.misc import add_discretization_equations
from pyomo.dae.misc import add_continuity_equations
from pyomo.dae.misc import block_fully_discretized
from pyomo.dae.misc import get_index_information
from pyomo.dae.diffvar import DAE_Error

from pyomo.common.config import ConfigBlock, ConfigValue, PositiveInt, In

logger = logging.getLogger('pyomo.dae')
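
# The _lagrange_* factory functions below build the discretized
# approximation of a derivative used in the collocation equations. Each
# takes an accessor 'v' for the discretized state and a ContinuousSet 's'
# and returns a rule over the points of 's'. Roughly, at the k-th
# collocation point of a finite element of length h, the first-derivative
# transforms return
#
#     sum(v(t_j) * adot[j][k] for j in range(ncp + 1)) / h
#
# where the t_j are the collocation points of that element and adot is the
# Lagrange differentiation matrix computed by calc_adot() below. The
# *_order2 variants use adotdot and divide by h**2 for second derivatives;
# the Legendre variants skip finite element boundaries because continuity
# equations are added there separately.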


def _lagrange_radau_transform(v, s):
    ncp = s.get_discretization_info()['ncp']
    adot = s.get_discretization_info()['adot']

    def _fun(i):
        tmp = list(s)
        idx = s.ord(i) - 1
        if idx == 0:  # Don't apply this equation at initial point
            raise IndexError("list index out of range")
        low = s.get_lower_element_boundary(i)
        lowidx = s.ord(low) - 1
        return sum(
            v(tmp[lowidx + j])
            * adot[j][idx - lowidx]
            * (1.0 / (tmp[lowidx + ncp] - tmp[lowidx]))
            for j in range(ncp + 1)
        )

    return _fun


def _lagrange_radau_transform_order2(v, s):
    ncp = s.get_discretization_info()['ncp']
    adotdot = s.get_discretization_info()['adotdot']

    def _fun(i):
        tmp = list(s)
        idx = s.ord(i) - 1
        if idx == 0:
            # Don't apply this equation at initial point
            raise IndexError("list index out of range")
        low = s.get_lower_element_boundary(i)
        lowidx = s.ord(low) - 1
        return sum(
            v(tmp[lowidx + j])
            * adotdot[j][idx - lowidx]
            * (1.0 / (tmp[lowidx + ncp] - tmp[lowidx]) ** 2)
            for j in range(ncp + 1)
        )

    return _fun


def _lagrange_legendre_transform(v, s):
    ncp = s.get_discretization_info()['ncp']
    adot = s.get_discretization_info()['adot']

    def _fun(i):
        tmp = list(s)
        idx = s.ord(i) - 1
        if idx == 0:
            # Don't apply this equation at initial point
            raise IndexError("list index out of range")
        elif i in s.get_finite_elements():
            # Don't apply at finite element points; continuity equations
            # are added there later
            raise IndexError("list index out of range")
        low = s.get_lower_element_boundary(i)
        lowidx = s.ord(low) - 1
        return sum(
            v(tmp[lowidx + j])
            * adot[j][idx - lowidx]
            * (1.0 / (tmp[lowidx + ncp + 1] - tmp[lowidx]))
            for j in range(ncp + 1)
        )

    return _fun


def _lagrange_legendre_transform_order2(v, s):
    ncp = s.get_discretization_info()['ncp']
    adotdot = s.get_discretization_info()['adotdot']

    def _fun(i):
        tmp = list(s)
        idx = s.ord(i) - 1
        if idx == 0:
            # Don't apply this equation at initial point
            raise IndexError("list index out of range")
        elif i in s.get_finite_elements():
            # Don't apply at finite element points; continuity equations
            # are added there later
            raise IndexError("list index out of range")
        low = s.get_lower_element_boundary(i)
        lowidx = s.ord(low) - 1
        return sum(
            v(tmp[lowidx + j])
            * adotdot[j][idx - lowidx]
            * (1.0 / (tmp[lowidx + ncp + 1] - tmp[lowidx]) ** 2)
            for j in range(ncp + 1)
        )

    return _fun


def conv(a, b):
    if len(a) == 0 or len(b) == 0:
        raise ValueError("Cannot convolve an empty list")

    ans = []
    m = len(a)
    n = len(b)

    for k in range(m + n - 1):
        val = 0
        j = max(0, k - n)
        stop = min(k, m)
        while j <= stop:
            if j < m and (k - j) < n:
                val += a[j] * b[k - j]
            j += 1
        ans.insert(k, val)

    return ans
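
# conv() multiplies two polynomials given as coefficient lists, i.e. it takes
# the discrete convolution of the coefficient sequences. Illustrative example
# (coefficients ordered from highest to lowest degree, as elsewhere in this
# module):
#
#     conv([1, -1], [1, -1])  # -> [1, -2, 1], since (x - 1)**2 = x**2 - 2x + 1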


def calc_cp(alpha, beta, k):
    gamma = []
    factorial = math.factorial

    for i in range(k + 1):
        num = factorial(alpha + k) * factorial(alpha + beta + k + i)
        denom = factorial(alpha + i) * factorial(k - i) * factorial(i)
        gamma.insert(i, num / denom)

    poly = []
    for i in range(k + 1):
        if i == 0:
            poly.insert(i, gamma[i])
        else:
            prod = [1]
            j = 1
            while j <= i:
                prod = conv(prod, [1, -1])
                j += 1
            while len(poly) < len(prod):
                poly.insert(0, 0)
            prod = [gamma[i] * t for t in prod]
            poly = [sum(pair) for pair in zip(poly, prod)]

    cp = numpy.roots(poly)
    return numpy.sort(cp).tolist()
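
# calc_cp() computes collocation points on (0, 1) as the roots of a (shifted)
# Jacobi-type polynomial of degree k built from the parameters alpha and beta.
# Illustrative values (rounded), matching the interior points used by the
# 3-point Radau scheme:
#
#     calc_cp(1, 0, 2)  # -> approximately [0.155051, 0.644949]
#
# _get_radau_constants() below prepends 0.0 and appends 1.0 to this list.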


# BLN: This is a legacy function that was used to calculate the collocation
# constants for an alternative form of the collocation equations described
# in Biegler's nonlinear programming book. The difference is whether the
# state or the derivative is approximated using Lagrange polynomials. With
# the addition of PDE support and chained discretizations in Pyomo.DAE 2.0
# this function is no longer used, but it is kept here for future reference.
#
# def calc_omega(cp):
#     a = []
#     for i in range(len(cp)):
#         ptmp = []
#         tmp = 0
#         for j in range(len(cp)):
#             if j != i:
#                 row = []
#                 row.insert(0, 1 / (cp[i] - cp[j]))
#                 row.insert(1, -cp[j] / (cp[i] - cp[j]))
#                 ptmp.insert(tmp, row)
#                 tmp += 1
#         p = [1]
#         for j in range(len(cp) - 1):
#             p = conv(p, ptmp[j])
#         pint = numpy.polyint(p)
#         arow = []
#         for j in range(len(cp)):
#             arow.append(numpy.polyval(pint, cp[j]))
#         a.append(arow)
#     return a


def calc_adot(cp, order=1):
    a = []
    for i in range(len(cp)):
        ptmp = []
        tmp = 0
        for j in range(len(cp)):
            if j != i:
                row = []
                row.insert(0, 1 / (cp[i] - cp[j]))
                row.insert(1, -cp[j] / (cp[i] - cp[j]))
                ptmp.insert(tmp, row)
                tmp += 1
        p = [1]
        for j in range(len(cp) - 1):
            p = conv(p, ptmp[j])
        pder = numpy.polyder(p, order)
        arow = []
        for j in range(len(cp)):
            arow.append(float(numpy.polyval(pder, cp[j])))
        a.append(arow)
    return a
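
# calc_adot() builds the Lagrange differentiation matrix for the points in cp:
# a[i][j] is the 'order'-th derivative of the i-th Lagrange basis polynomial
# (equal to 1 at cp[i] and 0 at the other points) evaluated at cp[j].
# Illustrative check with two points, where the basis polynomials are 1 - t
# and t:
#
#     calc_adot([0.0, 1.0], 1)  # -> [[-1.0, -1.0], [1.0, 1.0]]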


def calc_afinal(cp):
    afinal = []
    for i in range(len(cp)):
        ptmp = []
        tmp = 0
        for j in range(len(cp)):
            if j != i:
                row = []
                row.insert(0, 1 / (cp[i] - cp[j]))
                row.insert(1, -cp[j] / (cp[i] - cp[j]))
                ptmp.insert(tmp, row)
                tmp += 1
        p = [1]
        for j in range(len(cp) - 1):
            p = conv(p, ptmp[j])
        afinal.append(float(numpy.polyval(p, 1.0)))
    return afinal
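
# calc_afinal() evaluates each Lagrange basis polynomial built on the points
# in cp at t = 1, the right end of the normalized finite element. These values
# are stored for the LAGRANGE-LEGENDRE scheme, whose collocation points do not
# include the element boundary, so the state at the boundary is recovered by
# extrapolation when the continuity equations are formed. Illustrative check:
#
#     calc_afinal([0.0, 1.0])  # -> [0.0, 1.0]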


@TransformationFactory.register(
    'dae.collocation',
    doc="Discretizes a DAE model using orthogonal collocation over"
    " finite elements transforming the model into an NLP.",
)
class Collocation_Discretization_Transformation(Transformation):
    CONFIG = ConfigBlock("dae.collocation")
    CONFIG.declare(
        'nfe',
        ConfigValue(
            default=10,
            domain=PositiveInt,
            description="The desired number of finite element points to be "
            "included in the discretization",
        ),
    )
    CONFIG.declare(
        'ncp',
        ConfigValue(
            default=3,
            domain=PositiveInt,
            description="The desired number of collocation points over each "
            "finite element",
        ),
    )
    CONFIG.declare(
        'wrt',
        ConfigValue(
            default=None,
            description="The ContinuousSet to be discretized",
            doc="Indicates which ContinuousSet the transformation should be "
            "applied to. If this keyword argument is not specified then the "
            "same scheme will be applied to all ContinuousSets.",
        ),
    )
    CONFIG.declare(
        'scheme',
        ConfigValue(
            default='LAGRANGE-RADAU',
            domain=In(['LAGRANGE-RADAU', 'LAGRANGE-LEGENDRE']),
            description="Indicates which collocation scheme to apply",
            doc="Options are 'LAGRANGE-RADAU' and 'LAGRANGE-LEGENDRE'. "
            "The default scheme is Lagrange polynomials with Radau roots",
        ),
    )

    def __init__(self):
        super(Collocation_Discretization_Transformation, self).__init__()
        self._ncp = {}
        self._nfe = {}
        self._adot = {}
        self._adotdot = {}
        self._afinal = {}
        self._tau = {}
        self._reduced_cp = {}
        self.all_schemes = {
            'LAGRANGE-RADAU': (
                _lagrange_radau_transform,
                _lagrange_radau_transform_order2,
            ),
            'LAGRANGE-LEGENDRE': (
                _lagrange_legendre_transform,
                _lagrange_legendre_transform_order2,
            ),
        }

    def _get_radau_constants(self, currentds):
        """
        This function sets the radau collocation points and a values
        depending on how many collocation points have been specified and
        whether or not the user has numpy
        """
        if not numpy_available:
            if self._ncp[currentds] > 10:
                raise ValueError(
                    "Numpy was not found so the maximum number "
                    "of collocation points is 10"
                )
            from pyomo.dae.utilities import (
                radau_tau_dict,
                radau_adot_dict,
                radau_adotdot_dict,
            )

            self._tau[currentds] = radau_tau_dict[self._ncp[currentds]]
            self._adot[currentds] = radau_adot_dict[self._ncp[currentds]]
            self._adotdot[currentds] = radau_adotdot_dict[self._ncp[currentds]]
            self._afinal[currentds] = None
        else:
            alpha = 1
            beta = 0
            k = self._ncp[currentds] - 1
            cp = calc_cp(alpha, beta, k)
            cp.insert(0, 0.0)
            cp.append(1.0)
            adot = calc_adot(cp, 1)
            adotdot = calc_adot(cp, 2)

            self._tau[currentds] = cp
            self._adot[currentds] = adot
            self._adotdot[currentds] = adotdot
            self._afinal[currentds] = None

    def _get_legendre_constants(self, currentds):
        """
        This function sets the legendre collocation points and a values
        depending on how many collocation points have been specified and
        whether or not the user has numpy
        """
        if not numpy_available:
            if self._ncp[currentds] > 10:
                raise ValueError(
                    "Numpy was not found so the maximum number "
                    "of collocation points is 10"
                )
            from pyomo.dae.utilities import (
                legendre_tau_dict,
                legendre_adot_dict,
                legendre_adotdot_dict,
                legendre_afinal_dict,
            )

            self._tau[currentds] = legendre_tau_dict[self._ncp[currentds]]
            self._adot[currentds] = legendre_adot_dict[self._ncp[currentds]]
            self._adotdot[currentds] = legendre_adotdot_dict[self._ncp[currentds]]
            self._afinal[currentds] = legendre_afinal_dict[self._ncp[currentds]]
        else:
            alpha = 0
            beta = 0
            k = self._ncp[currentds]
            cp = calc_cp(alpha, beta, k)
            cp.insert(0, 0.0)
            adot = calc_adot(cp, 1)
            adotdot = calc_adot(cp, 2)
            afinal = calc_afinal(cp)

            self._tau[currentds] = cp
            self._adot[currentds] = adot
            self._adotdot[currentds] = adotdot
            self._afinal[currentds] = afinal

    def _apply_to(self, instance, **kwds):
        """
        Applies specified collocation transformation to a modeling instance

        Keyword Arguments:
        nfe           The desired number of finite element points to be
                      included in the discretization.
        ncp           The desired number of collocation points over each
                      finite element.
        wrt           Indicates which ContinuousSet the transformation
                      should be applied to. If this keyword argument is not
                      specified then the same scheme will be applied to all
                      ContinuousSets.
        scheme        Indicates which collocation scheme to apply.
                      Options are 'LAGRANGE-RADAU' and 'LAGRANGE-LEGENDRE'.
                      The default scheme is Lagrange polynomials with Radau
                      roots.
        """
        config = self.CONFIG(kwds)

        tmpnfe = config.nfe
        tmpncp = config.ncp
        tmpds = config.wrt

        if tmpds is not None:
            if tmpds.ctype is not ContinuousSet:
                raise TypeError(
                    "The component specified using the 'wrt' "
                    "keyword must be a continuous set"
                )
            elif 'scheme' in tmpds.get_discretization_info():
                raise ValueError(
                    "The discretization scheme '%s' has already "
                    "been applied to the ContinuousSet '%s'"
                    % (tmpds.get_discretization_info()['scheme'], tmpds.name)
                )

        if None in self._nfe:
            raise ValueError(
                "A general discretization scheme has already been applied "
                "to every ContinuousSet in the model. If you would like to "
                "specify a specific discretization scheme for one of the "
                "ContinuousSets you must discretize each ContinuousSet "
                "separately."
            )

        if len(self._nfe) == 0 and tmpds is None:
            # Same discretization on all ContinuousSets
            self._nfe[None] = tmpnfe
            self._ncp[None] = tmpncp
            currentds = None
        else:
            self._nfe[tmpds.name] = tmpnfe
            self._ncp[tmpds.name] = tmpncp
            currentds = tmpds.name

        self._scheme_name = config.scheme
        self._scheme = self.all_schemes.get(self._scheme_name, None)

        if self._scheme_name == 'LAGRANGE-RADAU':
            self._get_radau_constants(currentds)
        elif self._scheme_name == 'LAGRANGE-LEGENDRE':
            self._get_legendre_constants(currentds)

        self._transformBlock(instance, currentds)

    def _transformBlock(self, block, currentds):
        self._fe = {}
        for ds in block.component_objects(ContinuousSet, descend_into=True):
            if currentds is None or currentds == ds.name:
                if 'scheme' in ds.get_discretization_info():
                    raise DAE_Error(
                        "Attempting to discretize ContinuousSet "
                        "'%s' after it has already been discretized. " % ds.name
                    )
                generate_finite_elements(ds, self._nfe[currentds])
                if not ds.get_changed():
                    if len(ds) - 1 > self._nfe[currentds]:
                        logger.warning(
                            "More finite elements were found in "
                            "ContinuousSet '%s' than the number of "
                            "finite elements specified in apply. The "
                            "larger number of finite elements will be "
                            "used." % ds.name
                        )

                self._nfe[ds.name] = len(ds) - 1
                self._fe[ds.name] = list(ds)
                generate_colloc_points(ds, self._tau[currentds])

                # Adding discretization information to the continuousset
                # object itself so that it can be accessed outside of the
                # discretization object
                disc_info = ds.get_discretization_info()
                disc_info['nfe'] = self._nfe[ds.name]
                disc_info['ncp'] = self._ncp[currentds]
                disc_info['tau_points'] = self._tau[currentds]
                disc_info['adot'] = self._adot[currentds]
                disc_info['adotdot'] = self._adotdot[currentds]
                disc_info['afinal'] = self._afinal[currentds]
                disc_info['scheme'] = self._scheme_name

        expand_components(block)

        for d in block.component_objects(DerivativeVar, descend_into=True):
            dsets = d.get_continuousset_list()
            for i in ComponentSet(dsets):
                if currentds is None or i.name == currentds:
                    oldexpr = d.get_derivative_expression()
                    loc = d.get_state_var()._contset[i]
                    count = dsets.count(i)
                    if count >= 3:
                        raise DAE_Error(
                            "Error discretizing '%s' with respect to '%s'. "
                            "Current implementation only allows for taking the"
                            " first or second derivative with respect to a "
                            "particular ContinuousSet" % (d.name, i.name)
                        )
                    scheme = self._scheme[count - 1]

                    newexpr = create_partial_expression(scheme, oldexpr, i, loc)
                    d.set_derivative_expression(newexpr)
                    if self._scheme_name == 'LAGRANGE-LEGENDRE':
                        # Add continuity equations to DerivativeVar's parent
                        # block
                        add_continuity_equations(d.parent_block(), d, i, loc)

            # Reclassify DerivativeVar if all indexing ContinuousSets have
            # been discretized. Add discretization equations to the
            # DerivativeVar's parent block.
            if d.is_fully_discretized():
                add_discretization_equations(d.parent_block(), d)
                d.parent_block().reclassify_component_type(d, Var)

                # Keep track of any reclassified DerivativeVar components so
                # that the Simulator can easily identify them if the model
                # is simulated after discretization
                # TODO: Update the discretization transformations to use
                # a Block to add things to the model and store discretization
                # information. Using a list for now because the simulator
                # does not yet support models containing active Blocks
                reclassified_list = getattr(
                    block, '_pyomo_dae_reclassified_derivativevars', None
                )
                if reclassified_list is None:
                    block._pyomo_dae_reclassified_derivativevars = list()
                    reclassified_list = block._pyomo_dae_reclassified_derivativevars

                reclassified_list.append(d)

        # Reclassify Integrals if all ContinuousSets have been discretized
        if block_fully_discretized(block):
            if block.contains_component(Integral):
                for i in block.component_objects(Integral, descend_into=True):
                    i.parent_block().reclassify_component_type(i, Expression)
                    # TODO: The following reproduces the old behavior of
                    # "reconstruct()". We should come up with an
                    # implementation that does not rely on manipulating
                    # private attributes
                    i.clear()
                    i._constructed = False
                    i.construct()

                # If a model contains integrals they are most likely to
                # appear in the objective function which will need to be
                # reconstructed after the model is discretized.
                for k in block.component_objects(Objective, descend_into=True):
                    # TODO: check this, reconstruct might not work
                    # TODO: The following reproduces the old behavior of
                    # "reconstruct()". We should come up with an
                    # implementation that does not rely on manipulating
                    # private attributes
                    k.clear()
                    k._constructed = False
                    k.construct()
    def reduce_collocation_points(self, instance, var=None, ncp=None, contset=None):
        """
        This method will add additional constraints to a model to reduce
        the number of free collocation points (degrees of freedom) for a
        particular variable.

        Parameters
        ----------
        instance : Pyomo model
            The discretized Pyomo model to add constraints to

        var : ``pyomo.environ.Var``
            The Pyomo variable for which the degrees of freedom will be
            reduced

        ncp : int
            The new number of free collocation points for `var`. Must be
            less than the number of collocation points used in
            discretizing the model.

        contset : ``pyomo.dae.ContinuousSet``
            The :py:class:`ContinuousSet<pyomo.dae.ContinuousSet>` that was
            discretized and for which the `var` will have a reduced number
            of degrees of freedom

        """
        if contset is None:
            raise TypeError(
                "A continuous set must be specified using the keyword 'contset'"
            )
        if contset.ctype is not ContinuousSet:
            raise TypeError(
                "The component specified using the 'contset' "
                "keyword must be a ContinuousSet"
            )
        ds = contset

        if len(self._ncp) == 0:
            raise RuntimeError(
                "This method should only be called after using "
                "the apply() method to discretize the model"
            )
        elif None in self._ncp:
            tot_ncp = self._ncp[None]
        elif ds.name in self._ncp:
            tot_ncp = self._ncp[ds.name]
        else:
            raise ValueError(
                "ContinuousSet '%s' has not been discretized, "
                "please call the apply_to() method with this "
                "ContinuousSet to discretize it before calling "
                "this method" % ds.name
            )

        if var is None:
            raise TypeError("A variable must be specified")
        if var.ctype is not Var:
            raise TypeError(
                "The component specified using the 'var' keyword must be a variable"
            )

        if ncp is None:
            raise TypeError("The number of collocation points must be specified")
        if ncp <= 0:
            raise ValueError("The number of collocation points must be at least 1")
        if ncp > tot_ncp:
            raise ValueError(
                "The number of collocation points used to "
                "interpolate an individual variable must be less "
                "than the number used to discretize the original "
                "model"
            )
        if ncp == tot_ncp:
            # Nothing to be done
            return instance

        # Check to see if the continuousset is an indexing set of the variable
        if var.dim() == 0:
            raise IndexError(
                "ContinuousSet '%s' is not an indexing set of"
                " the variable '%s'" % (ds.name, var.name)
            )
        varidx = var.index_set()
        if not varidx.subsets():
            if ds is not varidx:
                raise IndexError(
                    "ContinuousSet '%s' is not an indexing set of"
                    " the variable '%s'" % (ds.name, var.name)
                )
        elif ds not in varidx.subsets():
            raise IndexError(
                "ContinuousSet '%s' is not an indexing set of the"
                " variable '%s'" % (ds.name, var.name)
            )

        if var.name in self._reduced_cp:
            temp = self._reduced_cp[var.name]
            if ds.name in temp:
                raise RuntimeError(
                    "Variable '%s' has already been constrained"
                    " to a reduced number of collocation points"
                    " over ContinuousSet '%s'." % (var.name, ds.name)
                )
            else:
                temp[ds.name] = ncp
        else:
            self._reduced_cp[var.name] = {ds.name: ncp}

        # TODO: Use unique_component_name for this
        list_name = var.local_name + "_interpolation_constraints"

        instance.add_component(list_name, ConstraintList())
        conlist = instance.find_component(list_name)

        t = list(ds)
        fe = ds._fe
        info = get_index_information(var, ds)
        tmpidx = info['non_ds']
        idx = info['index function']

        # Iterate over non_ds indices
        for n in tmpidx:
            # Iterate over finite elements
            for i in range(0, len(fe) - 1):
                # Iterate over collocation points
                for k in range(1, tot_ncp - ncp + 1):
                    if ncp == 1:
                        # Constant over each finite element
                        conlist.add(var[idx(n, i, k)] == var[idx(n, i, tot_ncp)])
                    else:
                        tmp = ds.ord(fe[i]) - 1
                        tmp2 = ds.ord(fe[i + 1]) - 1
                        ti = t[tmp + k]
                        tfit = t[tmp2 - ncp + 1 : tmp2 + 1]
                        coeff = self._interpolation_coeffs(ti, tfit)
                        conlist.add(
                            var[idx(n, i, k)]
                            == sum(
                                var[idx(n, i, j)] * next(coeff)
                                for j in range(tot_ncp - ncp + 1, tot_ncp + 1)
                            )
                        )

        return instance
    def _interpolation_coeffs(self, ti, tfit):
        for i in tfit:
            l = 1
            for j in tfit:
                if i != j:
                    l = l * (ti - j) / (i - j)
            yield l
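
# Typical usage (illustrative sketch; assumes a model 'm' with a
# ContinuousSet 'm.t', a differential variable 'm.x' with an associated
# DerivativeVar, and a control variable 'm.u' indexed by 'm.t'):
#
#     from pyomo.environ import TransformationFactory
#
#     discretizer = TransformationFactory('dae.collocation')
#     discretizer.apply_to(m, wrt=m.t, nfe=20, ncp=3, scheme='LAGRANGE-RADAU')
#
#     # Optionally restrict a control to fewer free collocation points per
#     # finite element (e.g. ncp=1 for piecewise constant controls):
#     discretizer.reduce_collocation_points(m, var=m.u, ncp=1, contset=m.t)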