Source code for pyomo.contrib.cp.scheduling_expr.step_function_expressions

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

from pyomo.contrib.cp.interval_var import (
    IntervalVar,
    IntervalVarData,
    IntervalVarStartTime,
    IntervalVarEndTime,
)
from pyomo.core.expr.base import ExpressionBase
from pyomo.core.expr.logical_expr import BooleanExpression


def _sum_two_units(_self, _other):
    """Add two elementary step functions.

    Returns a new CumulativeFunction whose children are ``_self`` followed
    by ``_other``.
    """
    children = [_self, _other]
    return CumulativeFunction(children)


def _sum_cumul_and_unit(_cumul, _unit):
    """Add an elementary step function to the end of a CumulativeFunction.

    If ``_cumul`` uses every entry of its underlying ``_args_`` list, the
    unit is appended to that list in place and the result is built from it.
    Otherwise the result is built from a new list made of the visible args
    of ``_cumul`` followed by ``_unit``.
    """
    underlying = _cumul._args_
    if _cumul.nargs() != len(underlying):
        # The underlying list holds more than _cumul exposes, so it cannot
        # be extended in place: build the result from a fresh list.
        return CumulativeFunction(_cumul.args + [_unit])

    # _cumul exposes its whole list, so we can just append to it.
    underlying.append(_unit)
    return CumulativeFunction(underlying, nargs=len(underlying))


def _sum_unit_and_cumul(_unit, _cumul):
    """Add a CumulativeFunction to an elementary step function.

    The unit has to come first and we cannot prepend to the existing list
    of args, so a new list (and a new CumulativeFunction) is always built.
    """
    leading = [_unit]
    return CumulativeFunction(leading + _cumul.args)


def _sum_cumuls(_self, _other):
    """Add two CumulativeFunctions.

    If ``_self`` uses every entry of its underlying ``_args_`` list, the
    args of ``_other`` are appended to that list in place and the result is
    built from it. Otherwise the result is built from a new list that
    concatenates the visible args of both operands.
    """
    underlying = _self._args_
    if _self.nargs() != len(underlying):
        # We have to work from a fresh copy of the visible args.
        return CumulativeFunction(_self.args + _other.args)

    underlying.extend(_other.args)
    return CumulativeFunction(underlying, nargs=len(underlying))


def _subtract_two_units(_self, _other):
    """Subtract one elementary step function from another.

    Returns a new CumulativeFunction whose children are ``_self`` followed
    by ``_other`` wrapped in a NegatedStepFunction.
    """
    negated_other = NegatedStepFunction((_other,))
    return CumulativeFunction([_self, negated_other])


def _subtract_cumul_and_unit(_cumul, _unit):
    """Subtract an elementary step function from a CumulativeFunction.

    The unit is wrapped in a NegatedStepFunction. If ``_cumul`` uses every
    entry of its underlying ``_args_`` list, the negated unit is appended to
    that list in place; otherwise the result is built from a new list.
    """
    underlying = _cumul._args_
    if _cumul.nargs() != len(underlying):
        return CumulativeFunction(_cumul.args + [NegatedStepFunction((_unit,))])

    # _cumul exposes its whole list, so we can just append to it.
    underlying.append(NegatedStepFunction((_unit,)))
    return CumulativeFunction(underlying, nargs=len(underlying))


def _subtract_unit_and_cumul(_unit, _cumul):
    """Subtract a CumulativeFunction from an elementary step function.

    The unit has to come first and we cannot prepend to the existing list
    of args, so a new list is always built: ``_unit`` followed by each
    visible arg of ``_cumul`` wrapped in a NegatedStepFunction.
    """
    terms = [_unit]
    for child in _cumul.args:
        terms.append(NegatedStepFunction((child,)))
    return CumulativeFunction(terms)


def _subtract_cumuls(_self, _other):
    """Subtract one CumulativeFunction from another.

    Every visible arg of ``_other`` is wrapped in a NegatedStepFunction. If
    ``_self`` uses every entry of its underlying ``_args_`` list, the
    negated terms are appended to that list in place; otherwise the result
    is built from a new list.
    """
    extend_in_place = _self.nargs() == len(_self._args_)
    negated_terms = [NegatedStepFunction((child,)) for child in _other.args]

    if not extend_in_place:
        # We have to work from a fresh copy of the visible args.
        return CumulativeFunction(_self.args + negated_terms)

    _self._args_.extend(negated_terms)
    return CumulativeFunction(_self._args_, nargs=len(_self._args_))


