Source code for pyomo.contrib.mpc.data.convert

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

from collections.abc import MutableMapping
from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase
from pyomo.contrib.mpc.data.scalar_data import ScalarData
from pyomo.contrib.mpc.data.series_data import TimeSeriesData
from pyomo.contrib.mpc.data.interval_data import IntervalData
from pyomo.contrib.mpc.data.find_nearest_index import find_nearest_interval_index


def _process_to_dynamic_data(data, time_set=None):
    """Processes a user's data to convert it to the appropriate type
    of dynamic data

    Mappings are converted to ScalarData, and length-two tuples are converted
    to TimeSeriesData or IntervalData, depending on the contents of the
    second item (the list of time points or intervals).

    Arguments
    ---------
    data: Dict, ComponentMap, or Tuple
        Data to convert to either ScalarData, TimeSeriesData, or
        IntervalData, depending on type.

    Returns
    -------
    ScalarData, TimeSeriesData, or IntervalData

    """
    if isinstance(data, _DynamicDataBase):
        return data
    if isinstance(data, MutableMapping):
        return ScalarData(data, time_set=time_set)
    elif isinstance(data, tuple):
        if len(data) != 2:
            raise TypeError(
                "_process_to_dynamic_data only accepts a mapping or a"
                " tuple of length two. Got tuple of length %s" % len(data)
            )
        if not isinstance(data[0], MutableMapping):
            raise TypeError(
                "First entry of data tuple must be instance of MutableMapping,"
                "e.g. dict or ComponentMap. Got %s" % type(data[0])
            )
        elif len(data[1]) == 0:
            raise ValueError(
                "Time sequence provided in data tuple is empty."
                " Cannot infer whether this is a list of points or intervals."
            )
        elif all(not _is_iterable(item) for item in data[1]):
            return TimeSeriesData(*data)
        elif all(_is_iterable(item) and len(item) == 2 for item in data[1]):
            return IntervalData(*data)
        else:
            raise TypeError(
                "Second entry of data tuple must be a non-empty iterable of"
                " scalars (time points) or length-two tuples (intervals)."
                " Got %s" % str(data[1])
            )


[docs]def interval_to_series( data, time_points=None, tolerance=0.0, use_left_endpoints=False, prefer_left=True ): """ Arguments --------- data: IntervalData Data to convert to a TimeSeriesData object time_points: Iterable (optional) Points at which time series will be defined. Values are taken from the interval in which each point lives. The default is to use the right endpoint of each interval. tolerance: Float (optional) Tolerance within which time points are considered equal. Default is zero. use_left_endpoints: Bool (optional) Whether the left endpoints should be used in the case when time_points is not provided. Default is False, meaning that the right interval endpoints will be used. Should not be set if time points are provided. prefer_left: Bool (optional) If time_points is provided, and a time point is equal (within tolerance) to a boundary between two intervals, this flag controls which interval is used. Returns ------- TimeSeriesData """ if time_points is None: # TODO: Should first or last data points of first or last # intervals be included? if use_left_endpoints: time_points = [t for t, _ in data.get_intervals()] else: time_points = [t for _, t in data.get_intervals()] series_data = data.get_data() # TODO: Should TimeSeriesData be constructed with the original time set? return TimeSeriesData(series_data, time_points) if use_left_endpoints: raise RuntimeError("Cannot provide time_points with use_left_endpoints=True") intervals = data.get_intervals() data_dict = data.get_data() # NOTE: This implementation is O(len(time_points)*log(len(intervals))). # Could potentially do better with an O(len(time_points) + len(intervals)) # implementation. idx_list = [ find_nearest_interval_index( intervals, t, tolerance=tolerance, prefer_left=prefer_left ) for t in time_points ] for i, t in enumerate(time_points): if idx_list[i] is None: raise RuntimeError( "Time point %s cannot be found in intervals within" " tolerance %s." % (t, tolerance) ) new_data = {key: [vals[i] for i in idx_list] for key, vals in data_dict.items()} # TODO: Should TimeSeriesData be constructed with the original time set? return TimeSeriesData(new_data, time_points)
[docs]def series_to_interval(data, use_left_endpoints=False): """ Arguments --------- data: TimeSeriesData Data that will be converted into an IntervalData object use_left_endpoints: Bool (optional) Flag indicating whether values on intervals should come from the values at the left or right endpoints of the intervals Returns ------- IntervalData """ time = data.get_time_points() data_dict = data.get_data() n_t = len(time) if n_t == 1: t0 = time[0] # TODO: Copy data dict? # TODO: Should we raise an error if time list has length one? return IntervalData(data_dict, [(t0, t0)]) else: # This covers the case of n_t > 1 and n_t == 0 new_data = {} intervals = [(time[i - 1], time[i]) for i in range(1, n_t)] for key, values in data_dict.items(): interval_values = [ values[i - 1] if use_left_endpoints else values[i] for i in range(1, n_t) ] new_data[key] = interval_values return IntervalData(new_data, intervals)