Source code for pyomo.dataportal.DataPortal

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

import logging
from pyomo.common.log import is_debug_set
from pyomo.dataportal.factory import DataManagerFactory, UnknownDataManager

logger = logging.getLogger('pyomo.core')


[docs]class DataPortal(object): """ An object that manages loading and storing data from external data sources. This object interfaces to plugins that manipulate the data in a manner that is dependent on the data format. Internally, the data in a DataPortal object is organized as follows:: data[namespace][symbol][index] -> value All data is associated with a symbol name, which may be indexed, and which may belong to a namespace. The default namespace is :const:`None`. Args: model: The model for which this data is associated. This is used for error checking (e.g. object names must exist in the model, set dimensions must match, etc.). Default is :const:`None`. filename (str): A file from which data is loaded. Default is :const:`None`. data_dict (dict): A dictionary used to initialize the data in this object. Default is :const:`None`. """
[docs] def __init__(self, *args, **kwds): """ Constructor """ if len(args) > 0: raise RuntimeError( "Unexpected constructor argument for a DataPortal object" ) # Initialize this object with no data manager self._data_manager = None # Map initialization data as follows: _data[namespace][symbol] -> data self._data = {} # This is the data that is imported from various sources self._default = {} # Get the model for which this data is associated. self._model = kwds.pop('model', None) # Load data from a file ... if 'filename' in kwds: filename = kwds.pop('filename') self.connect(filename=filename, **kwds) self.load() self.disconnect() # Or load data from a dictionary elif 'data_dict' in kwds: data = kwds.pop('data_dict') if not data is None: self._data = data elif len(kwds) > 0: raise ValueError("Unknown options: %s" % str(kwds.keys()))
[docs] def connect(self, **kwds): """ Construct a data manager object that is associated with the input source. This data manager is used to process future data imports and exports. Args: filename (str): A filename that specifies the data source. Default is :const:`None`. server (str): The name of the remote server that hosts the data. Default is :const:`None`. using (str): The name of the resource used to load the data. Default is :const:`None`. Other keyword arguments are passed to the data manager object. """ if not self._data_manager is None: self._data_manager.close() data = kwds.get('using', None) if data is None: data = kwds.get('filename', None) if data is None: data = kwds.get('server', None) if '.' in data: tmp = data.split(".")[-1] else: tmp = data self._data_manager = DataManagerFactory(tmp) if type(self._data_manager) is UnknownDataManager: raise IOError("Unknown file format '%s'" % tmp) self._data_manager.initialize(**kwds) self._data_manager.open()
[docs] def disconnect(self): """ Close the data manager object that is associated with the input source. """ self._data_manager.close() self._data_manager = None
[docs] def load(self, **kwds): """ Import data from an external data source. Args: model: The model object for which this data is associated. Default is :const:`None`. Other keyword arguments are passed to the :func:`connect()` method. """ if is_debug_set(logger): # pragma:nocover logger.debug("Loading data...") # # Process arguments # _model = kwds.pop('model', None) if not _model is None: self._model = _model # # If _disconnect is True, then disconnect the data # manager after we load data # _disconnect = False if self._data_manager is None: # # Start a new connection # self.connect(**kwds) _disconnect = True elif len(kwds) > 0: # # We are continuing to store using an existing connection. # # Q: Should we reinitialize? The semantic difference between # initialize() and add_options() aren't clear. # self._data_manager.add_options(**kwds) # # Preprocess the command-line options # self._preprocess_options() # # Read from data manager into self._data and self._default # if is_debug_set(logger): # pragma:nocover logger.debug("Processing data ...") self._data_manager.read() status = self._data_manager.process(self._model, self._data, self._default) self._data_manager.clear() # # Disconnect # if _disconnect: self.disconnect() if is_debug_set(logger): # pragma:nocover logger.debug("Done.")
[docs] def store(self, **kwds): """ Export data to an external data source. Args: model: The model object for which this data is associated. Default is :const:`None`. Other keyword arguments are passed to the :func:`connect()` method. """ if is_debug_set(logger): # pragma:nocover logger.debug("Storing data...") # # Process arguments # _model = kwds.pop('model', None) if not _model is None: self._model = _model # # If _disconnect is True, then disconnect the data manager # after we load data # _disconnect = False if self._data_manager is None: self.connect(**kwds) _disconnect = True elif len(kwds) > 0: # # Q: Should we reinitialize? The semantic difference between # initialize() and add_options() aren't clear. # self._data_manager.add_options(**kwds) # # Preprocess the command-line options # self._preprocess_options() self._load_data_from_model() # # Write from self._data # self._data_manager.write(self._data) # # Disconnect # if _disconnect: self.disconnect() if is_debug_set(logger): # pragma:nocover logger.debug("Done.")
[docs] def data(self, name=None, namespace=None): """ Return the data associated with a symbol and namespace Args: name (str): The name of the symbol that is returned. Default is :const:`None`, which indicates that the entire data in the namespace is returned. namespace (str): The name of the namespace that is accessed. Default is :const:`None`. Returns: If ``name`` is :const:`None`, then the dictionary for the namespace is returned. Otherwise, the data associated with ``name`` in given namespace is returned. The return value is a constant if :const:`None` if there is a single value in the symbol dictionary, and otherwise the symbol dictionary is returned. """ if not namespace in self._data: raise IOError("Unknown namespace '%s'" % str(namespace)) if name is None: return self._data[namespace] ans = self._data[namespace][name] if None in ans: # The data is a simple value return ans[None] return ans
[docs] def __getitem__(self, *args): """ Return the specified data value. If a single argument is given, then this is the symbol name:: dp = DataPortal() dp[name] If a two arguments are given, then the first is the namespace and the second is the symbol name:: dp = DataPortal() dp[namespace, name] Args: *args (str): A tuple of arguments. Returns: If a single argument is given, then the data associated with that symbol in the namespace :const:`None` is returned. If two arguments are given, then the data associated with symbol in the given namespace is returned. """ if type(args[0]) is tuple or type(args[0]) is list: assert len(args) == 1 args = args[0] if len(args) > 2: raise IOError( "Must specify data name: DataPortal[name] or Data[namespace, name]" ) elif len(args) == 2: namespace = args[0] name = args[1] else: namespace = None name = args[0] ans = self._data[namespace][name] if None in ans: # The data is a simple value return ans[None] return ans
[docs] def __setitem__(self, name, value): """ Set the value of ``name`` with the given value. Args: name (str): The name of the symbol that is set. value: The value of the symbol. """ if not None in self._data: self._data[None] = {} self._data[None][name] = value
[docs] def namespaces(self): """ Return an iterator for the namespaces in the data portal. Yields: A string name for the next namespace. """ for key in self._data: yield key
[docs] def keys(self, namespace=None): """ Return an iterator of the data keys in the specified namespace. Yields: A string name for the next symbol in the specified namespace. """ for key in self._data[namespace]: yield key
[docs] def values(self, namespace=None): """ Return an iterator of the data values in the specified namespace. Yields: The data value for the next symbol in the specified namespace. This may be a simple value, or a dictionary of values. """ for key in self._data[namespace]: ans = self._data[namespace][key] if None in ans: yield ans[None] else: yield ans
[docs] def items(self, namespace=None): """ Return an iterator of (name, value) tuples from the data in the specified namespace. Yields: The next (name, value) tuple in the namespace. If the symbol has a simple data value, then that is included in the tuple. Otherwise, the tuple includes a dictionary mapping symbol indices to values. """ for key in self._data[namespace]: ans = self._data[namespace][key] if None in ans: yield key, ans[None] else: yield key, ans
def _preprocess_options(self): """ Preprocess the options for a data manager. """ options = self._data_manager.options # if options.data is None and ( not options.set is None or not options.param is None or not options.index is None ): # # Set options.data to a list of elements of the options.set, # options.param and options.index values. # options.data = [] if not options.set is None: assert type(options.set) not in (list, tuple) options.data.append(options.set) # # The set option should not be a list or tuple. # # if type(options.set) in (list,tuple): # for item in options.set: # options.data.append(item) # else: # options.data.append(options.set) if not options.index is None: options.data.append(options.index) if not options.param is None: if type(options.param) in (list, tuple): for item in options.param: options.data.append(item) else: options.data.append(options.param) # if options.data is None: return # if type(options.data) in (list, tuple): # # If options.data is a list/tuple, then # process it to get the names of the # elements. Thus, if a component is included # in options.data, then it is replaced by its name. # ans = [] for item in options.data: try: ans.append(item.local_name) self._model = item.model() except: ans.append(item) options.data = ans else: # # If options.data is a single value, then we assume that # it is a component. Reset its value to the value of # the component name. # try: self._model = options.data.model() options.data = [self._data_manager.options.data.local_name] except: pass def _load_data_from_model(self): """ Load model data into self._data """ if self._data_manager.options.data is None or self._model is None: return for name in self._data_manager.options.data: c = getattr(self._model, name) try: self._data[name] = c.data() except: self._data[name] = c.extract_values()