def _generate_sum_expression(_self, _other):
    """Dispatch ``_self + _other`` to the helper matching the operand kinds.

    Both operands must be StepFunctions; each is treated either as a
    CumulativeFunction or as an elementary ("unit") step function.

    Raises:
        TypeError: if either operand is not a StepFunction.
    """
    # CumulativeFunction is itself a StepFunction, so this one check covers
    # every supported combination of operands.
    if isinstance(_self, StepFunction) and isinstance(_other, StepFunction):
        left_is_cumul = isinstance(_self, CumulativeFunction)
        right_is_cumul = isinstance(_other, CumulativeFunction)
        if left_is_cumul and right_is_cumul:
            return _sum_cumuls(_self, _other)
        if left_is_cumul:
            return _sum_cumul_and_unit(_self, _other)
        if right_is_cumul:
            return _sum_unit_and_cumul(_self, _other)
        return _sum_two_units(_self, _other)

    raise TypeError(
        "Cannot add object of class %s to object of "
        "class %s" % (_other.__class__, _self.__class__)
    )


def _generate_difference_expression(_self, _other):
    """Dispatch ``_self - _other`` to the helper matching the operand kinds.

    Both operands must be StepFunctions; each is treated either as a
    CumulativeFunction or as an elementary ("unit") step function.

    Raises:
        TypeError: if either operand is not a StepFunction.
    """
    # CumulativeFunction is itself a StepFunction, so this one check covers
    # every supported combination of operands.
    if isinstance(_self, StepFunction) and isinstance(_other, StepFunction):
        left_is_cumul = isinstance(_self, CumulativeFunction)
        right_is_cumul = isinstance(_other, CumulativeFunction)
        if left_is_cumul and right_is_cumul:
            return _subtract_cumuls(_self, _other)
        if left_is_cumul:
            return _subtract_cumul_and_unit(_self, _other)
        if right_is_cumul:
            return _subtract_unit_and_cumul(_self, _other)
        return _subtract_two_units(_self, _other)

    raise TypeError(
        "Cannot subtract object of class %s from object of "
        "class %s" % (_other.__class__, _self.__class__)
    )


class StepFunction(ExpressionBase):
    """
    The base class for the step function expression system.

    Provides the ``+`` / ``-`` operator overloads (which build
    CumulativeFunction expressions), the ``within`` helper for creating
    AlwaysIn constraints, and the ``args`` accessor.
    """

    __slots__ = ()

    def __add__(self, other):
        """Return ``self + other`` as a step function expression."""
        return _generate_sum_expression(self, other)

    def __radd__(self, other):
        """Return ``other + self``; a left operand equal to 0 yields self."""
        # Mathematically this doesn't make a whole lot of sense, but we'll call
        # 0 a function and be happy so that sum() works as expected.
        return self if other == 0 else _generate_sum_expression(other, self)

    def __iadd__(self, other):
        """Return ``self + other`` for the ``+=`` operator."""
        return _generate_sum_expression(self, other)

    def __sub__(self, other):
        """Return ``self - other`` as a step function expression."""
        return _generate_difference_expression(self, other)

    def __rsub__(self, other):
        """Return ``other - self`` as a step function expression."""
        return _generate_difference_expression(other, self)

    def __isub__(self, other):
        """Return ``self - other`` for the ``-=`` operator."""
        return _generate_difference_expression(self, other)

    def within(self, bounds, times):
        """Return an AlwaysIn expression constraining this step function.

        Args:
            bounds: passed to AlwaysIn as its ``bounds`` argument
            times: passed to AlwaysIn as its ``times`` argument
        """
        return AlwaysIn(cumul_func=self, bounds=bounds, times=times)

    @property
    def args(self):
        """The first ``nargs()`` entries of the underlying ``_args_``."""
        stored = self._args_
        return stored[: self.nargs()]
