Source code for pyomo.network.port

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging, sys
from weakref import ref as weakref_ref

from pyomo.common.autoslots import AutoSlots
from pyomo.common.collections import ComponentMap
from pyomo.common.deprecation import RenamedClass
from pyomo.common.formatting import tabular_writer
from pyomo.common.log import is_debug_set
from pyomo.common.modeling import unique_component_name, NOTSET
from pyomo.common.numeric_types import value
from pyomo.common.timing import ConstructionTimer

from pyomo.core.base.var import Var
from pyomo.core.base.constraint import Constraint
from pyomo.core.base.component import ComponentData, ModelComponentFactory
from pyomo.core.base.global_set import UnindexedComponent_index
from pyomo.core.base.indexed_component import IndexedComponent, UnindexedComponent_set
from pyomo.core.base.misc import apply_indexed_rule
from pyomo.core.expr.numvalue import as_numeric
from pyomo.core.expr import identify_variables
from pyomo.core.base.label import alphanum_label_from_name

from pyomo.network.util import create_var, tighten_var_domain

logger = logging.getLogger('pyomo.network')


class PortData(ComponentData):
    """
    This class defines the data for a single Port

    Attributes
    ----------
    vars:`dict`
        A dictionary mapping added names to variables
    """

    # vars        : member name -> variable / expression (None for implicit)
    # _arcs       : weak references to every Arc attached to this port
    # _sources    : weak references to the Arcs returned by sources()
    # _dests      : weak references to the Arcs returned by dests()
    # _rules      : member name -> (expansion rule, rule keyword arguments)
    # _splitfracs : ComponentMap of Arc -> (value, fix) set by the user
    __slots__ = ('vars', '_arcs', '_sources', '_dests', '_rules', '_splitfracs')
    __autoslot_mappers__ = {
        '_arcs': AutoSlots.weakref_sequence_mapper,
        '_sources': AutoSlots.weakref_sequence_mapper,
        '_dests': AutoSlots.weakref_sequence_mapper,
    }

    def __init__(self, component=None):
        """
        Create an empty port data object.

        Arguments
        ---------
        component
            The owning component (stored as a weak reference), or None
        """
        #
        # These lines represent in-lining of the
        # following constructors:
        #   - ComponentData
        #   - NumericValue
        self._component = weakref_ref(component) if (component is not None) else None
        self._index = NOTSET

        self.vars = {}
        self._arcs = []
        self._sources = []
        self._dests = []
        self._rules = {}
        self._splitfracs = ComponentMap()

    def __getattr__(self, name):
        """Returns `self.vars[name]` if it exists"""
        # __getattr__ is only invoked after normal attribute lookup has
        # failed.  Being asked for 'vars' therefore means the slot has not
        # been populated; reading self.vars here would re-enter this method
        # and recurse until RecursionError instead of raising AttributeError.
        if name != 'vars' and name in self.vars:
            return self.vars[name]
        # Since the base classes don't support getattr, we can just
        # throw the "normal" AttributeError
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)
        )

    def arcs(self, active=None):
        """A list of Arcs in which this Port is a member"""
        return self._collect_ports(active, self._arcs)

    def sources(self, active=None):
        """A list of Arcs in which this Port is a destination"""
        return self._collect_ports(active, self._sources)

    def dests(self, active=None):
        """A list of Arcs in which this Port is a source"""
        return self._collect_ports(active, self._dests)

    def _collect_ports(self, active, port_list):
        """
        Dereference a list of weak references, optionally filtering on
        the `active` flag of the referenced objects.

        Arguments
        ---------
        active
            If None, return every referenced object; otherwise return only
            those whose `active` attribute equals this value
        port_list
            The list of weak references to dereference
        """
        # need to call the weakrefs
        if active is None:
            return [_a() for _a in port_list]
        return [a for a in (_a() for _a in port_list) if a.active == active]

    def set_value(self, value):
        """Cannot specify the value of a port"""
        raise ValueError("Cannot specify the value of a port: '%s'" % self.name)

    def polynomial_degree(self):
        """Returns the maximum polynomial degree of all port members"""
        ans = 0
        for v in self.iter_vars():
            tmp = v.polynomial_degree()
            # A single member without a polynomial degree decides the answer
            if tmp is None:
                return None
            ans = max(ans, tmp)
        return ans

    def is_fixed(self):
        """Return True if all vars/expressions in the Port are fixed"""
        return all(v.is_fixed() for v in self.iter_vars())

    def is_potentially_variable(self):
        """Return True as ports may (should!) contain variables"""
        return True

    def is_binary(self):
        """Return True if all variables in the Port are binary"""
        # NOTE(review): this relies on len(self); no __len__ is defined in
        # this class, so confirm the base classes provide one for the data
        # objects of an indexed Port.  The same applies to is_integer() and
        # is_continuous().
        return len(self) and all(v.is_binary() for v in self.iter_vars(expr_vars=True))

    def is_integer(self):
        """Return True if all variables in the Port are integer"""
        return len(self) and all(v.is_integer() for v in self.iter_vars(expr_vars=True))

    def is_continuous(self):
        """Return True if all variables in the Port are continuous"""
        return len(self) and all(
            v.is_continuous() for v in self.iter_vars(expr_vars=True)
        )

    def add(self, var, name=None, rule=None, **kwds):
        """
        Add `var` to this Port, casting it to a Pyomo numeric if necessary

        Arguments
        ---------
        var
            A variable or some `NumericValue` like an expression
        name: `str`
            Name to associate with this member of the Port
        rule: `function`
            Function implementing the desired expansion procedure
            for this member. `Port.Equality` by default, other
            options include `Port.Extensive`. Customs are allowed.
        kwds
            Keyword arguments that will be passed to rule

        Raises
        ------
        ValueError
            If `rule` is `Port.Extensive` and `name` is "splitfrac" or ends
            with "_split" or "_equality".  The port is left unmodified.
        """
        if var is not None:
            try:
                # indexed components are ok, but as_numeric will error on them
                # make sure they have this attribute
                var.is_indexed()
            except AttributeError:
                var = as_numeric(var)
        if name is None:
            name = var.local_name

        if rule is None:
            rule = Port.Equality
        # Validate before touching any state so that a rejected name does not
        # leave an entry in self.vars without a matching entry in self._rules.
        if rule is Port.Extensive:
            # avoid name collisions
            if name.endswith(("_split", "_equality")) or name == "splitfrac":
                raise ValueError(
                    "Extensive variable '%s' on Port '%s' may not end "
                    "with '_split' or '_equality'" % (name, self.name)
                )

        if self.vars.get(name) is not None:
            # don't throw warning if replacing an implicit (None) var
            logger.warning(
                "Implicitly replacing variable '%s' in Port '%s'.\n"
                "To avoid this warning, use Port.remove() first." % (name, self.name)
            )
        self.vars[name] = var
        self._rules[name] = (rule, kwds)

    def remove(self, name):
        """Remove this member from the port"""
        if name not in self.vars:
            raise ValueError(
                "Cannot remove member '%s' not in Port '%s'" % (name, self.name)
            )
        self.vars.pop(name)
        self._rules.pop(name)

    def rule_for(self, name):
        """Return the rule associated with the given port member"""
        # Entries are (rule, kwds) tuples; only the rule is returned
        return self._rules[name][0]

    def is_equality(self, name):
        """Return True if the rule for this port member is Port.Equality"""
        return self.rule_for(name) is Port.Equality

    def is_extensive(self, name):
        """Return True if the rule for this port member is Port.Extensive"""
        return self.rule_for(name) is Port.Extensive

    def fix(self):
        """
        Fix all variables in the port at their current values.
        For expressions, fix every variable in the expression.
        """
        for v in self.iter_vars(expr_vars=True, fixed=False):
            v.fix()

    def unfix(self):
        """
        Unfix all variables in the port.
        For expressions, unfix every variable in the expression.
        """
        for v in self.iter_vars(expr_vars=True, fixed=True):
            v.unfix()

    # Alternate name for unfix()
    free = unfix

    def iter_vars(self, expr_vars=False, fixed=None, names=False):
        """
        Iterate through every member of the port, going through
        the indices of indexed members.

        Arguments
        ---------
        expr_vars: `bool`
            If True, call `identify_variables` on expression type members
        fixed: `bool`
            Only include variables/expressions with this type of fixed
        names: `bool`
            If True, yield (name, index, var/expr) tuples
        """
        # NOTE(review): implicit members are stored as None by add(); such a
        # member has no is_indexed() and will raise here -- confirm callers
        # only iterate once implicit members have been replaced.
        for name, mem in self.vars.items():
            if not mem.is_indexed():
                # Present a scalar member as a one-entry mapping so both
                # cases share the loop below
                itr = {None: mem}
            else:
                itr = mem
            for idx, v in itr.items():
                if fixed is not None and v.is_fixed() != fixed:
                    continue
                if expr_vars and v.is_expression_type():
                    for var in identify_variables(v):
                        if fixed is not None and var.is_fixed() != fixed:
                            continue
                        if names:
                            yield name, idx, var
                        else:
                            yield var
                else:
                    if names:
                        yield name, idx, v
                    else:
                        yield v

    def set_split_fraction(self, arc, val, fix=True):
        """
        Set the split fraction value to be used for an arc during
        arc expansion when using `Port.Extensive`.

        Raises
        ------
        ValueError
            If `arc` is not among this port's `dests()`
        """
        if arc not in self.dests():
            raise ValueError(
                "Port '%s' is not a source of Arc '%s', cannot "
                "set split fraction" % (self.name, arc.name)
            )
        self._splitfracs[arc] = (val, fix)

    def get_split_fraction(self, arc):
        """
        Returns a tuple (val, fix) for the split fraction of this arc that
        was set via `set_split_fraction` if it exists, and otherwise None.
        """
        return self._splitfracs.get(arc, None)
