# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
from pyomo.common.collections import Bunch
from pyomo.dataportal.process_data import _process_data
class TableData(object):
    """
    A class used to read/write data from/to a table in an external
    data source.

    The :meth:`open`, :meth:`read`, :meth:`write` and :meth:`close`
    methods defined here are placeholders that perform no I/O
    (presumably concrete data managers override them -- confirm against
    the subclasses).
    """

    def __init__(self):
        """
        Constructor
        """
        # Token list assembled by _set_data() and handed to
        # _process_data() by process().
        self._info = None
        self._data = None
        self.options = Bunch()
        self.options.ncolumns = 1

    def available(self):
        """
        Returns:
            Return :const:`True` if the data manager is available.
        """
        return True

    def initialize(self, **kwds):
        """
        Initialize the data manager with keyword arguments.

        The `filename` argument is recognized here, and other arguments
        are passed to the :func:`add_options` method.

        Raises:
            KeyError: If no `filename` keyword argument is supplied.
        """
        self.filename = kwds.pop('filename')
        self.add_options(**kwds)

    def add_options(self, **kwds):
        """
        Add the keyword options to the ``options`` object held by this
        data manager.
        """
        self.options.update(kwds)

    def open(self):  # pragma:nocover
        """
        Open the data manager.
        """
        pass

    def read(self):  # pragma:nocover
        """
        Read data from the data manager.

        Returns:
            :const:`False` in this base implementation.
        """
        return False

    def write(self, data):  # pragma:nocover
        """
        Write data to the data manager.

        Returns:
            :const:`False` in this base implementation.
        """
        return False

    def close(self):  # pragma:nocover
        """
        Close the data manager.
        """
        pass

    def process(self, model, data, default):
        """
        Process the data that was extracted from this data manager and
        return it.

        Args:
            model: The model the data is loaded for.  When :const:`None`,
                the model recorded in ``self.options.model`` is used.
            data: Mapping keyed by namespace.  An empty dict is created
                under ``self.options.namespace`` if that key is missing.
            default: Forwarded unchanged to ``_process_data``.

        Returns:
            Whatever ``_process_data`` returns.
        """
        if model is None:
            model = self.options.model
        if self.options.namespace not in data:
            data[self.options.namespace] = {}
        return _process_data(
            self._info,
            model,
            data[self.options.namespace],
            default,
            self.filename,
            index=self.options.index,
            set=self.options.set,
            param=self.options.param,
            ncolumns=self.options.ncolumns,
        )

    def clear(self):
        """
        Clear the data that was extracted from this table
        """
        self._info = None

    def _set_data(self, headers, rows):
        """
        Convert tabular data into the token list stored in ``self._info``.

        Component objects given in the ``param``, ``set`` and ``index``
        options are replaced by their ``local_name`` (and their model is
        recorded in ``self.options.model``).  When no ``format`` option
        is given it is inferred: ``'set'`` if a set was specified,
        otherwise ``'table'`` if a parameter was specified.

        Args:
            headers: Sequence of column names.
            rows: Iterable of rows, each a sequence of column values.

        Raises:
            ValueError: If a name in the ``select`` option is not among
                ``headers``, if neither a format nor a set/param option
                is given, or if the format is not recognized.
            IOError: If a format is given without a set or param option,
                or an index is given for the 'set'/'set_array' formats.
        """
        # Kept as function-level imports, as in the original code.
        from pyomo.core.base.set import Set
        from pyomo.core.base.param import Param

        options = self.options

        # Positions of the columns copied into the 'table' format.
        if options.select is None:
            header_index = list(range(len(headers)))
        else:
            header_index = []
            for name in options.select:
                try:
                    header_index.append(headers.index(str(name)))
                except ValueError:
                    print(
                        "Model declaration '%s' not found in returned query columns"
                        % str(name)
                    )
                    raise
        options.ncolumns = len(headers)

        # Normalize the param option to a tuple of names.
        if options.param is not None:
            if not isinstance(options.param, (list, tuple)):
                options.param = (options.param,)
            _params = []
            for p in options.param:
                if isinstance(p, Param):
                    options.model = p.model()
                    _params.append(p.local_name)
                else:
                    _params.append(p)
            options.param = tuple(_params)

        # Normalize the set option to a name.
        if isinstance(options.set, Set):
            options.model = options.set.model()
            options.set = options.set.local_name

        # Normalize the index option to a name or a tuple of names.
        if isinstance(options.index, Set):
            options.model = options.index.model()
            options.index = options.index.local_name
        elif isinstance(options.index, (tuple, list)):
            tmp = []
            for val in options.index:
                if isinstance(val, Set):
                    tmp.append(val.local_name)
                    options.model = val.model()
                else:
                    tmp.append(val)
            options.index = tuple(tmp)

        # Infer the format when it was not given explicitly.
        if options.format is None:
            if options.set is not None:
                options.format = 'set'
            elif options.param is not None:
                options.format = 'table'
            if options.format is None:
                raise ValueError("Unspecified format and data option")
        elif options.set is None and options.param is None:
            msg = "Must specify the set or parameter option for data"
            raise IOError(msg)

        if options.format == 'set':
            if options.index is not None:
                msg = "Cannot specify index for data with the 'set' format: %s"
                raise IOError(msg % str(options.index))
            self._info = ["set", options.set, ":="]
            for row in rows:
                # Multi-column rows become one tuple element each;
                # single-column rows are flattened into the token list.
                if options.ncolumns > 1:
                    self._info.append(tuple(row))
                else:
                    self._info.extend(row)
        elif options.format == 'set_array':
            if options.index is not None:
                msg = "Cannot specify index for data with the 'set_array' format: %s"
                raise IOError(msg % str(options.index))
            self._info = ["set", options.set, ":"]
            self._info.extend(headers[1:])
            self._info.append(":=")
            for row in rows:
                self._info.extend(row)
        elif options.format == 'transposed_array':
            self._info = ["param", options.param[0], "(tr)", ":"]
            self._info.extend(headers[1:])
            self._info.append(":=")
            for row in rows:
                self._info.extend(row)
        elif options.format == 'array':
            self._info = ["param", options.param[0], ":"]
            self._info.extend(headers[1:])
            self._info.append(":=")
            for row in rows:
                self._info.extend(row)
        elif options.format == 'table':
            if options.index is not None:
                self._info = ["param", ":", options.index, ":"]
            else:
                self._info = ["param", ":"]
            self._info.extend(options.param)
            self._info.append(":=")
            for row in rows:
                self._info.extend(row[i] for i in header_index)
            # Only the selected columns were copied for this format.
            options.ncolumns = len(header_index)
        else:
            msg = "Unknown parameter format: '%s'"
            raise ValueError(msg % options.format)

    def _get_table(self):
        """
        Build a table (a list of rows) from the ``set`` or ``param``
        option.

        The first row holds the column names: ``self.options.columns``
        when given, otherwise generated names.  For a set, one row is
        produced per member (sorted when the ``sort`` option is not
        :const:`None`).  For parameters, one row is produced per index of
        the first parameter, holding the index followed by the value of
        each parameter at that index.

        Returns:
            list: The header row followed by the data rows.  The list is
            empty when neither a set nor a parameter is specified and no
            columns are given.
        """
        from pyomo.core.expr import value

        options = self.options
        table = []
        if options.columns is not None:
            table.append(options.columns)

        if options.set is not None:
            # Create column names
            if options.columns is None:
                table.append(
                    [
                        options.set.local_name + str(i)
                        for i in range(options.set.dimen)
                    ]
                )
            # Get rows
            if options.sort is not None:
                members = sorted(options.set)
            else:
                members = options.set
            for member in members:
                if options.set.dimen > 1:
                    table.append(list(member))
                else:
                    table.append([member])
        elif options.param is not None:
            if isinstance(options.param, (list, tuple)):
                _param = options.param
            else:
                _param = [options.param]
            # Collect data
            for index in _param[0]:
                if index is None:
                    row = []
                elif isinstance(index, (list, tuple)):
                    row = list(index)
                else:
                    row = [index]
                row.extend(value(param[index]) for param in _param)
                table.append(row)
            # Create column names
            if options.columns is None:
                # NOTE(review): table[0] raises IndexError when the first
                # parameter yields no indices; behavior kept as-is.
                n_index_cols = len(table[0]) - len(_param)
                cols = ['I' + str(i) for i in range(n_index_cols)]
                cols.extend(_param)
                table.insert(0, cols)
        return table