class Pulse(StepFunction):
    """
    A step function specified by an IntervalVar and an integer height that
    has value 0 before the IntervalVar's start_time and after the
    IntervalVar's end time and that takes the value specified by the
    'height' during the IntervalVar. (These are often used to model resource
    constraints.)

    Args:
        args (list or tuple): alternative to the keyword arguments, holding
            the interval variable followed by the height
        interval_var (IntervalVar): the interval variable over which the
            pulse function is non-zero
        height (int): The value of the pulse function during the time
            interval_var is scheduled
    """

    __slots__ = '_args_'

    def __init__(self, args=None, interval_var=None, height=None):
        if not args:
            self._args_ = [interval_var, height]
        elif interval_var is not None or height is not None:
            raise ValueError(
                "Cannot specify both args and any of {interval_var, height}"
            )
        else:
            # Store a list (not the caller's sequence) because we may add to
            # it if this is summed with other StepFunctions.
            self._args_ = list(args)

        # Whichever way the arguments arrived, the first one must be a
        # single IntervalVar.
        candidate = self._args_[0]
        if not (
            isinstance(candidate, IntervalVarData)
            and candidate.ctype is IntervalVar
        ):
            raise TypeError(
                "The 'interval_var' argument for a 'Pulse' must "
                "be an 'IntervalVar'.\n"
                "Received: %s" % type(candidate)
            )

    @property
    def _interval_var(self):
        """The interval variable stored as the first argument."""
        return self._args_[0]

    @property
    def _height(self):
        """The height stored as the second argument."""
        return self._args_[1]

    def nargs(self):
        """Return the number of arguments of this node (always 2)."""
        return 2

    def _to_string(self, values, verbose, smap):
        interval_str, height_str = values[0], values[1]
        return "Pulse(%s, height=%s)" % (interval_str, height_str)
class Step(StepFunction):
    """
    A step function specified by a time point and an integer height that
    has value 0 before the time point and takes the value specified by the
    'height' after the time point.

    Constructing a Step never yields a Step instance: depending on the type
    of 'time' it returns a StepAt, StepAtStart, or StepAtEnd.

    Args:
        time (IntervalVarTimePoint or int): the time point at which the step
            function becomes non-zero
        height (int): The value of the step function after the time point

    Raises:
        TypeError: if 'time' is neither an int nor the start or end time of
            an IntervalVar
    """

    __slots__ = '_args_'

    def __new__(cls, time, height):
        if isinstance(time, int):
            return StepAt((time, height))

        # Arbitrary objects (floats, strings, ...) have no 'ctype'. Look it
        # up defensively so they reach the descriptive TypeError below
        # instead of failing with an AttributeError.
        ctype = getattr(time, 'ctype', None)
        if ctype is IntervalVarStartTime:
            return StepAtStart((time.get_associated_interval_var(), height))
        if ctype is IntervalVarEndTime:
            return StepAtEnd((time.get_associated_interval_var(), height))

        raise TypeError(
            "The 'time' argument for a 'Step' must be either "
            "an 'IntervalVarTimePoint' (for example, the "
            "'start_time' or 'end_time' of an IntervalVar) or "
            "an integer time point in the time horizon.\n"
            "Received: %s" % type(time)
        )
class StepBase(StepFunction):
    """
    Common implementation for the concrete step nodes. Stores two
    arguments: a time (first) and a height (second).

    Args:
        args (list or tuple): the time followed by the height
    """

    __slots__ = '_args_'

    def __init__(self, args):
        # Store a list (not the caller's sequence) because we may add to it
        # if this is summed with other StepFunctions.
        self._args_ = list(args)

    @property
    def _time(self):
        """The time stored as the first argument."""
        return self._args_[0]

    @property
    def _height(self):
        """The height stored as the second argument."""
        return self._args_[1]

    def nargs(self):
        """Return the number of arguments of this node (always 2)."""
        return 2

    def _to_string(self, values, verbose, smap):
        time_str, height_str = values[0], values[1]
        return "Step(%s, height=%s)" % (time_str, height_str)
class StepAt(StepBase):
    """
    Step node returned by ``Step`` when its 'time' argument is an int.
    """

    __slots__ = ()