class _PortData(metaclass=RenamedClass):
    # Former name of PortData, declared through the RenamedClass metaclass.
    # __renamed__new_class__ names the current class; __renamed__version__
    # records the version string associated with the rename ('6.7.2').
    __renamed__new_class__ = PortData
    __renamed__version__ = '6.7.2'
@ModelComponentFactory.register(
    "A bundle of variables that can be connected to other ports."
)
class Port(IndexedComponent):
    """
    A collection of variables, which may be connected to other ports

    The idea behind Ports is to create a bundle of variables that can
    be manipulated together by connecting them to other ports via Arcs.
    A preprocess transformation will look for Arcs and expand them into
    a series of constraints that involve the original variables contained
    within the Port. The way these constraints are built can be specified
    for each Port member when adding members to the port, but by default
    the Port members will be equated to each other. Additionally, other
    objects such as expressions can be added to Ports as long as they, or
    their indexed members, can be manipulated within constraint expressions.

    Parameters
    ----------
    rule: `function`
        A function that returns a dict of (name: var) pairs to be
        initially added to the Port. Instead of var it could also be a
        tuple of (var, rule). Or it could return an iterable of either
        vars or tuples of (var, rule) for implied names.
    initialize
        Follows same specifications as rule's return value, gets
        initially added to the Port
    implicit
        An iterable of names to be initially added to the Port as
        implicit vars
    extends: `Port`
        A Port whose vars will be added to this Port upon construction
    """

    def __new__(cls, *args, **kwds):
        """Dispatch `Port(...)` to ScalarPort or IndexedPort.

        Subclasses are instantiated directly.  For `Port` itself, a
        ScalarPort is created when no positional arguments are given (or the
        only one is `UnindexedComponent_set`), otherwise an IndexedPort.
        """
        if cls != Port:
            return super(Port, cls).__new__(cls)
        if not args or (args[0] is UnindexedComponent_set and len(args) == 1):
            return ScalarPort.__new__(ScalarPort)
        else:
            return IndexedPort.__new__(IndexedPort)

    def __init__(self, *args, **kwd):
        """Record the construction options and initialize the component.

        The `rule`, `initialize`, `implicit` and `extends` keywords are
        consumed here; everything else is forwarded to IndexedComponent.
        """
        self._rule = kwd.pop('rule', None)
        self._initialize = kwd.pop('initialize', {})
        self._implicit = kwd.pop('implicit', {})
        self._extends = kwd.pop('extends', None)
        kwd.setdefault('ctype', Port)
        IndexedComponent.__init__(self, *args, **kwd)

    # This method must be defined on subclasses of
    # IndexedComponent that support implicit definition
    def _getitem_when_not_present(self, idx):
        """Returns the default component data value."""
        tmp = self._data[idx] = PortData(component=self)
        tmp._index = idx
        return tmp

    def construct(self, data=None):
        """Construct the Port and populate the members of every index.

        Does nothing if the component is already constructed.  Once the
        members have been added, the stored `rule`, `initialize`, `implicit`
        and `extends` references are cleared.
        """
        if self._constructed:
            return

        self._constructed = True

        timer = ConstructionTimer(self)

        if is_debug_set(logger):  # pragma:nocover
            logger.debug("Constructing Port, name=%s, from data=%s" % (self.name, data))

        if self._anonymous_sets is not None:
            for _set in self._anonymous_sets:
                _set.construct()

        # Construct PortData objects for all index values
        if self.is_indexed():
            self._initialize_members(self._index_set)
        else:
            # A scalar port is its own (only) data object
            self._data[None] = self
            self._initialize_members([None])

        # get rid of these references
        self._rule = None
        self._initialize = None
        self._implicit = None
        self._extends = None  # especially important as this is another port

        timer.report()

    def _initialize_members(self, initSet):
        """Add the initial members to the port data of each index in `initSet`.

        Members are added in this order: implicit names (as None), the
        members of the `extends` port with their rules, the `initialize`
        container, and finally whatever `rule` returns for the index.
        """
        for idx in initSet:
            tmp = self[idx]
            for key in self._implicit:
                tmp.add(None, key)
            if self._extends:
                for key, val in self._extends.vars.items():
                    tmp.add(val, key, self._extends.rule_for(key))
            if self._initialize:
                self._add_from_container(tmp, self._initialize)
            if self._rule:
                items = apply_indexed_rule(self, self._rule, self._parent(), idx)
                self._add_from_container(tmp, items)

    def _add_from_container(self, port, items):
        """Add every entry of `items` to `port`.

        Arguments
        ---------
        port
            The port data object to add members to
        items
            Either a dict mapping names to entries, or an iterable of
            entries whose names are implied by the added objects.  An entry
            is an object, a `(obj, rule)` tuple, or a `(obj, rule, kwds)`
            tuple where `kwds` is a dict of keyword arguments for the rule.
        """
        if type(items) is dict:
            for key, val in items.items():
                if type(val) is tuple:
                    if len(val) == 2:
                        obj, rule = val
                        port.add(obj, key, rule)
                    else:
                        obj, rule, kwds = val
                        port.add(obj, key, rule, **kwds)
                else:
                    port.add(val, key)
        else:
            # Iterate the container that was passed in.  This used to loop
            # over self._initialize, which silently ignored the iterable
            # returned by a rule.
            for val in items:
                if type(val) is tuple:
                    if len(val) == 2:
                        obj, rule = val
                        port.add(obj, rule=rule)
                    else:
                        obj, rule, kwds = val
                        port.add(obj, rule=rule, **kwds)
                else:
                    port.add(val)

    def _pprint(self, ostream=None, verbose=False):
        """Print component information.

        Returns a 4-tuple: the header (name, value) pairs, the (index, data)
        items, the column names, and a generator function yielding one
        (name, size, variable) row per member of a data object.
        """

        def _line_generator(k, v):
            for _k, _v in sorted(v.vars.items()):
                if _v is None:
                    # implicit member: no variable yet
                    _len = '-'
                elif _v.is_indexed():
                    _len = len(_v)
                else:
                    _len = 1
                yield _k, _len, str(_v)

        return (
            [
                ("Size", len(self)),
                ("Index", self._index_set if self.is_indexed() else None),
            ],
            self._data.items(),
            ("Name", "Size", "Variable"),
            _line_generator,
        )

    def display(self, prefix="", ostream=None):
        """
        Print component state information

        This duplicates logic in Component.pprint()
        """
        if not self.active:
            return
        if ostream is None:
            ostream = sys.stdout
        # NOTE(review): whitespace in the reviewed copy of this file appears
        # collapsed; confirm the intended width of this indentation unit.
        tab = " "
        ostream.write(prefix + self.local_name + " : ")
        ostream.write("Size=" + str(len(self)))

        ostream.write("\n")

        def _line_generator(k, v):
            for _k, _v in sorted(v.vars.items()):
                if _v is None:
                    # implicit member: no value to show
                    _val = '-'
                elif not _v.is_indexed():
                    _val = str(value(_v))
                else:
                    _val = "{%s}" % (
                        ', '.join(
                            '%r: %r' % (x, value(_v[x])) for x in sorted(_v._data)
                        )
                    )
                yield _k, _val

        tabular_writer(
            ostream,
            prefix + tab,
            ((k, v) for k, v in self._data.items()),
            ("Name", "Value"),
            _line_generator,
        )

    @staticmethod
    def Equality(port, name, index_set):
        """Arc Expansion procedure to generate simple equality constraints"""
        # Iterate over every arc off this port. Since this function will
        # be called for every port, we need to check if it already exists.
        for arc in port.arcs(active=True):
            Port._add_equality_constraint(arc, name, index_set)

    @staticmethod
    def Extensive(port, name, index_set, include_splitfrac=None, write_var_sum=True):
        """Arc Expansion procedure for extensive variable properties

        This procedure is the rule to use when variable quantities should
        be conserved; that is, split for outlets and combined for inlets.

        This will first go through every destination of the port (i.e.,
        arcs whose source is this Port) and create a new variable on the
        arc's expanded block of the same index as the current variable
        being processed to store the amount of the variable that flows
        over the arc.  For ports that have multiple outgoing arcs, this
        procedure will create a single splitfrac variable on the arc's
        expanded block as well. Then it will generate constraints for
        the new variable that relate it to the port member variable
        using the split fraction, ensuring that all extensive variables
        in the Port are split using the same ratio.  The generation of
        the split fraction variable and constraint can be suppressed by
        setting the `include_splitfrac` argument to `False`.

        Once all arc-specific variables are created, this
        procedure will create the "balancing constraint" that ensures
        that the sum of all the new variables equals the original port
        member variable. This constraint can be suppressed by setting
        the `write_var_sum` argument to `False`; in which case, a single
        constraint will be written that states the sum of the split
        fractions equals 1.

        Finally, this procedure will go through every source for this
        port and create a new arc variable (unless it already exists),
        before generating the balancing constraint that ensures the sum
        of all the incoming new arc variables equals the original port
        variable.

        Model simplifications:

            If the port has a 1-to-1 connection on either side, it will not
            create the new variables and instead write a simple equality
            constraint for that side.

            If the outlet side is not 1-to-1 but there is only one outlet,
            it will not create a splitfrac variable or write the split
            constraint, but it will still write the outsum constraint
            which will be a simple equality.

            If the port only contains a single Extensive variable, the
            splitfrac variables and the splitting constraints will
            be skipped since they will be unnecessary. However, they
            can still be included by passing `include_splitfrac=True`.

        .. note::
            If split fractions are skipped, the `write_var_sum=False`
            option is not allowed.

        Arguments
        ---------
        port
            The port data object being expanded
        name: `str`
            Name of the port member being expanded
        index_set
            Index set of the member (`UnindexedComponent_set` for scalars)
        include_splitfrac: `bool`
            True to always create split fractions, False to never create
            them, None to let the procedure decide
        write_var_sum: `bool`
            Whether to write the sum-of-variables balancing constraint
            instead of the sum-of-split-fractions constraint
        """
        # Both helpers work by adding components to the model; the lists
        # they return are not needed here.
        Port._Split(
            port,
            name,
            index_set,
            include_splitfrac=include_splitfrac,
            write_var_sum=write_var_sum,
        )
        Port._Combine(port, name, index_set)

    @staticmethod
    def _Combine(port, name, index_set):
        """Write the inlet side of the Extensive expansion for one member.

        Returns the list of per-arc variables that were created or reused;
        the list is empty when there are no active sources or when the
        connection is 1-to-1 and a plain equality constraint is used.
        """
        port_parent = port.parent_block()
        var = port.vars[name]
        in_vars = []
        sources = port.sources(active=True)

        if not len(sources):
            return in_vars

        if len(sources) == 1 and len(sources[0].source.dests(active=True)) == 1:
            # This is a 1-to-1 connection, no need for evar, just equality.
            arc = sources[0]
            Port._add_equality_constraint(arc, name, index_set)
            return in_vars

        for arc in sources:
            eblock = arc.expanded_block

            # Make and record new variables for every arc with this member.
            evar = Port._create_evar(port.vars[name], name, eblock, index_set)
            in_vars.append(evar)

        if len(sources) == 1:
            tighten_var_domain(port.vars[name], in_vars[0], index_set)

        # Create constraint: var == sum of evars
        # Same logic as Port._Split
        cname = unique_component_name(
            port_parent,
            "%s_%s_insum" % (alphanum_label_from_name(port.local_name), name),
        )

        if index_set is not UnindexedComponent_set:

            def rule(m, *args):
                return sum(evar[args] for evar in in_vars) == var[args]

        else:

            def rule(m):
                return sum(evar for evar in in_vars) == var

        con = Constraint(index_set, rule=rule)
        port_parent.add_component(cname, con)

        return in_vars

    @staticmethod
    def _Split(port, name, index_set, include_splitfrac=None, write_var_sum=True):
        """Write the outlet side of the Extensive expansion for one member.

        Returns the list of per-arc variables that were created or reused;
        the list is empty when there are no active destinations or when the
        connection is 1-to-1 and a plain equality constraint is used.

        Raises
        ------
        ValueError
            If a split fraction other than 1 is fixed on a port with a
            single destination, if split fractions were specified for a port
            whose split fractions are skipped, or if `write_var_sum` is
            False while split fractions are skipped.
        """
        port_parent = port.parent_block()
        var = port.vars[name]
        out_vars = []
        dests = port.dests(active=True)

        if not len(dests):
            return out_vars

        if len(dests) == 1:
            # No need for splitting on one outlet.
            # Make sure they do not try to fix splitfrac not at 1.
            splitfracspec = port.get_split_fraction(dests[0])
            if splitfracspec is not None:
                if splitfracspec[0] != 1 and splitfracspec[1] == True:
                    raise ValueError(
                        "Cannot fix splitfrac not at 1 for port '%s' with a "
                        "single dest '%s'" % (port.name, dests[0].name)
                    )

            if include_splitfrac is not True:
                include_splitfrac = False

            if len(dests[0].destination.sources(active=True)) == 1:
                # This is a 1-to-1 connection, no need for evar, just equality.
                arc = dests[0]
                Port._add_equality_constraint(arc, name, index_set)
                return out_vars

        for arc in dests:
            eblock = arc.expanded_block

            # Make and record new variables for every arc with this member.
            evar = Port._create_evar(port.vars[name], name, eblock, index_set)
            out_vars.append(evar)

            if include_splitfrac is False:
                continue

            # Create and potentially initialize split fraction variables.
            # This function will be called for every Extensive member of this
            # port, but we only need one splitfrac variable per arc, so check
            # if it already exists before making a new one. However, we do not
            # need a splitfrac if there is only one Extensive data object,
            # so first check whether or not we need it.
            if eblock.component("splitfrac") is None:
                if not include_splitfrac:
                    num_data_objs = 0
                    for k, v in port.vars.items():
                        if port.is_extensive(k):
                            if v.is_indexed():
                                num_data_objs += len(v)
                            else:
                                num_data_objs += 1
                            if num_data_objs > 1:
                                break

                    if num_data_objs <= 1:
                        # Do not make splitfrac, do not make split constraints.
                        # Make sure they didn't specify splitfracs.
                        # This inner loop will only run once.
                        # (A separate loop variable keeps the outer `arc`
                        # from being rebound.)
                        for dest_arc in dests:
                            if port.get_split_fraction(dest_arc) is not None:
                                raise ValueError(
                                    "Cannot specify splitfracs for port '%s' "
                                    "(found arc '%s') because this port only "
                                    "has one variable. To have control over "
                                    "splitfracs, please pass the "
                                    " include_splitfrac=True argument."
                                    % (port.name, dest_arc.name)
                                )
                        include_splitfrac = False
                        continue

                eblock.splitfrac = Var()
                splitfracspec = port.get_split_fraction(arc)
                if splitfracspec is not None:
                    eblock.splitfrac = splitfracspec[0]
                    if splitfracspec[1]:
                        eblock.splitfrac.fix()

            # Create constraint for this member using splitfrac.
            cname = "%s_split" % name

            if index_set is not UnindexedComponent_set:

                def rule(m, *args):
                    return evar[args] == eblock.splitfrac * var[args]

            else:

                def rule(m):
                    return evar == eblock.splitfrac * var

            con = Constraint(index_set, rule=rule)
            eblock.add_component(cname, con)

        if len(dests) == 1:
            tighten_var_domain(port.vars[name], out_vars[0], index_set)

        if write_var_sum:
            # Create var total sum constraint: var == sum of evars
            # Need to alphanum port name in case it is indexed.
            cname = unique_component_name(
                port_parent,
                "%s_%s_outsum" % (alphanum_label_from_name(port.local_name), name),
            )

            if index_set is not UnindexedComponent_set:

                def rule(m, *args):
                    return sum(evar[args] for evar in out_vars) == var[args]

            else:

                def rule(m):
                    return sum(evar for evar in out_vars) == var

            con = Constraint(index_set, rule=rule)
            port_parent.add_component(cname, con)

        else:
            # OR create constraint on splitfrac vars: sum == 1
            if include_splitfrac is False:
                raise ValueError(
                    "Cannot choose to write split fraction sum constraint for "
                    "ports with a single destination or a single Extensive "
                    "variable.\nSplit fractions are skipped in this case to "
                    "simplify the model.\nPlease use write_var_sum=True on "
                    "this port (the default)."
                )
            cname = unique_component_name(
                port_parent, "%s_frac_sum" % alphanum_label_from_name(port.local_name)
            )
            con = Constraint(expr=sum(a.expanded_block.splitfrac for a in dests) == 1)
            port_parent.add_component(cname, con)

        return out_vars

    @staticmethod
    def _add_equality_constraint(arc, name, index_set):
        """Equate member `name` of the two ports of `arc` on its expanded block.

        Nothing is added when the expanded block already has a component
        named `<name>_equality`.
        """
        # This function will add the equality constraint if it doesn't exist.
        eblock = arc.expanded_block
        cname = name + "_equality"
        if eblock.component(cname) is not None:
            # already exists, skip
            return

        port1, port2 = arc.ports

        if index_set is not UnindexedComponent_set:

            def rule(m, *args):
                return port1.vars[name][args] == port2.vars[name][args]

        else:

            def rule(m):
                return port1.vars[name] == port2.vars[name]

        con = Constraint(index_set, rule=rule)
        eblock.add_component(cname, con)

    @staticmethod
    def _create_evar(member, name, eblock, index_set):
        """Return the component called `name` on `eblock`, creating it with
        `create_var` if it does not exist yet."""
        # Name is same, conflicts are prevented by a check in Port.add.
        # The new var will mirror the original var and have same index set.
        # We only need one evar per arc, so check if it already exists
        # before making a new one.
        evar = eblock.component(name)
        if evar is None:
            evar = create_var(member, name, eblock, index_set)
        return evar
class ScalarPort(Port, PortData):
    """An unindexed Port, which is both the component and its own data object.

    `Port.construct` stores the instance itself under `self._data[None]`.
    """

    def __init__(self, *args, **kwd):
        # Initialize the data half first, with this object as its own
        # owning component, then the component half.
        PortData.__init__(self, component=self)
        Port.__init__(self, *args, **kwd)
        # PortData.__init__ leaves _index at NOTSET; record the unindexed
        # sentinel instead.
        self._index = UnindexedComponent_index
class SimplePort(metaclass=RenamedClass):
    # Former name of ScalarPort, declared through the RenamedClass metaclass.
    # __renamed__new_class__ names the current class; __renamed__version__
    # records the version string associated with the rename ('6.0').
    __renamed__new_class__ = ScalarPort
    __renamed__version__ = '6.0'
class IndexedPort(Port):
    """The class `Port.__new__` instantiates when `Port` is called with
    positional arguments other than a lone `UnindexedComponent_set`.
    It adds nothing to `Port`."""

    pass