Source code for pyomo.core.kernel.block

#  ___________________________________________________________________________
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
import math

from pyomo.core.staleflag import StaleFlagManager
from pyomo.core.expr.symbol_map import SymbolMap
from pyomo.core.kernel.base import _no_ctype, _convert_ctype
from pyomo.core.kernel.heterogeneous_container import IHeterogeneousContainer
from pyomo.core.kernel.container_utils import define_simple_containers

logger = logging.getLogger('pyomo.core')

[docs] class IBlock(IHeterogeneousContainer): """A generalized container that can store objects of any category type as attributes. """ __slots__ = () _child_storage_delimiter_string = "." _child_storage_entry_string = "%s" # # Define the IHeterogeneousContainer abstract methods # # def child_ctypes(self, *args, **kwds): # ... not defined here # # Define the ICategorizedObjectContainer abstract methods #
[docs] def child(self, key): """Get the child object associated with a given storage key for this container. Raises: KeyError: if the argument is not a storage key for any children of this container """ try: return getattr(self, key) except AttributeError: raise KeyError(str(key))
# def children(self, *args, **kwds): # ... not defined here
[docs] class block(IBlock): """A generalized container for defining hierarchical models by adding modeling components as attributes. Examples: >>> import pyomo.kernel as pmo >>> model = pmo.block() >>> model.x = pmo.variable() >>> model.c = pmo.constraint(model.x >= 1) >>> model.o = pmo.objective(model.x) """ _ctype = IBlock # determines when large block storage is activated _lbs_count = 4 _block_reserved_words = set() @staticmethod def _refresh_block_reserved_words(): block._block_reserved_words = set(dir(block())) # this is protected because it is # defined as a property with a setter # that explicitly disallows this block._block_reserved_words.remove('active')
[docs] def __init__(self): # bypass __setatrr__ for this class d = self.__dict__ d['_parent'] = None d['_storage_key'] = None d['_active'] = True d['_block__byctype'] = None d['_block__order'] = dict()
def _activate_large_storage_mode(self): if self.__byctype.__class__ is not dict: self_byctype = self.__dict__['_block__byctype'] = dict() for key, obj in self.__order.items(): ctype = obj.ctype if ctype not in self_byctype: self_byctype[ctype] = dict() self_byctype[ctype][key] = obj # # Define the IHeterogeneousContainer abstract methods #
[docs] def child_ctypes(self): """Returns the set of child object category types stored in this container.""" self_byctype = self.__byctype if self_byctype is None: # empty return () elif self_byctype.__class__ is int: # small block storage # (self_byctype is a union of hash bytes) ctypes_set = set() ctypes = [] for child in self.__order.values(): child_ctype = child.ctype if child_ctype not in ctypes_set: ctypes_set.add(child_ctype) ctypes.append(child_ctype) return tuple(ctypes) elif self_byctype.__class__ is dict: # large-block storage return tuple(self_byctype) else: # storing a single ctype return (self_byctype,)
# # Define the ICategorizedObjectContainer abstract methods #
[docs] def children(self, ctype=_no_ctype): """Iterate over the children of this block. Args: ctype: Indicates the category of children to include. The default value indicates that all categories should be included. Returns: iterator of child objects """ self_byctype = self.__byctype if self_byctype is None: # empty return # convert AML types into Kernel types (hack for the # solver interfaces) ctype = _convert_ctype.get(ctype, ctype) if ctype is _no_ctype: yield from self.__order.values() elif self_byctype.__class__ is dict: # large-block storage if ctype in self_byctype: yield from self_byctype[ctype].values() elif self_byctype.__class__ is int: # small-block storage # (self_byctype is a union of hash bytes) h_ = hash(ctype) if (self_byctype & h_) == h_: for child in self.__order.values(): if child.ctype is ctype: yield child elif self_byctype is ctype: # storing a single ctype yield from self.__order.values()
# # Interface # def __setattr__(self, name, obj): if name in self._block_reserved_words: raise ValueError( "Attempting to modify a reserved block attribute: %s" % (name,) ) needs_del = False same_obj = False self_order = self.__order if name in self_order: # to avoid an edge case, we need to delay # deleting the current object until we # check the parent of the new object needs_del = True if self_order[name] is not obj: logger.warning( "Implicitly replacing attribute %s (type=%s) " "on block with new object (type=%s). This " "is usually indicative of a modeling error. " "To avoid this warning, delete the original " "object from the block before assigning a new " "object." % ( name, getattr(self, name).__class__.__name__, obj.__class__.__name__, ) ) else: same_obj = True assert obj.parent is self try: ctype = obj.ctype except AttributeError: if needs_del: delattr(self, name) else: if (obj.parent is None) or same_obj: if needs_del: delattr(self, name) obj._update_parent_and_storage_key(self, name) self_order[name] = obj self_byctype = self.__byctype if self_byctype is None: # storing a single ctype self.__dict__['_block__byctype'] = ctype elif self_byctype.__class__ is dict: # large-block storage if ctype not in self_byctype: self_byctype[ctype] = dict() self_byctype[ctype][name] = obj elif self_byctype.__class__ is int: # small-block storage # (_byctype is a union of hash bytes) if len(self_order) > self._lbs_count: # activate the large storage format # if we have exceeded the threshold self._activate_large_storage_mode() else: self.__dict__['_block__byctype'] |= hash(ctype) else: # currently storing a single ctype if ctype is not self_byctype: if len(self_order) > self._lbs_count: # activate the large storage format # if we have exceeded the threshold self._activate_large_storage_mode() else: self.__dict__['_block__byctype'] = hash( self_byctype ) | hash(ctype) else: raise ValueError( "Invalid assignment to %s type with name '%s' " "at entry %s. A parent container has already " "been assigned to the object being inserted: %s" % (self.__class__.__name__,, name, ) super(block, self).__setattr__(name, obj) def __delattr__(self, name): self_order = self.__order if name in self_order: obj = self_order[name] del self_order[name] obj._clear_parent_and_storage_key() self_byctype = self.__byctype if self_byctype.__class__ is dict: # large-block storage ctype = obj.ctype del self_byctype[ctype][name] if len(self_byctype[ctype]) == 0: del self_byctype[ctype] super(block, self).__delattr__(name)
[docs] def write( self, filename, format=None, _solver_capability=None, _called_by_solver=False, **kwds ): """ Write the model to a file, with a given format. Args: filename (str): The name of the file to write. format: The file format to use. If this is not specified, the file format will be inferred from the filename suffix. **kwds: Additional keyword options passed to the model writer. Returns: a :class:`SymbolMap` """ import pyomo.opt # # Guess the format if none is specified # if format is None: format = pyomo.opt.base.guess_format(filename) problem_writer = pyomo.opt.WriterFactory(format) if problem_writer is None: raise ValueError( "Cannot write model in format '%s': no model " "writer registered for that format" % str(format) ) if _solver_capability is None: _solver_capability = lambda x: True (filename_, smap) = problem_writer(self, filename, _solver_capability, kwds) assert filename_ == filename if _called_by_solver: # BIG HACK smap_id = id(smap) if not hasattr(self, "._symbol_maps"): setattr(self, "._symbol_maps", {}) getattr(self, "._symbol_maps")[smap_id] = smap return smap_id else: return smap
[docs] def load_solution( self, solution, allow_consistent_values_for_fixed_vars=False, comparison_tolerance_for_fixed_vars=1e-5, ): """ Load a solution. Args: solution: A :class:`pyomo.opt.Solution` object with a symbol map. Optionally, the solution can be tagged with a default variable value (e.g., 0) that will be applied to those variables in the symbol map that do not have a value in the solution. allow_consistent_values_for_fixed_vars: Indicates whether a solution can specify consistent values for variables that are fixed. comparison_tolerance_for_fixed_vars: The tolerance used to define whether or not a value in the solution is consistent with the value of a fixed variable. """ from pyomo.core.kernel.suffix import import_suffix_generator symbol_map = solution.symbol_map default_variable_value = getattr(solution, "default_variable_value", None) # Generate the list of active import suffixes on # this top level model valid_import_suffixes = { obj.storage_key: obj for obj in import_suffix_generator(self) } # To ensure that import suffix data gets properly # overwritten (e.g., the case where nonzero dual # values exist on the suffix and but only sparse # dual values exist in the results object) we clear # all active import suffixes. for suffix in valid_import_suffixes.values(): suffix.clear() # Load problem (model) level suffixes. These would # only come from ampl interfaced solution suffixes # at this point in time. for _attr_key, attr_value in solution.problem.items(): attr_key = _attr_key[0].lower() + _attr_key[1:] if attr_key in valid_import_suffixes: valid_import_suffixes[attr_key][self] = attr_value # # Set the "stale" flag of each variable in the model prior to # loading the solution, so you known which variables have "real" # values and which ones don't. # StaleFlagManager.mark_all_as_stale() # # Load variable data # from pyomo.core.kernel.variable import IVariable var_skip_attrs = ['id', 'canonical_label'] seen_var_ids = set() for label, entry in solution.variable.items(): var = symbol_map.getObject(label) if (var is None) or (var is SymbolMap.UnknownSymbol): # NOTE: the following is a hack, to handle # the ONE_VAR_CONSTANT variable that is # necessary for the objective # constant-offset terms. probably should # create a dummy variable in the model # map at the same time the objective # expression is being constructed. if "ONE_VAR_CONST" in label: continue else: raise KeyError( "Variable associated with symbol '%s' " "is not found on this block" % (label) ) seen_var_ids.add(id(var)) if (not allow_consistent_values_for_fixed_vars) and var.fixed: raise ValueError( "Variable '%s' is currently fixed. " "A new value is not expected " "in solution" % ( ) for _attr_key, attr_value in entry.items(): attr_key = _attr_key[0].lower() + _attr_key[1:] if attr_key == 'value': if ( allow_consistent_values_for_fixed_vars and var.fixed and ( math.fabs(attr_value - var.value) > comparison_tolerance_for_fixed_vars ) ): raise ValueError( "Variable %s is currently fixed. " "A value of '%s' in solution is " "not within tolerance=%s of the current " "value of '%s'" % (, attr_value, comparison_tolerance_for_fixed_vars, var.value, ) ) var.set_value(attr_value, skip_validation=True) elif attr_key in valid_import_suffixes: valid_import_suffixes[attr_key][var] = attr_value # start to build up the set of unseen variable ids unseen_var_ids = set(symbol_map.byObject.keys()) # at this point it contains ids for non-variable types unseen_var_ids.difference_update(seen_var_ids) # # Load objective solution (should simply be suffixes if # they exist) # objective_skip_attrs = ['id', 'canonical_label', 'value'] for label, entry in solution.objective.items(): obj = symbol_map.getObject(label) if (obj is None) or (obj is SymbolMap.UnknownSymbol): raise KeyError( "Objective associated with symbol '%s' " "is not found on this block" % (label) ) # Because of __default_objective__, an objective might # appear twice in the objective dictionary. unseen_var_ids.discard(id(obj)) for _attr_key, attr_value in entry.items(): attr_key = _attr_key[0].lower() + _attr_key[1:] if attr_key in valid_import_suffixes: valid_import_suffixes[attr_key][obj] = attr_value # # Load constraint solution # con_skip_attrs = ['id', 'canonical_label'] for label, entry in solution.constraint.items(): con = symbol_map.getObject(label) if con is SymbolMap.UnknownSymbol: # # This is a hack - see above. # if "ONE_VAR_CONST" in label: continue else: raise KeyError( "Constraint associated with symbol '%s' " "is not found on this block" % (label) ) unseen_var_ids.discard(id(con)) for _attr_key, attr_value in entry.items(): attr_key = _attr_key[0].lower() + _attr_key[1:] if attr_key in valid_import_suffixes: valid_import_suffixes[attr_key][con] = attr_value # # Load sparse variable solution # if default_variable_value is not None: for var_id in unseen_var_ids: var = symbol_map.getObject(symbol_map.byObject[var_id]) if var.ctype is not IVariable: continue if (not allow_consistent_values_for_fixed_vars) and var.fixed: raise ValueError( "Variable '%s' is currently fixed. " "A new value is not expected " "in solution" % ( ) if ( allow_consistent_values_for_fixed_vars and var.fixed and ( math.fabs(default_variable_value - var.value) > comparison_tolerance_for_fixed_vars ) ): raise ValueError( "Variable %s is currently fixed. " "A value of '%s' in solution is " "not within tolerance=%s of the current " "value of '%s'" % (, default_variable_value, comparison_tolerance_for_fixed_vars, var.value, ) ) var.set_value(default_variable_value, skip_validation=True) # Set the state flag to "delayed advance": it will auto-advance # if a non-stale variable is updated (causing all non-stale # variables to be marked as stale). StaleFlagManager.mark_all_as_stale(delayed=True)
# inserts class definitions for simple _tuple, _list, and # _dict containers into this module define_simple_containers(globals(), "block", IBlock) # populate the initial set of reserved block attributes so # that users can not overwrite them when building a model block._refresh_block_reserved_words()