class StepAtStart(StepBase):
    """
    Step node returned by ``Step`` when its 'time' argument is the start
    time of an interval variable. The first stored argument is the
    associated interval variable itself.
    """

    __slots__ = ()

    def _to_string(self, values, verbose, smap):
        # Display the start time point rather than the stored interval var.
        time_point = self._time.start_time
        return "Step(%s, height=%s)" % (time_point, values[1])
class StepAtEnd(StepBase):
    """
    Step node returned by ``Step`` when its 'time' argument is the end time
    of an interval variable. The first stored argument is the associated
    interval variable itself.
    """

    __slots__ = ()

    def _to_string(self, values, verbose, smap):
        # Display the end time point rather than the stored interval var.
        time_point = self._time.end_time
        return "Step(%s, height=%s)" % (time_point, values[1])
class CumulativeFunction(StepFunction):
    """
    A sum of elementary step functions (Pulse and Step), defining a step
    function over time. (Often used to model resource constraints.)

    Args:
        args (iterable): Child elementary step functions of this node
        nargs (int, optional): How many leading entries of 'args' belong to
            this node. Defaults to all of them.
    """

    __slots__ = ('_args_', '_nargs')

    def __init__(self, args, nargs=None):
        # We make sure args are a list because we might add to them later, if
        # this is summed with another cumulative function.
        self._args_ = list(args)
        # Measure the copied list rather than 'args' so that non-sized
        # iterables (e.g. generators) are accepted as well.
        self._nargs = len(self._args_) if nargs is None else nargs

    def nargs(self):
        """Return the number of child step functions of this node."""
        return self._nargs

    def _to_string(self, values, verbose, smap):
        pieces = []
        for i, arg in enumerate(self.args):
            text = str(arg)
            # Negated terms already print their own leading '- ', and the
            # first term gets no leading operator at all.
            if i == 0 or isinstance(arg, NegatedStepFunction):
                pieces.append(text)
            else:
                pieces.append("+ %s" % text)
        return " ".join(pieces)
class NegatedStepFunction(StepFunction):
    """
    The negated form of an elementary step function: That is, it represents
    subtracting the elementary function's (nonnegative) height rather than
    adding it.

    Args:
        args (tuple): One-element sequence holding the child elementary
            step function (Step or Pulse) of this node
    """

    __slots__ = '_args_'

    def __init__(self, args):
        # Stored as given (no copy is made).
        self._args_ = args

    def nargs(self):
        """Return the number of arguments of this node (always 1)."""
        return 1

    def _to_string(self, values, verbose, smap):
        child_str = values[0]
        return "- %s" % child_str
class AlwaysIn(BooleanExpression):
    """
    An expression representing the constraint that a cumulative function is
    required to take values within a tuple of bounds over a specified time
    interval. (Often used to enforce limits on resource availability.)

    Args:
        args (tuple, optional): The five arguments of the node, in the order
            (cumul_func, lower bound, upper bound, start time, end time).
            Mutually exclusive with the keyword arguments below.
        cumul_func (CumulativeFunction): Step function being constrained
        bounds (tuple of two integers): Lower and upper bounds to enforce on
            the cumulative function
        times (tuple of two integers): The time interval (start, end) over
            which to enforce the bounds on the values of the cumulative
            function.

    Raises:
        ValueError: if 'args' is given together with any of 'cumul_func',
            'bounds', or 'times'
    """

    __slots__ = ()

    def __init__(self, args=None, cumul_func=None, bounds=None, times=None):
        if args:
            # Use a tuple here, not a set: building a set would hash the
            # values, so an unhashable one (e.g. a list of bounds) would
            # raise TypeError instead of the intended ValueError.
            if any(arg is not None for arg in (cumul_func, bounds, times)):
                raise ValueError(
                    "Cannot specify both args and any of {cumul_func, bounds, times}"
                )
            self._args_ = args
        else:
            self._args_ = (cumul_func, bounds[0], bounds[1], times[0], times[1])

    def nargs(self):
        """Return the number of arguments of this node (always 5)."""
        return 5

    def _to_string(self, values, verbose, smap):
        return "(%s).within(bounds=(%s, %s), times=(%s, %s))" % (
            values[0],
            values[1],
            values[2],
            values[3],
            values[4],
        )