# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
#### Using mpi-sppy instead of PySP; May 2020
#### Adding option for "local" EF starting Sept 2020
#### Wrapping mpi-sppy functionality and local option Jan 2021, Feb 2021
#### Redesign with Experiment class Dec 2023
# Options for using mpi-sppy or local EF only used in the deprecatedEstimator class
# TODO: move use_mpisppy to a Pyomo configuration option
# False implies always use the EF that is local to parmest
use_mpisppy = True # Use it if we can but use local if not.
if use_mpisppy:
try:
# MPI-SPPY has an unfortunate side effect of outputting
# "[ 0.00] Initializing mpi-sppy" when it is imported. This can
# cause things like doctests to fail. We will suppress that
# information here.
from pyomo.common.tee import capture_output
with capture_output():
import mpisppy.utils.sputils as sputils
except ImportError:
use_mpisppy = False # we can't use it
if use_mpisppy:
# These things should be outside the try block.
sputils.disable_tictoc_output()
import mpisppy.opt.ef as st
import mpisppy.scenario_tree as scenario_tree
else:
import pyomo.contrib.parmest.utils.create_ef as local_ef
import pyomo.contrib.parmest.utils.scenario_tree as scenario_tree
from enum import Enum
import re
import importlib as im
import logging
import types
import json
from collections.abc import Callable
from itertools import combinations
from functools import singledispatchmethod
from collections import defaultdict
from pyomo.common.dependencies import (
attempt_import,
numpy as np,
numpy_available,
pandas as pd,
pandas_available,
scipy,
scipy_available,
)
import pyomo.environ as pyo
from pyomo.opt import SolverFactory
from pyomo.environ import Block, ComponentUID
from pyomo.opt.results.solver import assert_optimal_termination
from pyomo.common.flags import NOTSET
from pyomo.contrib.sensitivity_toolbox.sens import get_dsdp
import pyomo.contrib.parmest.utils as utils
import pyomo.contrib.parmest.graphics as graphics
from pyomo.dae import ContinuousSet
from pyomo.common.deprecation import deprecated
from pyomo.common.deprecation import deprecation_warning
parmest_available = numpy_available & pandas_available & scipy_available
inverse_reduced_hessian, inverse_reduced_hessian_available = attempt_import(
'pyomo.contrib.interior_point.inverse_reduced_hessian'
)
logger = logging.getLogger(__name__)
[docs]
def SSE(model):
"""
Returns an expression that is used to compute the sum of squared errors
('SSE') objective, assuming Gaussian i.i.d. errors
Parameters
----------
model : ConcreteModel
Annotated Pyomo model
"""
# check if the model has all the required suffixes
_check_model_labels(model)
# SSE between the prediction and observation of the measured variables
expr = sum((y - y_hat) ** 2 for y_hat, y in model.experiment_outputs.items())
return expr
[docs]
def SSE_weighted(model):
"""
Returns an expression that is used to compute the 'SSE_weighted' objective,
assuming Gaussian i.i.d. errors, with measurement error standard deviation
defined in the annotated Pyomo model
Parameters
----------
model : ConcreteModel
Annotated Pyomo model
"""
# check if the model has all the required suffixes
_check_model_labels(model)
# Check that measurement errors exist
if not hasattr(model, "measurement_error"):
raise AttributeError(
'Experiment model does not have suffix "measurement_error". '
'"measurement_error" is a required suffix for the "SSE_weighted" '
'objective.'
)
# check if all the values of the measurement error standard deviation
# have been supplied
all_known_errors = all(
model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs
)
if all_known_errors:
# calculate the weighted SSE between the prediction
# and observation of the measured variables
try:
expr = (1 / 2) * sum(
((y - y_hat) / model.measurement_error[y_hat]) ** 2
for y_hat, y in model.experiment_outputs.items()
)
return expr
except ZeroDivisionError:
raise ValueError(
'Division by zero encountered in the "SSE_weighted" objective. '
'One or more values of the measurement error are zero.'
)
else:
raise ValueError(
'One or more values are missing from "measurement_error". All values of '
'the measurement errors are required for the "SSE_weighted" objective.'
)
def _validate_prior_FIM(prior_FIM, require_psd=True):
"""
Validate user-supplied prior Fisher Information Matrix.
Parameters
----------
prior_FIM : pd.DataFrame
Prior Fisher Information Matrix with parameter names as both row and
column labels.
require_psd : bool, optional
If True, enforce positive semi-definiteness. Default is True.
"""
if not isinstance(prior_FIM, pd.DataFrame):
raise TypeError("prior_FIM must be a pandas DataFrame.")
if set(prior_FIM.index) != set(prior_FIM.columns):
raise ValueError(
"prior_FIM row and column labels must match the same parameter names."
)
if not np.issubdtype(prior_FIM.values.dtype, np.number):
raise TypeError("prior_FIM entries must be numeric.")
if not np.all(np.isfinite(prior_FIM.values)):
raise ValueError("prior_FIM entries must be finite.")
if not np.allclose(prior_FIM.values, prior_FIM.values.T):
raise ValueError("prior_FIM must be symmetric.")
if require_psd:
eigenvalues = np.linalg.eigvalsh(prior_FIM.values)
if np.any(eigenvalues < -1e-10): # tolerance for numerical noise
raise ValueError("prior_FIM must be positive semi-definite.")
def _expanded_unknown_parameter_info(model):
"""
Return scalar unknown parameter components, names, and ComponentUIDs.
The unknown_parameters suffix may contain either scalar ComponentData
objects or indexed components. Indexed components are expanded to their
scalar data objects.
"""
theta_components = []
for c in model.unknown_parameters:
if c.is_indexed():
theta_components.extend(c.values())
else:
theta_components.append(c)
theta_names = tuple(c.name for c in theta_components)
theta_cuids = tuple(pyo.ComponentUID(c) for c in theta_components)
return theta_components, theta_names, theta_cuids
def _calculate_L2_penalty(model, prior_FIM, theta_ref=None):
"""
Calculates (theta - theta_ref)^T * prior_FIM * (theta - theta_ref)
using label-based alignment for safety and subsets for efficiency.
Parameters
----------
model : ConcreteModel
Annotated Pyomo model
prior_FIM : pd.DataFrame
Prior Fisher Information Matrix from previous experimental design
theta_ref: dict, optional
Reference parameter values used in regularization.
If None, defaults to the current parameter values in the model.
Returns
-------
expr : Pyomo expression
Expression representing the L2-regularized objective addition
"""
_validate_prior_FIM(prior_FIM, require_psd=True)
# Get current model parameters
theta_components, current_param_names, _ = _expanded_unknown_parameter_info(model)
param_map = {p.name: p for p in theta_components}
# Confirm all matching parameters in both prior_FIM index and columns
common_params = [p for p in current_param_names if p in prior_FIM.columns]
if len(common_params) == 0:
logger.warning(
"Warning: No matching parameters found between Model and Prior FIM. "
"Returning standard objective."
)
return 0.0 # No regularization if no common parameters
elif len(common_params) < len(set(prior_FIM.columns)):
logger.warning(
"Warning: Only a subset of parameters in the Prior FIM match the Model parameters. "
"Regularization will be applied only to the matching subset."
)
else:
logger.info(
"All parameters in the Prior FIM match the Model parameters. "
"Regularization will be applied to all parameters."
)
# Slice the dataframes to ONLY the common subset
sub_FIM = prior_FIM.loc[common_params, common_params]
# For the reference theta, we also subset to the common parameters.
if theta_ref is not None:
missing_ref = [p for p in common_params if p not in theta_ref]
if missing_ref:
raise ValueError(
"theta_ref is missing values for parameter(s): "
+ ", ".join(missing_ref)
)
else:
sub_theta = {param: pyo.value(param_map[param]) for param in common_params}
logger.info(
"theta_ref is None. Using initialized parameter values as reference."
)
theta_ref = sub_theta
# Construct the Quadratic Form (The L2 term)
# Create the (theta - theta_ref) vector, ensuring alignment by parameter name
delta_params = np.array(
[param_map[p] - theta_ref[p] for p in common_params]
) # (theta - theta_ref) vector
# Compute the quadratic form: delta^T * FIM * delta
l2_term = delta_params.T @ sub_FIM.values @ delta_params
return l2_term
[docs]
def L2_regularized_objective(
model, prior_FIM, theta_ref=None, regularization_weight=1.0, obj_function=SSE
):
"""
Calculates objective + regularization_weight*(theta - theta_ref)^T * prior_FIM * (theta - theta_ref)
using label-based alignment for safety and subsets for efficiency.
Parameters
----------
model : ConcreteModel
Annotated Pyomo model
prior_FIM : pd.DataFrame
Prior Fisher Information Matrix from previous experimental design
theta_ref: dict, optional
Reference parameter values used in regularization.
If None, defaults to the current parameter values in the model.
regularization_weight: float, optional
Weighting factor for the regularization term. Default is 1.0.
obj_function: callable, optional
Built-in objective function selected by the user, e.g., `SSE`. Default is `SSE`.
Returns
-------
expr : Pyomo expression
Expression representing the L2-regularized objective
"""
# Calculate the L2 penalty term
l2_term = _calculate_L2_penalty(model, prior_FIM, theta_ref)
# Combine with objective
object_expr = obj_function(model)
if obj_function is SSE_weighted:
# current implementation of SSE weighted includes a
# 1/2 in front of the sum of squared errors to cancel out the 2 in the gradient,
# so we apply the same factor to the regularization term for consistency
# SSE does not have a 1/2 in front, so no additional factor is needed in that case
l2_term *= 0.5
# Calculate the regularized objective
l2reg_objective = object_expr + (regularization_weight * l2_term)
return l2reg_objective
def _check_model_labels(model):
"""
Checks if the annotated Pyomo model contains the necessary suffixes
Parameters
----------
model : ConcreteModel
Annotated Pyomo model
"""
required_attrs = ("experiment_outputs", "unknown_parameters")
# check if any of the required attributes are missing
missing_attr = [attr for attr in required_attrs if not hasattr(model, attr)]
if missing_attr:
missing_str = ", ".join(f'"{attr}"' for attr in missing_attr)
raise AttributeError(
f"Experiment model is missing required attribute(s): {missing_str}"
)
logger.info("Model has expected labels.")
def _get_labeled_model(experiment):
"""
Returns the annotated Pyomo model from the Experiment class
Parameters
----------
experiment : Experiment class
Experiment class object that contains the Pyomo model
for a particular experimental condition
"""
# check if the Experiment class has a "get_labeled_model" function
get_model = getattr(experiment, "get_labeled_model", None)
if not callable(get_model):
raise AttributeError(
'The experiment object must have a "get_labeled_model" function.'
)
try:
return get_model().clone()
except Exception as exc:
raise RuntimeError(f"Failed to clone labeled model: {exc}")
def _format_outputs_index_as_tuple(index):
"""
Formats the indices of indexed experiment outputs
(e.g., `m.CA[t]`, `m.CB[t]`, `m.C[t, "A"]`, `m.C[t, "B"]`) as a
tuple
"""
if isinstance(index, tuple):
return index
return (index,)
[docs]
def validate_experiment_outputs(output_vars):
"""
Checks that:
1. All variables in the `experiment_outputs` attribute have the same
number of indices
2. All variables in the `experiment_outputs` attribute share the same
index set
Parameters
----------
output_vars : pyomo.core.base.suffix.Suffix
Experiment output variables
"""
grouped_indices = defaultdict(list)
# group indices by parent component
for comp in output_vars:
index = comp.index()
# check if the output variable is a scalar (e.g., m.y1, m.y2)
if index is None:
pass
else:
# format the index of output variables
# (e.g., m.CA[t], m.CB[t], m.C[t, "A"], m.C[t, "B"]) as a tuple
index_tuple = _format_outputs_index_as_tuple(index)
# check if the first indexing is the data index
assert isinstance(
index_tuple[0], (int, float)
), "The first index of experiment outputs must be the data point"
parent = comp.parent_component().name
grouped_indices[parent].append(index)
# convert to a sorted unique tuples
grouped_indices = {
name: tuple(sorted(indices)) for name, indices in grouped_indices.items()
}
# reference index set
names = list(grouped_indices.keys())
if len(names) <= 1: # only one output variable exist
pass
else:
ref_name = names[0]
ref_indices = grouped_indices[ref_name]
for name in names[1:]:
assert len(grouped_indices[name]) == len(
ref_indices
), "Experiment outputs must have the same number of indices or data points"
assert (
grouped_indices[name] == ref_indices
), "Experiment outputs must share the same indices or data points"
def _count_total_experiments(experiment_list):
"""
Counts the number of data points in the list of experiments
This function has been updated to avoid double counting data points in
cases where the "experiment_outputs" suffix contain keys that belong
to different output variables (e.g., `m.y1`, `m.y2`) which are measured at
the same data point
Assumptions:
Experiment outputs can be scaler variables (e.g., `m.y1`, `m.y2`)
or
Experiment outputs can be indexed variables (e.g., `m.CA[t]`, `m.CB[t]`,
`m.C[t, "A"]`, `m.C[t, "B"]`). The data-point variable (e.g., `t`) must
be the first index. Within each experiment, the output families are
expected to share the same time points. Across experiments, the output
families are expected to contain the same number of time points.
Future versions will allow for heterogeneity in the number of data points
across experiments and will require changes to this function.
Parameters
----------
experiment_list : list
List of Experiment class objects containing the Pyomo model
for the different experimental conditions
Returns
-------
total_data_points : int
The total number of data points in the list of experiments
"""
total_data_points = 0
for experiment in experiment_list:
model = _get_labeled_model(experiment)
output_vars = model.experiment_outputs
# check if the experiment outputs are defined correctly
validate_experiment_outputs(output_vars)
# store the indices of the experiment outputs
indices = []
for output_var in output_vars.keys():
index = output_var.index()
indices.append(index)
# Case 1
# scalar outputs such as m.y1, m.y2,...
if all(index is None for index in indices):
total_data_points += 1
else:
# Case 2:
# outputs with one index, e.g., m.CA[t], m.CB[t],...
# outputs with two or more indices, e.g., m.C[t, "A"], m.C[t, "B"],...
# first index must be the data-point variable
# count unique time/sample indices within this experiment
experiment_data_points = set()
for index in indices:
index_tuple = _format_outputs_index_as_tuple(index)
# if the output has one index, this gives index itself
# if the output has two or more index, the first
# indexing must be the data index
data_point = index_tuple[0]
experiment_data_points.add(data_point)
total_data_points += len(experiment_data_points)
return total_data_points
[docs]
class CovarianceMethod(Enum):
finite_difference = "finite_difference"
automatic_differentiation_kaug = "automatic_differentiation_kaug"
reduced_hessian = "reduced_hessian"
[docs]
class ObjectiveType(Enum):
SSE = "SSE"
SSE_weighted = "SSE_weighted"
[docs]
class RegularizationType(Enum):
L2 = "L2"
# Compute the Jacobian matrix of measured variables with respect to the parameters
def _compute_jacobian(experiment, theta_vals, step, solver, tee, solver_options):
"""
Computes the Jacobian matrix of the measured variables with respect to the
parameters using the central finite difference scheme
Parameters
----------
experiment : Experiment class
Experiment class object that contains the Pyomo model
for a particular experimental condition
theta_vals : dict
Dictionary containing the estimates of the unknown parameters
step : float
Float used for relative perturbation of the parameters,
e.g., step=0.02 is a 2% perturbation
solver : str
Solver name specified by the user, e.g., 'ipopt'
tee : bool
Boolean solver option to be passed for verbose output
solver_options: dict
Dictionary of solver options to be passed for the finite difference calculations
Returns
-------
J : numpy.ndarray
Jacobian matrix of the measured variables
"""
# grab the model
model = _get_labeled_model(experiment)
# fix the value of the unknown parameters to the estimated values
for param in model.unknown_parameters:
param.fix(theta_vals[param.name])
# re-solve the model with the estimated parameters
solver = pyo.SolverFactory(solver)
if solver_options is not None:
for key in solver_options:
solver.options[key] = solver_options[key]
results = solver.solve(model, tee=tee)
assert_optimal_termination(results)
# get the estimated parameter values
param_values = [p.value for p in model.unknown_parameters]
# get the number of parameters and measured variables
n_params = len(param_values)
n_outputs = len(model.experiment_outputs)
# compute the sensitivity of the measured variables w.r.t the parameters
J = np.zeros((n_outputs, n_params))
for i, param in enumerate(model.unknown_parameters):
# store original value of the parameter
orig_value = param_values[i]
# calculate the relative perturbation
relative_perturbation = step * orig_value
# Forward perturbation
param.fix(orig_value + relative_perturbation)
# solve the model
results = solver.solve(model, tee=tee)
assert_optimal_termination(results)
# forward perturbation measured variables
y_hat_plus = [pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items()]
# Backward perturbation
param.fix(orig_value - relative_perturbation)
# re-solve the model
results = solver.solve(model, tee=tee)
assert_optimal_termination(results)
# backward perturbation measured variables
y_hat_minus = [
pyo.value(y_hat) for y_hat, y in model.experiment_outputs.items()
]
# Restore the original parameter value
param.fix(orig_value)
# Central difference approximation for the Jacobian
J[:, i] = [
(y_hat_plus[w] - y_hat_minus[w]) / (2 * relative_perturbation)
for w in range(len(y_hat_plus))
]
return J
# Compute the covariance matrix of the estimated parameters
[docs]
def compute_covariance_matrix(
experiment_list,
method,
obj_function,
theta_vals,
step,
solver,
tee,
estimated_var=None,
prior_FIM=None,
regularization_weight=1.0,
solver_options=None,
):
"""
Computes the covariance matrix of the estimated parameters using
'finite_difference' or 'automatic_differentiation_kaug' methods
Parameters
----------
experiment_list : list
List of Experiment class objects containing the Pyomo model
for the different experimental conditions
method : str
Covariance calculation method specified by the user,
e.g., 'finite_difference'
obj_function: callable
Built-in objective function selected by the user, e.g., `SSE`
theta_vals : dict
Dictionary containing the estimates of the unknown parameters
step : float
Float used for relative perturbation of the parameters,
e.g., step=0.02 is a 2% perturbation
solver : str
Solver name specified by the user, e.g., 'ipopt'
tee : bool
Boolean solver option to be passed for verbose output
estimated_var: float, optional
Value of the estimated variance of the measurement error
in cases where the user does not supply the
measurement error standard deviation
prior_FIM: pd.DataFrame, optional
Prior Fisher Information Matrix from previous experimental design
to be added to the FIM of the current experiments for covariance estimation.
The prior_FIM should be a square matrix with parameter names as both
row and column labels.
regularization_weight: float, optional
Weighting factor for the regularization term. Default is 1.0.
solver_options: dict, optional
Dictionary of solver options to be passed for the finite difference calculations
Returns
-------
cov : pd.DataFrame
Covariance matrix of the estimated parameters
"""
# store the FIM of all the experiments
FIM_all_exp = []
if method == CovarianceMethod.finite_difference.value:
# loop through the experiments and compute the FIM
for experiment in experiment_list:
FIM_all_exp.append(
_finite_difference_FIM(
experiment,
theta_vals=theta_vals,
step=step,
solver=solver,
tee=tee,
estimated_var=estimated_var,
solver_options=solver_options,
)
)
elif method == CovarianceMethod.automatic_differentiation_kaug.value:
# loop through the experiments and compute the FIM
for experiment in experiment_list:
FIM_all_exp.append(
_kaug_FIM(
experiment,
obj_function=obj_function,
theta_vals=theta_vals,
solver=solver,
tee=tee,
estimated_var=estimated_var,
solver_options=solver_options,
)
)
FIM = np.sum(FIM_all_exp, axis=0)
# Add prior_FIM if including regularization. We expand the prior FIM to match the size of the
# FIM and align it based on parameter names to ensure correct addition.
# Add the prior FIM to the FIM of the current experiments, weighted by the regularization weight
if prior_FIM is not None:
if regularization_weight < 0:
raise ValueError("regularization_weight must be nonnegative.")
expanded_prior_FIM = _expand_prior_FIM(
experiment_list[0], prior_FIM # theta_vals
) # sanity check and alignment
# Check that the prior FIM shape is the same as the FIM shape
if expanded_prior_FIM.shape != FIM.shape:
raise ValueError(
"The shape of the prior FIM must be the same as the shape of the FIM."
)
if obj_function is SSE:
# The unweighted SSE objective is sum(r**2), without the 1/2
# used by SSE_weighted. Its objective Hessian is therefore
# 2 * J.T @ W @ J, so add the prior information with the same
# objective-Hessian scaling.
expanded_prior_FIM *= 2
FIM += expanded_prior_FIM * regularization_weight
# calculate the covariance matrix
try:
cov = np.linalg.inv(FIM)
except np.linalg.LinAlgError:
cov = np.linalg.pinv(FIM)
logger.warning("The FIM is singular. Using pseudo-inverse instead.")
# check if the covariance matrix is positive semi-definite
eigen_values = np.linalg.eigvalsh(cov)
if any(eig_val < -1e-10 for eig_val in eigen_values):
logger.warning("The covariance matrix is not positive semi-definite.")
cov = pd.DataFrame(cov, index=theta_vals.keys(), columns=theta_vals.keys())
return cov
# compute the Fisher information matrix of the estimated parameters using
# 'finite_difference'
def _finite_difference_FIM(
experiment, theta_vals, step, solver, tee, estimated_var=None, solver_options=None
):
"""
Computes the Fisher information matrix from 'finite_difference' Jacobian matrix
and measurement errors standard deviation defined in the annotated Pyomo model
Parameters
----------
experiment : Experiment class
Experiment class object that contains the Pyomo model
for a particular experimental condition
theta_vals : dict
Dictionary containing the estimates of the unknown parameters
step : float
Float used for relative perturbation of the parameters,
e.g., step=0.02 is a 2% perturbation
solver : str
Solver name specified by the user, e.g., 'ipopt'
tee : bool
Boolean solver option to be passed for verbose output
estimated_var: float or int, optional
Value of the estimated variance of the measurement error
in cases where the user does not supply the
measurement error standard deviation
solver_options: dict, optional
Dictionary of solver options to be passed for the finite difference calculations
Returns
-------
FIM : numpy.ndarray
Fisher information matrix of the estimated parameters
"""
# compute the Jacobian matrix using finite difference
J = _compute_jacobian(experiment, theta_vals, step, solver, tee, solver_options)
# computing the condition number of the Jacobian matrix
cond_number_jac = np.linalg.cond(J)
logger.info(f"The condition number of the Jacobian matrix is {cond_number_jac}")
# grab the model
model = _get_labeled_model(experiment)
# extract the measured variables and measurement errors
y_hat_list = [y_hat for y_hat, y in model.experiment_outputs.items()]
# check if the model has a 'measurement_error' attribute and
# the measurement error standard deviation has been supplied
all_known_errors = all(
model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs
)
if hasattr(model, "measurement_error") and all_known_errors:
error_list = [
model.measurement_error[y_hat] for y_hat in model.experiment_outputs
]
# check if the dimension of error_list is the same with that of y_hat_list
if len(error_list) != len(y_hat_list):
raise ValueError(
"Experiment outputs and measurement errors are not the same length."
)
# compute the matrix of the inverse of the measurement error variance
# the following assumes independent and identically distributed
# measurement errors
W = np.diag([1 / (err**2) for err in error_list])
# calculate the FIM using the formula in our future paper
# Lilonfe et al. (2025)
FIM = J.T @ W @ J
else:
FIM = (1 / estimated_var) * (J.T @ J)
return FIM
# compute the Fisher information matrix of the estimated parameters using
# 'automatic_differentiation_kaug'
def _kaug_FIM(
experiment,
obj_function,
theta_vals,
solver,
tee,
estimated_var=None,
solver_options=None,
):
"""
Computes the FIM using 'automatic_differentiation_kaug', a sensitivity-based
approach that uses the annotated Pyomo model optimality condition and
user-defined measurement errors standard deviation
Disclaimer - code adopted from the kaug function implemented in Pyomo.DoE
Parameters
----------
experiment : Experiment class
Experiment class object that contains the Pyomo model
for a particular experimental condition
obj_function: callable
Built-in objective function selected by the user, e.g., `SSE`
theta_vals : dict
Dictionary containing the estimates of the unknown parameters
solver : str
Solver name specified by the user, e.g., 'ipopt'
tee : bool
Boolean solver option to be passed for verbose output
estimated_var: float or int, optional
Value of the estimated variance of the measurement error
in cases where the user does not supply the
measurement error standard deviation
solver_options: dict, optional
Dictionary of solver options to be passed for the finite difference calculations
Returns
-------
FIM : numpy.ndarray
Fisher information matrix of the estimated parameters
"""
# grab the model
model = _get_labeled_model(experiment)
# deactivate any existing objective functions
for obj in model.component_objects(pyo.Objective):
obj.deactivate()
# add the built-in objective function selected by the user
model.objective = pyo.Objective(expr=obj_function, sense=pyo.minimize)
# fix the parameter values to the estimated values
for param in model.unknown_parameters:
param.fix(theta_vals[param.name])
solver = pyo.SolverFactory(solver)
if solver_options is not None:
for key in solver_options:
solver.options[key] = solver_options[key]
results = solver.solve(model, tee=tee)
assert_optimal_termination(results)
# Probe the solved model for dsdp results (sensitivities s.t. parameters)
params_dict = {k.name: v for k, v in model.unknown_parameters.items()}
params_names = list(params_dict.keys())
dsdp_re, col = get_dsdp(model, params_names, params_dict, tee=tee)
# analyze result
dsdp_array = dsdp_re.toarray().T
# store dsdp returned
dsdp_extract = []
# get right lines from results
measurement_index = []
# loop over measurement variables and their time points
for k, v in model.experiment_outputs.items():
name = k.name
try:
kaug_no = col.index(name)
measurement_index.append(kaug_no)
# get right line of dsdp
dsdp_extract.append(dsdp_array[kaug_no])
except ValueError:
# k_aug does not provide value for fixed variables
logger.debug("The variable is fixed: %s", name)
# produce the sensitivity for fixed variables
zero_sens = np.zeros(len(params_names))
# for fixed variables, the sensitivity are a zero vector
dsdp_extract.append(zero_sens)
# Extract and calculate sensitivity if scaled by constants or parameters.
jac = [[] for _ in params_names]
for d in range(len(dsdp_extract)):
for k, v in model.unknown_parameters.items():
p = params_names.index(k.name) # Index of parameter in np array
sensi = dsdp_extract[d][p]
jac[p].append(sensi)
# record kaug jacobian
kaug_jac = np.array(jac).T
# compute FIM
# compute the matrix of the inverse of the measurement error variance
# the following assumes independent and identically distributed
# measurement errors
W = np.zeros((len(model.measurement_error), len(model.measurement_error)))
all_known_errors = all(
model.measurement_error[y_hat] is not None for y_hat in model.experiment_outputs
)
count = 0
for k, v in model.measurement_error.items():
if all_known_errors:
W[count, count] = 1 / (v**2)
else:
W[count, count] = 1 / estimated_var
count += 1
FIM = kaug_jac.T @ W @ kaug_jac
return FIM
def _expand_prior_FIM(experiment, prior_FIM):
"""
Expands the prior FIM to match the size of the FIM of the current experiment
Parameters
----------
experiment : Experiment class
Experiment class object that contains the Pyomo model
for a particular experimental condition
prior_FIM : pd.DataFrame
Prior Fisher Information Matrix from previous experimental design
Returns
-------
expanded_prior_FIM : pd.DataFrame
Expanded prior FIM with the same size as the FIM of the current experiment
"""
_validate_prior_FIM(prior_FIM, require_psd=True)
model = _get_labeled_model(experiment)
# Extract parameter names from the Pyomo model
_, param_names, _ = _expanded_unknown_parameter_info(model)
# 1. Expand Prior FIM
# We reindex to match param_names. Parameters not in prior_FIM
# will be filled with 0, which maintains the PSD property.
expanded_prior_FIM = prior_FIM.reindex(
index=param_names, columns=param_names, fill_value=0.0
)
return expanded_prior_FIM
[docs]
class Estimator:
"""
Parameter estimation class
Parameters
----------
experiment_list: list of Experiments
A list of experiment objects which creates one labeled model for
each experiment
obj_function: string or function (optional)
Built-in objective ("SSE" or "SSE_weighted") or custom function
used to formulate parameter estimation objective.
If no function is specified, the model is used
"as is" and should be defined with a "FirstStageCost" and
"SecondStageCost" expression that are used to build an objective.
Default is None.
tee: bool, optional
If True, print the solver output to the screen. Default is False.
diagnostic_mode: bool, optional
If True, print diagnostics from the solver. Default is False.
solver_options: dict, optional
Provides options to the solver (also the name of an attribute).
Default is None.
regularization: string, optional
Built-in regularization type ("L2"). If no regularization is
specified, no regularization term is added to the objective.
Default is None.
prior_FIM: pd.DataFrame, optional
Prior Fisher Information Matrix from previous experimental design
to be added to the FIM of the current experiments for regularization.
The prior_FIM should be a square matrix with parameter names as both
row and column labels.
theta_ref: dict, optional
Reference parameter values used in regularization.
If None, defaults to the current parameter values in the model.
regularization_weight: float, optional
Weighting factor for the regularization term. Used with
``regularization="L2"``. Default is 1.0.
"""
# The singledispatchmethod decorator is used here as a deprecation
# shim to be able to support the now deprecated Estimator interface
# which had a different number of arguments. When the deprecated API
# is removed this decorator and the _deprecated_init method below
# can be removed
[docs]
@singledispatchmethod
def __init__(
self,
experiment_list,
obj_function=None,
tee=False,
diagnostic_mode=False,
solver_options=None,
regularization=None,
prior_FIM=None,
theta_ref=None,
regularization_weight=None,
):
# check that we have a (non-empty) list of experiments
assert isinstance(experiment_list, list)
self.exp_list = experiment_list
# get the number of experiments
self.number_exp = _count_total_experiments(self.exp_list)
# check if the experiment has a ``get_labeled_model`` function
model = _get_labeled_model(self.exp_list[0])
# check if the model has all the required suffixes
_check_model_labels(model)
# populate keyword argument options
if isinstance(obj_function, str):
try:
self.obj_function = ObjectiveType(obj_function)
except ValueError:
raise ValueError(
f"Invalid objective function: '{obj_function}'. "
f"Choose from: {[e.value for e in ObjectiveType]}."
)
else:
self.obj_function = obj_function
if isinstance(regularization, str):
try:
self.regularization = RegularizationType(regularization)
except ValueError:
raise ValueError(
f"Invalid regularization type: '{regularization}'. "
f"Choose from: {[e.value for e in RegularizationType]}."
)
elif isinstance(regularization, RegularizationType):
self.regularization = regularization
elif regularization is None:
self.regularization = None
else:
raise TypeError(
"regularization must be None or one of "
f"{[e.value for e in RegularizationType]}."
)
self.tee = tee
self.diagnostic_mode = diagnostic_mode
self.solver_options = solver_options
# Validate regularization options and defaults
if self.regularization is None:
if (
prior_FIM is not None
or theta_ref is not None
or regularization_weight is not None
):
raise ValueError(
"regularization must be set when supplying prior_FIM, theta_ref, "
"regularization_weight."
)
self.prior_FIM = None
self.theta_ref = None
self.regularization_weight = None
elif self.regularization == RegularizationType.L2:
if prior_FIM is None:
raise ValueError("prior_FIM must be provided when regularization='L2'.")
_validate_prior_FIM(prior_FIM, require_psd=True)
if theta_ref is not None and not isinstance(theta_ref, dict):
raise TypeError(
"theta_ref must be a dict mapping parameter names to reference values."
)
if regularization_weight is None:
regularization_weight = 1.0
if regularization_weight < 0:
raise ValueError("regularization_weight must be nonnegative.")
self.prior_FIM = prior_FIM
self.theta_ref = theta_ref
self.regularization_weight = regularization_weight
else:
raise ValueError(
f"Unsupported regularization option: {self.regularization}. "
f"Choose from {[e.value for e in RegularizationType]}."
)
# TODO: delete this when the deprecated interface is removed
self.pest_deprecated = None
# TODO This might not be needed here.
# We could collect the union (or intersect?) of thetas when the models are built
theta_names = []
for experiment in self.exp_list:
model = _get_labeled_model(experiment)
_, expanded_theta_names, _ = _expanded_unknown_parameter_info(model)
theta_names.extend(expanded_theta_names)
# Utilize list(dict.fromkeys(theta_names)) to preserve parameter
# order compared with list(set(theta_names)), which had
# nondeterministic ordering of parameters
self.estimator_theta_names = list(dict.fromkeys(theta_names))
self._second_stage_cost_exp = "SecondStageCost"
# boolean to indicate if model is initialized using a square solve
self.model_initialized = False
# If self.diagnostic mode is true, then set the logging level to DEBUG to print
# diagnostics from the solver
if self.diagnostic_mode:
logger.setLevel(logging.DEBUG)
# The deprecated Estimator constructor
# This works by checking the type of the first argument passed to
# the class constructor. If it matches the old interface (i.e. is
# callable) then this _deprecated_init method is called and the
# deprecation warning is displayed.
@__init__.register(Callable)
def _deprecated_init(
self,
model_function,
data,
theta_names,
obj_function=None,
tee=False,
diagnostic_mode=False,
solver_options=None,
):
deprecation_warning(
"You're using the deprecated parmest interface (model_function, "
"data, theta_names). This interface will be removed in a future release, "
"please update to the new parmest interface using experiment lists.",
version='6.7.2',
)
self.pest_deprecated = _DeprecatedEstimator(
model_function,
data,
theta_names,
obj_function,
tee,
diagnostic_mode,
solver_options,
)
def _return_theta_names(self):
"""
Return list of fitted model parameter names
"""
# check for deprecated inputs
if self.pest_deprecated:
# if fitted model parameter names differ from theta_names
# created when Estimator object is created
if hasattr(self, 'theta_names_updated'):
return self.pest_deprecated.theta_names_updated
else:
# default theta_names, created when Estimator object is created
return self.pest_deprecated.theta_names
else:
# if fitted model parameter names differ from theta_names
# created when Estimator object is created
if hasattr(self, 'theta_names_updated'):
return self.theta_names_updated
else:
# default theta_names, created when Estimator object is created
return self.estimator_theta_names
def _expanded_theta_info(self, model):
"""
Return scalar theta names and ComponentUIDs for all unknown parameters.
The unknown_parameters suffix may contain either scalar ComponentData
objects or indexed components. Indexed components are expanded to their
scalar data objects.
Names are intended for user-facing labels and DataFrame columns.
ComponentUIDs are intended for locating equivalent theta components on
cloned/transferred models.
"""
_, theta_names, theta_cuids = _expanded_unknown_parameter_info(model)
return theta_names, theta_cuids
def _create_parmest_model(self, experiment_number):
"""
Modify the Pyomo model for parameter estimation
"""
model = _get_labeled_model(self.exp_list[experiment_number])
if len(model.unknown_parameters) == 0:
model.parmest_dummy_var = pyo.Var(initialize=1.0)
# Add objective function (optional)
if self.obj_function:
# Check for component naming conflicts
reserved_names = [
'Total_Cost_Objective',
'FirstStageCost',
'SecondStageCost',
]
for n in reserved_names:
if model.component(n) or hasattr(model, n):
raise RuntimeError(
f"Parmest will not override the existing model component named {n}"
)
# Deactivate any existing objective functions
for obj in model.component_objects(pyo.Objective):
obj.deactivate()
# TODO, this needs to be turned into an enum class of options that still support
# custom functions
if isinstance(self.obj_function, ObjectiveType):
if self.obj_function == ObjectiveType.SSE:
self.covariance_objective = SSE
elif self.obj_function == ObjectiveType.SSE_weighted:
self.covariance_objective = SSE_weighted
base_objective = self.covariance_objective
else:
# A custom function uses model.experiment_outputs as data
self.covariance_objective = None
base_objective = self.obj_function
if self.regularization is None:
second_stage_rule = base_objective
elif self.regularization == RegularizationType.L2:
second_stage_rule = lambda m: L2_regularized_objective(
m,
prior_FIM=self.prior_FIM,
theta_ref=self.theta_ref,
regularization_weight=self.regularization_weight,
obj_function=base_objective,
)
else:
raise ValueError(
f"Unsupported regularization option: {self.regularization}. "
f"Choose from {[e.value for e in RegularizationType]}."
)
model.FirstStageCost = pyo.Expression(expr=0)
model.SecondStageCost = pyo.Expression(rule=second_stage_rule)
def TotalCost_rule(model):
return model.FirstStageCost + model.SecondStageCost
model.Total_Cost_Objective = pyo.Objective(
rule=TotalCost_rule, sense=pyo.minimize
)
# Convert theta Params to Vars, and unfix theta Vars
theta_names = [k.name for k, v in model.unknown_parameters.items()]
parmest_model = utils.convert_params_to_vars(model, theta_names, fix_vars=False)
return parmest_model
def _instance_creation_callback(self, experiment_number=None, cb_data=None):
model = self._create_parmest_model(experiment_number)
return model
def _create_scenario_blocks(self, bootlist=None, theta_vals=None, fix_theta=False):
"""
Create scenario blocks for parameter estimation.
Parameters
----------
bootlist : list, optional
List of bootstrap experiment numbers to use. If None, use all experiments in exp_list.
Default is None.
theta_vals : dict, optional
Dictionary of theta values to set in the model. If None, use default values from experiment class.
Default is None.
fix_theta : bool, optional
If True, fix the theta values in the model. If False, leave them free.
Default is False.
Returns
-------
model : ConcreteModel
Pyomo model with scenario blocks for parameter estimation. Contains indexed block for
each experiment in exp_list or bootlist.
"""
model = pyo.ConcreteModel()
template_model = self._create_parmest_model(0)
expanded_theta_names, expanded_theta_cuids = self._expanded_theta_info(
template_model
)
model._parmest_theta_names = tuple(expanded_theta_names)
model._parmest_theta_cuids = tuple(expanded_theta_cuids)
# Parent/global EF theta variables. These are indexed by names because
# they serve as user-facing labels and result keys.
model.parmest_theta = pyo.Var(model._parmest_theta_names)
# Initialize parent/global theta values from the template model.
for name, cuid in zip(expanded_theta_names, expanded_theta_cuids):
template_theta_var = cuid.find_component_on(template_model)
if template_theta_var is None:
raise RuntimeError(
f"Could not find theta variable for CUID {cuid} "
"in the template model."
)
parent_theta_var = model.parmest_theta[name]
if theta_vals is not None and name in theta_vals:
parent_theta_var.set_value(theta_vals[name])
else:
parent_theta_var.set_value(pyo.value(template_theta_var))
if fix_theta:
parent_theta_var.fix()
else:
parent_theta_var.unfix()
scenario_numbers = (
bootlist if bootlist is not None else list(range(len(self.exp_list)))
)
# get the probability constant that is applied to the objective function
# parmest solves the estimation problem by applying equal probabilities to
# the objective function of all the scenarios from the experiment list
self.obj_probability_constant = len(scenario_numbers)
# Index scenario blocks by position, not experiment number, so bootstrap
# samples with repeated experiments are preserved.
model.scenario_indices = pyo.RangeSet(0, self.obj_probability_constant - 1)
model.scenario_number = pyo.Param(
model.scenario_indices,
initialize={
i: experiment_number
for i, experiment_number in enumerate(scenario_numbers)
},
within=pyo.NonNegativeIntegers,
mutable=False,
)
model.exp_scenarios = pyo.Block(model.scenario_indices)
for i in model.scenario_indices:
experiment_number = pyo.value(model.scenario_number[i])
parmest_model = self._create_parmest_model(experiment_number)
model.exp_scenarios[i].transfer_attributes_from(parmest_model)
model.theta_link_constraints = pyo.ConstraintList()
# Link scenario-local theta variables to the parent/global EF theta vars.
# CUIDs are used for component lookup. Names are only used to index the
# parent EF theta variable.
for name, cuid in zip(expanded_theta_names, expanded_theta_cuids):
parent_theta_var = model.parmest_theta[name]
theta_value = pyo.value(parent_theta_var)
for i in model.scenario_indices:
child_theta_var = cuid.find_component_on(model.exp_scenarios[i])
if child_theta_var is None:
raise RuntimeError(
f"Could not find theta variable for CUID {cuid} "
f"in scenario block {i}."
)
child_theta_var.set_value(theta_value)
if fix_theta:
child_theta_var.fix()
else:
child_theta_var.unfix()
model.theta_link_constraints.add(
child_theta_var == parent_theta_var
)
for block in model.exp_scenarios.values():
for obj in block.component_objects(pyo.Objective):
obj.deactivate()
def total_obj(m):
return (
sum(
block.Total_Cost_Objective.expr
for block in m.exp_scenarios.values()
)
/ self.obj_probability_constant
)
model.Obj = pyo.Objective(rule=total_obj, sense=pyo.minimize)
self.ef_instance = model
return model
def _Q_opt(
self,
return_values=None,
bootlist=None,
solver="ef_ipopt",
theta_vals=None,
fix_theta=False,
):
"""
_Q_opt method for parameter estimation using an extensive form
optimization problem with scenario blocks for each experiment.
Scenario blocks are created by cloning the original model for
each experiment and linking the parameter variables across blocks.
The objective is defined as the average of the individual scenario
objectives, which allows for simultaneous optimization across
all experiments.
Steps:
1. Load model - parmest model should be labeled
2. Create scenario blocks - clone model to have one per experiment
3. Define objective and constraints for the block
4. Solve the block as a single problem
5. Analyze results and extract parameter estimates
Parameters
----------
return_values : list, optional
List of variable names to return values for. Default is None.
bootlist : list, optional
List of bootstrap experiment numbers to use. If None, use all experiments in exp_list.
Default is None.
theta_vals : dict, optional
Dictionary of theta values to set in the model. If None, use default values from experiment class.
Default is None.
solver : str, optional
Solver to use for optimization. Default is "ef_ipopt".
fix_theta : bool, optional
If True, fix the theta values in the model. If False, leave them free.
Default is False.
Returns
-------
obj_value : float or None
Objective value of the solved model.
If fix_theta is False, this is the optimal objective value.
If fix_theta is True, this is the objective value at the fixed
theta values. If the fixed-theta problem does not return a
solution, obj_value is None.
theta_estimates : dict
Dictionary of theta values keyed by theta name.
If fix_theta is False, this contains the estimated parameter values
that optimize the objective.
If fix_theta is True, this contains the fixed theta values used in
the model.
var_values : pd.DataFrame, optional
DataFrame of variable values for the variables specified in
return_values. Only returned when fix_theta is False and
return_values is not None and contains at least one valid variable
name. The DataFrame has one row per scenario block and columns
corresponding to the variable names in return_values.
termination_condition : pyomo.opt.TerminationCondition, optional
Solver termination condition. Only returned when fix_theta is True.
"""
# Create extended form model with scenario blocks
model = self._create_scenario_blocks(
bootlist=bootlist, theta_vals=theta_vals, fix_theta=fix_theta
)
expanded_theta_names = list(model._parmest_theta_names)
logger.debug("Parmest _Q_opt model with scenario blocks:")
if logger.isEnabledFor(logging.DEBUG):
model.pprint()
# Check solver and set options
if solver == "k_aug":
raise RuntimeError("k_aug no longer supported.")
if solver == "ef_ipopt":
sol = SolverFactory('ipopt')
else:
raise RuntimeError("Unknown solver in Q_Opt=" + solver)
# Currently, parmest is only tested with ipopt via ef_ipopt
# No other pyomo solvers have been verified to work with parmest from current release
# to my knowledge.
if self.solver_options is not None:
for key in self.solver_options:
sol.options[key] = self.solver_options[key]
# Solve model without loading solution values until the termination
# condition has been checked.
solve_result = sol.solve(model, tee=self.tee, load_solutions=False)
termination_condition = solve_result.solver.termination_condition
if fix_theta and (
len(solve_result.solution) == 0
or termination_condition == pyo.TerminationCondition.infeasible
):
theta_estimates = {
name: pyo.value(model.parmest_theta[name])
for name in expanded_theta_names
}
obj_value = None
return obj_value, theta_estimates, termination_condition
if not fix_theta:
assert_optimal_termination(solve_result)
model.solutions.load_from(solve_result)
# Extract objective value
obj_value = pyo.value(model.Obj)
theta_estimates = {}
# Extract theta estimates from parent model
for name in expanded_theta_names:
theta_estimates[name] = pyo.value(model.parmest_theta[name])
self.obj_value = obj_value
self.estimated_theta = theta_estimates
# If fixing theta, return objective value, theta estimates, and solver status
if fix_theta:
return obj_value, theta_estimates, termination_condition
# Extract return values if requested
# Assumes the model components are named the same in each block, and are pyo.Vars.
if return_values is not None and len(return_values) > 0:
var_values = []
# In the scenario blocks structure, exp_scenarios is an IndexedBlock
exp_blocks = model.exp_scenarios.values()
# Loop over each experiment block and extract requested variable values
for exp_i in exp_blocks:
# In each block, extract requested variables
vals = {}
for var in return_values:
# Find the variable in the block
exp_i_var = exp_i.find_component(str(var))
# Check if variable exists in the block
if exp_i_var is None:
continue
# Extract value(s) from variable
if type(exp_i_var) == ContinuousSet:
temp = list(exp_i_var)
else:
temp = [pyo.value(_) for _ in exp_i_var.values()]
if len(temp) == 1:
vals[var] = temp[0]
else:
vals[var] = temp
# Only append if vals is not empty
if len(vals) > 0:
var_values.append(vals)
# Convert to DataFrame
var_values = pd.DataFrame(var_values)
if return_values is not None and len(return_values) > 0:
return obj_value, theta_estimates, var_values
else:
return obj_value, theta_estimates
def _cov_at_theta(self, method, solver, step):
"""
Covariance matrix calculation using all scenarios in the data
Parameters
----------
method : str
Covariance calculation method specified by the user,
e.g., 'finite_difference'
solver : str
Solver name specified by the user, e.g., 'ipopt'
step : float
Float used for relative perturbation of the parameters,
e.g., step=0.02 is a 2% perturbation
Returns
-------
cov : pd.DataFrame
Covariance matrix of the estimated parameters
"""
if hasattr(self.ef_instance, "exp_scenarios"):
# The EF now indexes scenario blocks by scenario position. Pull the
# first position from the model metadata instead of hard-coding 0 so
# future indexing changes stay localized to scenario_indices.
scenario_index = next(iter(self.ef_instance.scenario_indices))
ref_model = self.ef_instance.exp_scenarios[scenario_index]
else:
ref_model = self.ef_instance
if method == CovarianceMethod.reduced_hessian.value:
# compute the inverse reduced hessian to be used
# in the "reduced_hessian" method
# retrieve the independent variables (i.e., estimated parameters)
ind_vars = list(self.ef_instance.parmest_theta.values())
solve_result, inv_red_hes = (
inverse_reduced_hessian.inv_reduced_hessian_barrier(
self.ef_instance,
independent_variables=ind_vars,
solver_options=self.solver_options,
tee=self.tee,
)
)
self.inv_red_hes = inv_red_hes
else:
# if not using the 'reduced_hessian' method, calculate the sum of squared errors
# using 'finite_difference' method or 'automatic_differentiation_kaug'
sse_vals = []
for experiment in self.exp_list:
model = _get_labeled_model(experiment)
# fix the value of the unknown parameters to the estimated values
for param in model.unknown_parameters:
param.fix(self.estimated_theta[param.name])
# re-solve the model with the estimated parameters
results = pyo.SolverFactory(solver).solve(model, tee=self.tee)
assert_optimal_termination(results)
# choose and evaluate the sum of squared errors expression
if self.obj_function == ObjectiveType.SSE:
sse_expr = SSE(model)
elif self.obj_function == ObjectiveType.SSE_weighted:
sse_expr = SSE_weighted(model)
else:
raise ValueError(
f"Invalid objective function for covariance calculation. "
f"The covariance matrix can only be calculated using the built-in "
f"objective functions: {[e.value for e in ObjectiveType]}. Supply "
f"the Estimator object one of these built-in objectives and "
f"re-run the code."
)
# evaluate the numerical SSE and store it
sse_val = pyo.value(sse_expr)
sse_vals.append(sse_val)
sse = sum(sse_vals)
logger.info(
f"The sum of squared errors at the estimated parameter(s) is: {sse}"
)
# Number of data points considered
n = self.number_exp
# Extract the number of fitted parameters
l = len(self.estimated_theta)
"""Calculate covariance assuming experimental observation errors are
independent and follow a Gaussian distribution with constant variance.
The formula used in parmest was verified against equations (7-5-15) and
(7-5-16) in "Nonlinear Parameter Estimation", Y. Bard, 1974.
This formula is also applicable if the objective is scaled by a constant;
the constant cancels out. (was scaled by 1/n because it computes an
expected value.)
"""
# check if the user-supplied covariance method is supported
try:
cov_method = CovarianceMethod(method)
except ValueError:
raise ValueError(
f"Invalid method: '{method}'. Choose "
f"from: {[e.value for e in CovarianceMethod]}."
)
# check if the user specified 'SSE' or 'SSE_weighted' as the objective function
cov_prior_FIM = None
cov_regularization_weight = None
if self.regularization == RegularizationType.L2:
cov_prior_FIM = self.prior_FIM
cov_regularization_weight = self.regularization_weight
if self.obj_function == ObjectiveType.SSE:
# check if the user defined the 'measurement_error' attribute
if hasattr(ref_model, "measurement_error"):
# get the measurement errors
meas_error = [
ref_model.measurement_error[y_hat]
for y_hat, y in ref_model.experiment_outputs.items()
]
# check if the user supplied the values of the measurement errors
if all(item is None for item in meas_error):
if cov_method == CovarianceMethod.reduced_hessian:
# in the "reduced_hessian" method, use the objective value
# to calculate the measurement error variance because this
# method scales the objective function by a probability constant
# when computing the inverse of the reduced hessian
measurement_var = self.obj_value / (
n - l
) # estimate of the measurement error variance
cov = (
2 * measurement_var * self.inv_red_hes
) # covariance matrix
cov = pd.DataFrame(
cov,
index=self.estimated_theta.keys(),
columns=self.estimated_theta.keys(),
)
else:
measurement_var = sse / (
n - l
) # estimate of the measurement error variance
cov = compute_covariance_matrix(
self.exp_list,
method,
obj_function=self.covariance_objective,
theta_vals=self.estimated_theta,
solver=solver,
step=step,
tee=self.tee,
prior_FIM=cov_prior_FIM,
regularization_weight=cov_regularization_weight,
estimated_var=measurement_var,
solver_options=self.solver_options,
)
elif all(item is not None for item in meas_error):
if cov_method == CovarianceMethod.reduced_hessian:
# in the "reduced_hessian" method, the measurement error
# variance must be scaled by the probability constant that
# was applied to the objective function when computing
# the inverse of the reduced hessian
cov = (
2
* (meas_error[0] ** 2 / self.obj_probability_constant)
* self.inv_red_hes
)
cov = pd.DataFrame(
cov,
index=self.estimated_theta.keys(),
columns=self.estimated_theta.keys(),
)
else:
cov = compute_covariance_matrix(
self.exp_list,
method,
obj_function=self.covariance_objective,
theta_vals=self.estimated_theta,
solver=solver,
step=step,
tee=self.tee,
prior_FIM=cov_prior_FIM,
regularization_weight=cov_regularization_weight,
solver_options=self.solver_options,
)
else:
raise ValueError(
"One or more values of the measurement errors have "
"not been supplied."
)
else:
raise AttributeError(
'Experiment model does not have suffix "measurement_error".'
)
elif self.obj_function == ObjectiveType.SSE_weighted:
# check if the user defined the 'measurement_error' attribute
if hasattr(ref_model, "measurement_error"):
meas_error = [
ref_model.measurement_error[y_hat]
for y_hat, y in ref_model.experiment_outputs.items()
]
# check if the user supplied the values for the measurement errors
if all(item is not None for item in meas_error):
if cov_method == CovarianceMethod.reduced_hessian:
# in the "reduced_hessian" method, since the objective function
# was scaled by a probability constant when computing the
# inverse of the reduced hessian, the inverse of the reduced
# hessian must be divided by the probability constant to obtain
# the covariance matrix
cov = (1 / self.obj_probability_constant) * self.inv_red_hes
cov = pd.DataFrame(
cov,
index=self.estimated_theta.keys(),
columns=self.estimated_theta.keys(),
)
else:
cov = compute_covariance_matrix(
self.exp_list,
method,
obj_function=self.covariance_objective,
theta_vals=self.estimated_theta,
prior_FIM=cov_prior_FIM,
regularization_weight=cov_regularization_weight,
step=step,
solver=solver,
tee=self.tee,
solver_options=self.solver_options,
)
else:
raise ValueError(
'One or more values are missing from "measurement_error". All values of '
'the measurement errors are required for the "SSE_weighted" objective.'
)
else:
raise AttributeError(
'Experiment model does not have suffix "measurement_error". '
'"measurement_error" is a required suffix for the "SSE_weighted" '
'objective.'
)
else:
raise ValueError(
f"Invalid objective function for covariance calculation. "
f"The covariance matrix can only be calculated using the built-in "
f"objective functions: {[e.value for e in ObjectiveType]}. Supply "
f"the Estimator object one of these built-in objectives and "
f"re-run the code."
)
return cov
def _get_sample_list(self, samplesize, num_samples, replacement=True):
samplelist = list()
scenario_numbers = list(range(len(self.exp_list)))
if num_samples is None:
# This could get very large
for i, l in enumerate(combinations(scenario_numbers, samplesize)):
samplelist.append((i, np.sort(l)))
else:
for i in range(num_samples):
attempts = 0
unique_samples = 0 # check for duplicates in each sample
duplicate = False # check for duplicates between samples
while (unique_samples <= len(self._return_theta_names())) and (
not duplicate
):
sample = np.random.choice(
scenario_numbers, samplesize, replace=replacement
)
sample = np.sort(sample).tolist()
unique_samples = len(np.unique(sample))
if sample in samplelist:
duplicate = True
attempts += 1
if attempts > num_samples: # arbitrary timeout limit
raise RuntimeError(
"Internal error: timeout constructing "
"a sample, the dim of theta may be too "
"close to the samplesize"
)
samplelist.append((i, sample))
return samplelist
[docs]
def theta_est(
self, solver="ef_ipopt", return_values=[], calc_cov=NOTSET, cov_n=NOTSET
):
"""
Parameter estimation using all scenarios in the data
Parameters
----------
solver: str, optional
Currently only "ef_ipopt" is supported. Default is "ef_ipopt".
return_values: list, optional
List of Variable names, used to return values from the model
for data reconciliation
calc_cov: boolean, optional
DEPRECATED.
If True, calculate and return the covariance matrix
(only for "ef_ipopt" solver). Default is NOTSET
cov_n: int, optional
DEPRECATED.
If calc_cov=True, then the user needs to supply the number of datapoints
that are used in the objective function. Default is NOTSET
Returns
-------
obj_val: float
The objective function value
theta_vals: pd.Series
Estimated values for theta
var_values: pd.DataFrame
Variable values for each variable name in
return_values (only for solver='ef_ipopt')
"""
assert isinstance(solver, str)
assert isinstance(return_values, list)
assert (calc_cov is NOTSET) or isinstance(calc_cov, bool)
if calc_cov is not NOTSET:
deprecation_warning(
"theta_est(): `calc_cov` and `cov_n` are deprecated options and "
"will be removed in the future. Please use the `cov_est()` function "
"for covariance calculation.",
version="6.9.5",
)
else:
calc_cov = False
# check if we are using deprecated parmest
if self.pest_deprecated is not None and calc_cov:
return self.pest_deprecated.theta_est(
solver=solver,
return_values=return_values,
calc_cov=calc_cov,
cov_n=cov_n,
)
elif self.pest_deprecated is not None and not calc_cov:
return self.pest_deprecated.theta_est(
solver=solver, return_values=return_values
)
return self._Q_opt(solver=solver, return_values=return_values, bootlist=None)
[docs]
def cov_est(self, method="finite_difference", solver="ipopt", step=1e-3):
"""
Covariance matrix calculation using all scenarios in the data
Parameters
----------
method : str, optional
Covariance calculation method. Options - 'finite_difference',
'reduced_hessian', and 'automatic_differentiation_kaug'.
Default is 'finite_difference'
solver : str, optional
Solver name, e.g., 'ipopt'. Default is 'ipopt'
step : float, optional
Float used for relative perturbation of the parameters,
e.g., step=0.02 is a 2% perturbation. Default is 1e-3
Returns
-------
cov : pd.DataFrame
Covariance matrix of the estimated parameters
"""
# check if the solver input is a string
if not isinstance(solver, str):
raise TypeError("Expected a string for the solver, e.g., 'ipopt'")
# check if the method input is a string
if not isinstance(method, str):
raise TypeError(
"Expected a string for the method, e.g., 'finite_difference'"
)
# check if the step input is a float
if not isinstance(step, float):
raise TypeError("Expected a float for the step, e.g., 1e-2")
# number of unknown parameters
num_unknowns = max(
[
len(_expanded_unknown_parameter_info(experiment.get_labeled_model())[0])
for experiment in self.exp_list
]
)
assert self.number_exp > num_unknowns, (
"The number of datapoints must be greater than the "
"number of parameters to estimate."
)
return self._cov_at_theta(method=method, solver=solver, step=step)
[docs]
def theta_est_bootstrap(
self,
bootstrap_samples,
samplesize=None,
replacement=True,
seed=None,
return_samples=False,
):
"""
Parameter estimation using bootstrap resampling of the data
Parameters
----------
bootstrap_samples: int
Number of bootstrap samples to draw from the data
samplesize: int or None, optional
Size of each bootstrap sample. If samplesize=None, samplesize will be
set to the number of samples in the data
replacement: bool, optional
Sample with or without replacement. Default is True.
seed: int or None, optional
Random seed
return_samples: bool, optional
Return a list of sample numbers used in each bootstrap estimation.
Default is False.
Returns
-------
bootstrap_theta: pd.DataFrame
Theta values for each sample and (if return_samples = True)
the sample numbers used in each estimation
"""
# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return self.pest_deprecated.theta_est_bootstrap(
bootstrap_samples,
samplesize=samplesize,
replacement=replacement,
seed=seed,
return_samples=return_samples,
)
assert isinstance(bootstrap_samples, int)
assert isinstance(samplesize, (type(None), int))
assert isinstance(replacement, bool)
assert isinstance(seed, (type(None), int))
assert isinstance(return_samples, bool)
if samplesize is None:
samplesize = len(self.exp_list)
if seed is not None:
np.random.seed(seed)
global_list = self._get_sample_list(samplesize, bootstrap_samples, replacement)
task_mgr = utils.ParallelTaskManager(bootstrap_samples)
local_list = task_mgr.global_to_local_data(global_list)
bootstrap_theta = list()
for idx, sample in local_list:
objval, thetavals = self._Q_opt(bootlist=list(sample))
thetavals['samples'] = sample
bootstrap_theta.append(thetavals)
global_bootstrap_theta = task_mgr.allgather_global_data(bootstrap_theta)
bootstrap_theta = pd.DataFrame(global_bootstrap_theta)
if not return_samples:
del bootstrap_theta['samples']
return bootstrap_theta
[docs]
def theta_est_leaveNout(
self, lNo, lNo_samples=None, seed=None, return_samples=False
):
"""
Parameter estimation where N data points are left out of each sample
Parameters
----------
lNo: int
Number of data points to leave out for parameter estimation
lNo_samples: int
Number of leave-N-out samples. If lNo_samples=None, the maximum
number of combinations will be used
seed: int or None, optional
Random seed
return_samples: bool, optional
Return a list of sample numbers that were left out. Default is False.
Returns
-------
lNo_theta: pd.DataFrame
Theta values for each sample and (if return_samples = True)
the sample numbers left out of each estimation
"""
# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return self.pest_deprecated.theta_est_leaveNout(
lNo, lNo_samples=lNo_samples, seed=seed, return_samples=return_samples
)
assert isinstance(lNo, int)
assert isinstance(lNo_samples, (type(None), int))
assert isinstance(seed, (type(None), int))
assert isinstance(return_samples, bool)
samplesize = len(self.exp_list) - lNo
if seed is not None:
np.random.seed(seed)
global_list = self._get_sample_list(samplesize, lNo_samples, replacement=False)
task_mgr = utils.ParallelTaskManager(len(global_list))
local_list = task_mgr.global_to_local_data(global_list)
lNo_theta = list()
for idx, sample in local_list:
objval, thetavals = self._Q_opt(bootlist=list(sample))
lNo_s = list(set(range(len(self.exp_list))) - set(sample))
thetavals['lNo'] = np.sort(lNo_s)
lNo_theta.append(thetavals)
global_bootstrap_theta = task_mgr.allgather_global_data(lNo_theta)
lNo_theta = pd.DataFrame(global_bootstrap_theta)
if not return_samples:
del lNo_theta['lNo']
return lNo_theta
[docs]
def leaveNout_bootstrap_test(
self, lNo, lNo_samples, bootstrap_samples, distribution, alphas, seed=None
):
"""
Leave-N-out bootstrap test to compare theta values where N data points are
left out to a bootstrap analysis using the remaining data,
results indicate if theta is within a confidence region
determined by the bootstrap analysis
Parameters
----------
lNo: int
Number of data points to leave out for parameter estimation
lNo_samples: int
Leave-N-out sample size. If lNo_samples=None, the maximum number
of combinations will be used
bootstrap_samples: int:
Bootstrap sample size
distribution: string
Statistical distribution used to define a confidence region,
options = 'MVN' for multivariate_normal, 'KDE' for gaussian_kde,
and 'Rect' for rectangular.
alphas: list
List of alpha values used to determine if theta values are inside
or outside the region.
seed: int or None, optional
Random seed
Returns
-------
List of tuples with one entry per lNo_sample:
* The first item in each tuple is the list of N samples that are left
out.
* The second item in each tuple is a DataFrame of theta estimated using
the N samples.
* The third item in each tuple is a DataFrame containing results from
the bootstrap analysis using the remaining samples.
For each DataFrame a column is added for each value of alpha which
indicates if the theta estimate is in (True) or out (False) of the
alpha region for a given distribution (based on the bootstrap results)
"""
# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return self.pest_deprecated.leaveNout_bootstrap_test(
lNo, lNo_samples, bootstrap_samples, distribution, alphas, seed=seed
)
assert isinstance(lNo, int)
assert isinstance(lNo_samples, (type(None), int))
assert isinstance(bootstrap_samples, int)
assert distribution in ['Rect', 'MVN', 'KDE']
assert isinstance(alphas, list)
assert isinstance(seed, (type(None), int))
if seed is not None:
np.random.seed(seed)
global_list = self._get_sample_list(lNo, lNo_samples, replacement=False)
results = []
for idx, sample in global_list:
obj, theta = self.theta_est()
bootstrap_theta = self.theta_est_bootstrap(bootstrap_samples, seed=seed)
training, test = self.confidence_region_test(
bootstrap_theta,
distribution=distribution,
alphas=alphas,
test_theta_values=theta,
seed=seed,
)
results.append((sample, test, training))
return results
def _normalize_theta_dataframe(self, theta_values):
"""
Validate and normalize user-provided theta_values columns.
Returns a copy whose columns are the canonical expanded theta names from
the model.
This allows quote-insensitive matching while preserving canonical model
names internally.
"""
assert isinstance(theta_values, pd.DataFrame)
template_model = self._create_parmest_model(0)
expected_theta_names, _ = self._expanded_theta_info(template_model)
expected_theta_names = list(expected_theta_names)
def clean(name):
return str(name).replace("'", "")
provided_theta_names = list(theta_values.columns)
clean_provided = [clean(name) for name in provided_theta_names]
clean_expected = [clean(name) for name in expected_theta_names]
if len(clean_provided) != len(set(clean_provided)):
raise ValueError(f"Duplicate theta names are not allowed: {clean_provided}")
if len(clean_expected) != len(set(clean_expected)):
raise RuntimeError(
"Quote-insensitive theta names are ambiguous. "
f"Expanded theta names are {expected_theta_names}."
)
if set(clean_provided) != set(clean_expected):
raise ValueError(
f"Provided theta values {clean_provided} do not match "
f"expected theta names {clean_expected}."
)
canonical_name_by_clean_name = {
clean_name: canonical_name
for clean_name, canonical_name in zip(clean_expected, expected_theta_names)
}
canonical_columns = [
canonical_name_by_clean_name[clean_name] for clean_name in clean_provided
]
theta_values = theta_values.copy()
theta_values.columns = canonical_columns
# Put columns in model order.
return theta_values[expected_theta_names]
[docs]
def objective_at_theta(self, theta_values=None, initialize_parmest_model=False):
"""
Objective value for each theta, solving extensive form problem with
fixed theta values.
"""
if self.pest_deprecated is not None:
return self.pest_deprecated.objective_at_theta(
theta_values=theta_values,
initialize_parmest_model=initialize_parmest_model,
)
if initialize_parmest_model:
deprecation_warning(
"The `initialize_parmest_model` option in `objective_at_theta()` is "
"deprecated and will be removed in future releases. Please ensure the "
"model is initialized within the Experiment class definition.",
version="6.10.1",
)
if theta_values is None:
template_model = self._create_parmest_model(0)
theta_names, _ = self._expanded_theta_info(template_model)
theta_names = list(theta_names)
all_thetas = []
else:
theta_values = self._normalize_theta_dataframe(theta_values)
theta_names = list(theta_values.columns)
all_thetas = theta_values.to_dict("records")
num_tasks = len(all_thetas) if all_thetas else 1
task_mgr = utils.ParallelTaskManager(num_tasks)
local_thetas = task_mgr.global_to_local_data(all_thetas) if all_thetas else []
all_obj = list()
if len(all_thetas) > 0:
for Theta in local_thetas:
obj, thetvals, worststatus = self._Q_opt(
theta_vals=Theta, fix_theta=True
)
if worststatus == pyo.TerminationCondition.infeasible or obj is None:
all_obj.append(None)
else:
all_obj.append([Theta[name] for name in theta_names] + [obj])
else:
obj, thetvals, worststatus = self._Q_opt(theta_vals=None, fix_theta=True)
if worststatus == pyo.TerminationCondition.infeasible or obj is None:
all_obj.append(None)
else:
all_obj.append([thetvals[name] for name in theta_names] + [obj])
global_all_obj = task_mgr.allgather_global_data(all_obj)
global_all_obj = [row for row in global_all_obj if row is not None]
dfcols = list(theta_names) + ['obj']
obj_at_theta = pd.DataFrame(data=global_all_obj, columns=dfcols)
return obj_at_theta
[docs]
def likelihood_ratio_test(
self, obj_at_theta, obj_value, alphas, return_thresholds=False
):
r"""
Likelihood ratio test to identify theta values within a confidence
region using the :math:`\chi^2` distribution
Parameters
----------
obj_at_theta: pd.DataFrame, columns = theta_names + 'obj'
Objective values for each theta value (returned by
objective_at_theta)
obj_value: int or float
Objective value from parameter estimation using all data
alphas: list
List of alpha values to use in the chi2 test
return_thresholds: bool, optional
Return the threshold value for each alpha. Default is False.
Returns
-------
LR: pd.DataFrame
Objective values for each theta value along with True or False for
each alpha
thresholds: pd.Series
If return_threshold = True, the thresholds are also returned.
"""
# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return self.pest_deprecated.likelihood_ratio_test(
obj_at_theta, obj_value, alphas, return_thresholds=return_thresholds
)
assert isinstance(obj_at_theta, pd.DataFrame)
assert isinstance(obj_value, (int, float))
assert isinstance(alphas, list)
assert isinstance(return_thresholds, bool)
LR = obj_at_theta.copy()
S = len(self.exp_list)
thresholds = {}
for a in alphas:
chi2_val = scipy.stats.chi2.ppf(a, 2)
thresholds[a] = obj_value * ((chi2_val / (S - 2)) + 1)
LR[a] = LR['obj'] < thresholds[a]
thresholds = pd.Series(thresholds)
if return_thresholds:
return LR, thresholds
else:
return LR
[docs]
def confidence_region_test(
self, theta_values, distribution, alphas, test_theta_values=None, seed=None
):
"""
Confidence region test to determine if theta values are within a
rectangular, multivariate normal, or Gaussian kernel density distribution
for a range of alpha values
Parameters
----------
theta_values: pd.DataFrame, columns = theta_names
Theta values used to generate a confidence region
(generally returned by theta_est_bootstrap)
distribution: string
Statistical distribution used to define a confidence region,
options = 'MVN' for multivariate_normal, 'KDE' for gaussian_kde,
and 'Rect' for rectangular.
alphas: list
List of alpha values used to determine if theta values are inside
or outside the region.
test_theta_values: pd.Series or pd.DataFrame, keys/columns = theta_names, optional
Additional theta values that are compared to the confidence region
to determine if they are inside or outside.
Returns
-------
training_results: pd.DataFrame
Theta value used to generate the confidence region along with True
(inside) or False (outside) for each alpha
test_results: pd.DataFrame
If test_theta_values is not None, returns test theta value along
with True (inside) or False (outside) for each alpha
"""
# check if we are using deprecated parmest
if self.pest_deprecated is not None:
return self.pest_deprecated.confidence_region_test(
theta_values, distribution, alphas, test_theta_values=test_theta_values
)
assert isinstance(theta_values, pd.DataFrame)
assert distribution in ['Rect', 'MVN', 'KDE']
assert isinstance(alphas, list)
assert isinstance(
test_theta_values, (type(None), dict, pd.Series, pd.DataFrame)
)
if isinstance(test_theta_values, (dict, pd.Series)):
test_theta_values = pd.Series(test_theta_values).to_frame().transpose()
training_results = theta_values.copy()
if test_theta_values is not None:
test_result = test_theta_values.copy()
if seed is not None:
np.random.seed(seed)
for a in alphas:
if distribution == 'Rect':
lb, ub = graphics.fit_rect_dist(theta_values, a)
training_results[a] = (theta_values > lb).all(axis=1) & (
theta_values < ub
).all(axis=1)
if test_theta_values is not None:
# use upper and lower bound from the training set
test_result[a] = (test_theta_values > lb).all(axis=1) & (
test_theta_values < ub
).all(axis=1)
elif distribution == 'MVN':
dist = graphics.fit_mvn_dist(theta_values, seed=seed)
Z = dist.pdf(theta_values)
score = scipy.stats.scoreatpercentile(Z, (1 - a) * 100)
training_results[a] = Z >= score
if test_theta_values is not None:
# use score from the training set
Z = dist.pdf(test_theta_values)
test_result[a] = Z >= score
elif distribution == 'KDE':
dist = graphics.fit_kde_dist(theta_values, seed=seed)
Z = dist.pdf(theta_values.transpose())
score = scipy.stats.scoreatpercentile(Z, (1 - a) * 100)
training_results[a] = Z >= score
if test_theta_values is not None:
# use score from the training set
Z = dist.pdf(test_theta_values.transpose())
test_result[a] = Z >= score
if test_theta_values is not None:
return training_results, test_result
else:
return training_results
################################
# deprecated functions/classes #
################################
# Only used in the deprecatedEstimator class after 6.10.1dev0
[docs]
def ef_nonants(ef):
# Wrapper to call someone's ef_nonants
# (the function being called is very short, but it might be changed)
if use_mpisppy:
return sputils.ef_nonants(ef)
else:
return local_ef.ef_nonants(ef)
# Only used in the deprecatedEstimator class
def _experiment_instance_creation_callback(
scenario_name, node_names=None, cb_data=None
):
"""
This is going to be called by mpi-sppy or the local EF and it will call into
the user's model's callback.
Parameters:
-----------
scenario_name: `str` Scenario name should end with a number
node_names: `None` ( Not used here )
cb_data : dict with ["callback"], ["BootList"],
["theta_names"], ["cb_data"], etc.
"cb_data" is passed through to user's callback function
that is the "callback" value.
"BootList" is None or bootstrap experiment number list.
(called cb_data by mpisppy)
Returns:
--------
instance: `ConcreteModel`
instantiated scenario
Note:
----
There is flexibility both in how the function is passed and its signature.
"""
assert cb_data is not None
outer_cb_data = cb_data
scen_num_str = re.compile(r'(\d+)$').search(scenario_name).group(1)
scen_num = int(scen_num_str)
basename = scenario_name[: -len(scen_num_str)] # to reconstruct name
CallbackFunction = outer_cb_data["callback"]
if callable(CallbackFunction):
callback = CallbackFunction
else:
cb_name = CallbackFunction
if "CallbackModule" not in outer_cb_data:
raise RuntimeError(
"Internal Error: need CallbackModule in parmest callback"
)
else:
modname = outer_cb_data["CallbackModule"]
if isinstance(modname, str):
cb_module = im.import_module(modname, package=None)
elif isinstance(modname, types.ModuleType):
cb_module = modname
else:
print("Internal Error: bad CallbackModule")
raise
try:
callback = getattr(cb_module, cb_name)
except:
print("Error getting function=" + cb_name + " from module=" + str(modname))
raise
if "BootList" in outer_cb_data:
bootlist = outer_cb_data["BootList"]
# print("debug in callback: using bootlist=",str(bootlist))
# assuming bootlist itself is zero based
exp_num = bootlist[scen_num]
else:
exp_num = scen_num
scen_name = basename + str(exp_num)
cb_data = outer_cb_data["cb_data"] # cb_data might be None.
# at least three signatures are supported. The first is preferred
try:
instance = callback(experiment_number=exp_num, cb_data=cb_data)
except TypeError:
raise RuntimeError(
"Only one callback signature is supported: "
"callback(experiment_number, cb_data) "
)
"""
try:
instance = callback(scenario_tree_model, scen_name, node_names)
except TypeError: # deprecated signature?
try:
instance = callback(scen_name, node_names)
except:
print("Failed to create instance using callback; TypeError+")
raise
except:
print("Failed to create instance using callback.")
raise
"""
if hasattr(instance, "_mpisppy_node_list"):
raise RuntimeError(f"scenario for experiment {exp_num} has _mpisppy_node_list")
nonant_list = [
instance.find_component(vstr) for vstr in outer_cb_data["theta_names"]
]
if use_mpisppy:
instance._mpisppy_node_list = [
scenario_tree.ScenarioNode(
name="ROOT",
cond_prob=1.0,
stage=1,
cost_expression=instance.FirstStageCost,
nonant_list=nonant_list,
scen_model=instance,
)
]
else:
instance._mpisppy_node_list = [
scenario_tree.ScenarioNode(
name="ROOT",
cond_prob=1.0,
stage=1,
cost_expression=instance.FirstStageCost,
scen_name_list=None,
nonant_list=nonant_list,
scen_model=instance,
)
]
if "ThetaVals" in outer_cb_data:
thetavals = outer_cb_data["ThetaVals"]
# dlw august 2018: see mea code for more general theta
for name, val in thetavals.items():
theta_cuid = ComponentUID(name)
theta_object = theta_cuid.find_component_on(instance)
if val is not None:
# print("Fixing",vstr,"at",str(thetavals[vstr]))
theta_object.fix(val)
else:
# print("Freeing",vstr)
theta_object.unfix()
return instance
[docs]
@deprecated(version='6.7.2')
def group_data(data, groupby_column_name, use_mean=None):
"""
Group data by scenario
Parameters
----------
data: DataFrame
Data
groupby_column_name: strings
Name of data column which contains scenario numbers
use_mean: list of column names or None, optional
Name of data columns which should be reduced to a single value per
scenario by taking the mean
Returns
----------
grouped_data: list of dictionaries
Grouped data
"""
if use_mean is None:
use_mean_list = []
else:
use_mean_list = use_mean
grouped_data = []
for exp_num, group in data.groupby(data[groupby_column_name]):
d = {}
for col in group.columns:
if col in use_mean_list:
d[col] = group[col].mean()
else:
d[col] = list(group[col])
grouped_data.append(d)
return grouped_data
class _DeprecatedSecondStageCostExpr:
"""
Class to pass objective expression into the Pyomo model
"""
def __init__(self, ssc_function, data):
self._ssc_function = ssc_function
self._data = data
def __call__(self, model):
return self._ssc_function(model, self._data)
class _DeprecatedEstimator:
"""
Parameter estimation class
Parameters
----------
model_function: function
Function that generates an instance of the Pyomo model using 'data'
as the input argument
data: pd.DataFrame, list of dictionaries, list of dataframes, or list of json file names
Data that is used to build an instance of the Pyomo model and build
the objective function
theta_names: list of strings
List of Var names to estimate
obj_function: function, optional
Function used to formulate parameter estimation objective, generally
sum of squared error between measurements and model variables.
If no function is specified, the model is used
"as is" and should be defined with a "FirstStageCost" and
"SecondStageCost" expression that are used to build an objective.
tee: bool, optional
Indicates that ef solver output should be teed
diagnostic_mode: bool, optional
If True, print diagnostics from the solver
solver_options: dict, optional
Provides options to the solver (also the name of an attribute)
"""
def __init__(
self,
model_function,
data,
theta_names,
obj_function=None,
tee=False,
diagnostic_mode=False,
solver_options=None,
):
self.model_function = model_function
assert isinstance(
data, (list, pd.DataFrame)
), "Data must be a list or DataFrame"
# convert dataframe into a list of dataframes, each row = one scenario
if isinstance(data, pd.DataFrame):
self.callback_data = [
data.loc[i, :].to_frame().transpose() for i in data.index
]
else:
self.callback_data = data
assert isinstance(
self.callback_data[0], (dict, pd.DataFrame, str)
), "The scenarios in data must be a dictionary, DataFrame or filename"
if len(theta_names) == 0:
self.theta_names = ['parmest_dummy_var']
else:
self.theta_names = theta_names
self.obj_function = obj_function
self.tee = tee
self.diagnostic_mode = diagnostic_mode
self.solver_options = solver_options
self._second_stage_cost_exp = "SecondStageCost"
# boolean to indicate if model is initialized using a square solve
self.model_initialized = False
def _return_theta_names(self):
"""
Return list of fitted model parameter names
"""
# if fitted model parameter names differ from theta_names created when Estimator object is created
if hasattr(self, 'theta_names_updated'):
return self.theta_names_updated
else:
return (
self.theta_names
) # default theta_names, created when Estimator object is created
def _create_parmest_model(self, data):
"""
Modify the Pyomo model for parameter estimation
"""
model = self.model_function(data)
if (len(self.theta_names) == 1) and (
self.theta_names[0] == 'parmest_dummy_var'
):
model.parmest_dummy_var = pyo.Var(initialize=1.0)
# Add objective function (optional)
if self.obj_function:
for obj in model.component_objects(pyo.Objective):
if obj.name in ["Total_Cost_Objective"]:
raise RuntimeError(
"Parmest will not override the existing model Objective named "
+ obj.name
)
obj.deactivate()
for expr in model.component_data_objects(pyo.Expression):
if expr.name in ["FirstStageCost", "SecondStageCost"]:
raise RuntimeError(
"Parmest will not override the existing model Expression named "
+ expr.name
)
model.FirstStageCost = pyo.Expression(expr=0)
model.SecondStageCost = pyo.Expression(
rule=_DeprecatedSecondStageCostExpr(self.obj_function, data)
)
def TotalCost_rule(model):
return model.FirstStageCost + model.SecondStageCost
model.Total_Cost_Objective = pyo.Objective(
rule=TotalCost_rule, sense=pyo.minimize
)
# Convert theta Params to Vars, and unfix theta Vars
model = utils.convert_params_to_vars(model, self.theta_names)
# Update theta names list to use CUID string representation
for i, theta in enumerate(self.theta_names):
var_cuid = ComponentUID(theta)
var_validate = var_cuid.find_component_on(model)
if var_validate is None:
logger.warning(
"theta_name[%s] (%s) was not found on the model", (i, theta)
)
else:
try:
# If the component is not a variable,
# this will generate an exception (and the warning
# in the 'except')
var_validate.unfix()
self.theta_names[i] = repr(var_cuid)
except:
logger.warning(theta + ' is not a variable')
self.parmest_model = model
return model
def _instance_creation_callback(self, experiment_number=None, cb_data=None):
# cb_data is a list of dictionaries, list of dataframes, OR list of json file names
exp_data = cb_data[experiment_number]
if isinstance(exp_data, (dict, pd.DataFrame)):
pass
elif isinstance(exp_data, str):
try:
with open(exp_data, 'r') as infile:
exp_data = json.load(infile)
except:
raise RuntimeError(f'Could not read {exp_data} as json')
else:
raise RuntimeError(f'Unexpected data format for cb_data={cb_data}')
model = self._create_parmest_model(exp_data)
return model
def _Q_opt(
self,
ThetaVals=None,
solver="ef_ipopt",
return_values=[],
bootlist=None,
calc_cov=False,
cov_n=None,
):
"""
Set up all thetas as first stage Vars, return resulting theta
values as well as the objective function value.
"""
if solver == "k_aug":
raise RuntimeError("k_aug no longer supported.")
# (Bootstrap scenarios will use indirection through the bootlist)
if bootlist is None:
scenario_numbers = list(range(len(self.callback_data)))
scen_names = ["Scenario{}".format(i) for i in scenario_numbers]
else:
scen_names = ["Scenario{}".format(i) for i in range(len(bootlist))]
# tree_model.CallbackModule = None
outer_cb_data = dict()
outer_cb_data["callback"] = self._instance_creation_callback
if ThetaVals is not None:
outer_cb_data["ThetaVals"] = ThetaVals
if bootlist is not None:
outer_cb_data["BootList"] = bootlist
outer_cb_data["cb_data"] = self.callback_data # None is OK
outer_cb_data["theta_names"] = self.theta_names
options = {"solver": "ipopt"}
scenario_creator_options = {"cb_data": outer_cb_data}
if use_mpisppy:
ef = sputils.create_EF(
scen_names,
_experiment_instance_creation_callback,
EF_name="_Q_opt",
suppress_warnings=True,
scenario_creator_kwargs=scenario_creator_options,
)
else:
ef = local_ef.create_EF(
scen_names,
_experiment_instance_creation_callback,
EF_name="_Q_opt",
suppress_warnings=True,
scenario_creator_kwargs=scenario_creator_options,
)
self.ef_instance = ef
# Solve the extensive form with ipopt
if solver == "ef_ipopt":
if not calc_cov:
# Do not calculate the reduced hessian
solver = SolverFactory('ipopt')
if self.solver_options is not None:
for key in self.solver_options:
solver.options[key] = self.solver_options[key]
solve_result = solver.solve(self.ef_instance, tee=self.tee)
# The import error will be raised when we attempt to use
# inv_reduced_hessian_barrier below.
#
# elif not asl_available:
# raise ImportError("parmest requires ASL to calculate the "
# "covariance matrix with solver 'ipopt'")
else:
# parmest makes the fitted parameters stage 1 variables
ind_vars = []
for ndname, Var, solval in ef_nonants(ef):
ind_vars.append(Var)
# calculate the reduced hessian
solve_result, inv_red_hes = (
inverse_reduced_hessian.inv_reduced_hessian_barrier(
self.ef_instance,
independent_variables=ind_vars,
solver_options=self.solver_options,
tee=self.tee,
)
)
if self.diagnostic_mode:
print(
' Solver termination condition = ',
str(solve_result.solver.termination_condition),
)
# assume all first stage are thetas...
thetavals = {}
for ndname, Var, solval in ef_nonants(ef):
# process the name
# the scenarios are blocks, so strip the scenario name
vname = Var.name[Var.name.find(".") + 1 :]
thetavals[vname] = solval
objval = pyo.value(ef.EF_Obj)
if calc_cov:
# Calculate the covariance matrix
# Number of data points considered
n = cov_n
# Extract number of fitted parameters
l = len(thetavals)
# Assumption: Objective value is sum of squared errors
sse = objval
'''Calculate covariance assuming experimental observation errors are
independent and follow a Gaussian
distribution with constant variance.
The formula used in parmest was verified against equations (7-5-15) and
(7-5-16) in "Nonlinear Parameter Estimation", Y. Bard, 1974.
This formula is also applicable if the objective is scaled by a constant;
the constant cancels out. (was scaled by 1/n because it computes an
expected value.)
'''
cov = 2 * sse / (n - l) * inv_red_hes
cov = pd.DataFrame(
cov, index=thetavals.keys(), columns=thetavals.keys()
)
thetavals = pd.Series(thetavals)
if len(return_values) > 0:
var_values = []
if len(scen_names) > 1: # multiple scenarios
block_objects = self.ef_instance.component_objects(
Block, descend_into=False
)
else: # single scenario
block_objects = [self.ef_instance]
for exp_i in block_objects:
vals = {}
for var in return_values:
exp_i_var = exp_i.find_component(str(var))
if (
exp_i_var is None
): # we might have a block such as _mpisppy_data
continue
# if value to return is ContinuousSet
if type(exp_i_var) == ContinuousSet:
temp = list(exp_i_var)
else:
temp = [pyo.value(_) for _ in exp_i_var.values()]
if len(temp) == 1:
vals[var] = temp[0]
else:
vals[var] = temp
if len(vals) > 0:
var_values.append(vals)
var_values = pd.DataFrame(var_values)
if calc_cov:
return objval, thetavals, var_values, cov
else:
return objval, thetavals, var_values
if calc_cov:
return objval, thetavals, cov
else:
return objval, thetavals
else:
raise RuntimeError("Unknown solver in Q_Opt=" + solver)
def _Q_at_theta(self, thetavals, initialize_parmest_model=False):
"""
Return the objective function value with fixed theta values.
Parameters
----------
thetavals: dict
A dictionary of theta values.
initialize_parmest_model: boolean
If True: Solve square problem instance, build extensive form of the model for
parameter estimation, and set flag model_initialized to True
Returns
-------
objectiveval: float
The objective function value.
thetavals: dict
A dictionary of all values for theta that were input.
solvertermination: Pyomo TerminationCondition
Tries to return the "worst" solver status across the scenarios.
pyo.TerminationCondition.optimal is the best and
pyo.TerminationCondition.infeasible is the worst.
"""
optimizer = pyo.SolverFactory('ipopt')
if len(thetavals) > 0:
dummy_cb = {
"callback": self._instance_creation_callback,
"ThetaVals": thetavals,
"theta_names": self._return_theta_names(),
"cb_data": self.callback_data,
}
else:
dummy_cb = {
"callback": self._instance_creation_callback,
"theta_names": self._return_theta_names(),
"cb_data": self.callback_data,
}
if self.diagnostic_mode:
if len(thetavals) > 0:
print(' Compute objective at theta = ', str(thetavals))
else:
print(' Compute objective at initial theta')
# start block of code to deal with models with no constraints
# (ipopt will crash or complain on such problems without special care)
instance = _experiment_instance_creation_callback("FOO0", None, dummy_cb)
try: # deal with special problems so Ipopt will not crash
first = next(instance.component_objects(pyo.Constraint, active=True))
active_constraints = True
except:
active_constraints = False
# end block of code to deal with models with no constraints
WorstStatus = pyo.TerminationCondition.optimal
totobj = 0
scenario_numbers = list(range(len(self.callback_data)))
if initialize_parmest_model:
# create dictionary to store pyomo model instances (scenarios)
scen_dict = dict()
for snum in scenario_numbers:
sname = "scenario_NODE" + str(snum)
instance = _experiment_instance_creation_callback(sname, None, dummy_cb)
if initialize_parmest_model:
# list to store fitted parameter names that will be unfixed
# after initialization
theta_init_vals = []
# use appropriate theta_names member
theta_ref = self._return_theta_names()
for i, theta in enumerate(theta_ref):
# Use parser in ComponentUID to locate the component
var_cuid = ComponentUID(theta)
var_validate = var_cuid.find_component_on(instance)
if var_validate is None:
logger.warning(
"theta_name %s was not found on the model", (theta)
)
else:
try:
if len(thetavals) == 0:
var_validate.fix()
else:
var_validate.fix(thetavals[theta])
theta_init_vals.append(var_validate)
except:
logger.warning(
'Unable to fix model parameter value for %s (not a Pyomo model Var)',
(theta),
)
if active_constraints:
if self.diagnostic_mode:
print(' Experiment = ', snum)
print(' First solve with special diagnostics wrapper')
status_obj, solved, iters, time, regu = (
utils.ipopt_solve_with_stats(
instance, optimizer, max_iter=500, max_cpu_time=120
)
)
print(
" status_obj, solved, iters, time, regularization_stat = ",
str(status_obj),
str(solved),
str(iters),
str(time),
str(regu),
)
results = optimizer.solve(instance)
if self.diagnostic_mode:
print(
'standard solve solver termination condition=',
str(results.solver.termination_condition),
)
if (
results.solver.termination_condition
!= pyo.TerminationCondition.optimal
):
# DLW: Aug2018: not distinguishing "middlish" conditions
if WorstStatus != pyo.TerminationCondition.infeasible:
WorstStatus = results.solver.termination_condition
if initialize_parmest_model:
if self.diagnostic_mode:
print(
"Scenario {:d} infeasible with initialized parameter values".format(
snum
)
)
else:
if initialize_parmest_model:
if self.diagnostic_mode:
print(
"Scenario {:d} initialization successful with initial parameter values".format(
snum
)
)
if initialize_parmest_model:
# unfix parameters after initialization
for theta in theta_init_vals:
theta.unfix()
scen_dict[sname] = instance
else:
if initialize_parmest_model:
# unfix parameters after initialization
for theta in theta_init_vals:
theta.unfix()
scen_dict[sname] = instance
objobject = getattr(instance, self._second_stage_cost_exp)
objval = pyo.value(objobject)
totobj += objval
retval = totobj / len(scenario_numbers) # -1??
if initialize_parmest_model and not hasattr(self, 'ef_instance'):
# create extensive form of the model using scenario dictionary
if len(scen_dict) > 0:
for scen in scen_dict.values():
scen._mpisppy_probability = 1 / len(scen_dict)
if use_mpisppy:
EF_instance = sputils._create_EF_from_scen_dict(
scen_dict,
EF_name="_Q_at_theta",
# suppress_warnings=True
)
else:
EF_instance = local_ef._create_EF_from_scen_dict(
scen_dict, EF_name="_Q_at_theta", nonant_for_fixed_vars=True
)
self.ef_instance = EF_instance
# set self.model_initialized flag to True to skip extensive form model
# creation using theta_est()
self.model_initialized = True
# return initialized theta values
if len(thetavals) == 0:
# use appropriate theta_names member
theta_ref = self._return_theta_names()
for i, theta in enumerate(theta_ref):
thetavals[theta] = theta_init_vals[i]()
return retval, thetavals, WorstStatus
def _get_sample_list(self, samplesize, num_samples, replacement=True):
samplelist = list()
scenario_numbers = list(range(len(self.callback_data)))
if num_samples is None:
# This could get very large
for i, l in enumerate(combinations(scenario_numbers, samplesize)):
samplelist.append((i, np.sort(l)))
else:
for i in range(num_samples):
attempts = 0
unique_samples = 0 # check for duplicates in each sample
duplicate = False # check for duplicates between samples
while (unique_samples <= len(self._return_theta_names())) and (
not duplicate
):
sample = np.random.choice(
scenario_numbers, samplesize, replace=replacement
)
sample = np.sort(sample).tolist()
unique_samples = len(np.unique(sample))
if sample in samplelist:
duplicate = True
attempts += 1
if attempts > num_samples: # arbitrary timeout limit
raise RuntimeError(
"Internal error: timeout constructing "
"a sample, the dim of theta may be too "
"close to the samplesize"
)
samplelist.append((i, sample))
return samplelist
def theta_est(
self, solver="ef_ipopt", return_values=[], calc_cov=False, cov_n=None
):
"""
Parameter estimation using all scenarios in the data
Parameters
----------
solver: string, optional
Currently only "ef_ipopt" is supported. Default is "ef_ipopt".
return_values: list, optional
List of Variable names, used to return values from the model for data reconciliation
calc_cov: boolean, optional
If True, calculate and return the covariance matrix (only for "ef_ipopt" solver)
cov_n: int, optional
If calc_cov=True, then the user needs to supply the number of datapoints
that are used in the objective function
Returns
-------
objectiveval: float
The objective function value
thetavals: pd.Series
Estimated values for theta
variable values: pd.DataFrame
Variable values for each variable name in return_values (only for solver='ef_ipopt')
cov: pd.DataFrame
Covariance matrix of the fitted parameters (only for solver='ef_ipopt')
"""
assert isinstance(solver, str)
assert isinstance(return_values, list)
assert (calc_cov is NOTSET) or isinstance(calc_cov, bool)
if calc_cov:
assert isinstance(
cov_n, int
), "The number of datapoints that are used in the objective function is required to calculate the covariance matrix"
assert cov_n > len(
self._return_theta_names()
), "The number of datapoints must be greater than the number of parameters to estimate"
return self._Q_opt(
solver=solver,
return_values=return_values,
bootlist=None,
calc_cov=calc_cov,
cov_n=cov_n,
)
def theta_est_bootstrap(
self,
bootstrap_samples,
samplesize=None,
replacement=True,
seed=None,
return_samples=False,
):
"""
Parameter estimation using bootstrap resampling of the data
Parameters
----------
bootstrap_samples: int
Number of bootstrap samples to draw from the data
samplesize: int or None, optional
Size of each bootstrap sample. If samplesize=None, samplesize will be
set to the number of samples in the data
replacement: bool, optional
Sample with or without replacement
seed: int or None, optional
Random seed
return_samples: bool, optional
Return a list of sample numbers used in each bootstrap estimation
Returns
-------
bootstrap_theta: pd.DataFrame
Theta values for each sample and (if return_samples = True)
the sample numbers used in each estimation
"""
assert isinstance(bootstrap_samples, int)
assert isinstance(samplesize, (type(None), int))
assert isinstance(replacement, bool)
assert isinstance(seed, (type(None), int))
assert isinstance(return_samples, bool)
if samplesize is None:
samplesize = len(self.callback_data)
if seed is not None:
np.random.seed(seed)
global_list = self._get_sample_list(samplesize, bootstrap_samples, replacement)
task_mgr = utils.ParallelTaskManager(bootstrap_samples)
local_list = task_mgr.global_to_local_data(global_list)
bootstrap_theta = list()
for idx, sample in local_list:
objval, thetavals = self._Q_opt(bootlist=list(sample))
thetavals['samples'] = sample
bootstrap_theta.append(thetavals)
global_bootstrap_theta = task_mgr.allgather_global_data(bootstrap_theta)
bootstrap_theta = pd.DataFrame(global_bootstrap_theta)
if not return_samples:
del bootstrap_theta['samples']
return bootstrap_theta
def theta_est_leaveNout(
self, lNo, lNo_samples=None, seed=None, return_samples=False
):
"""
Parameter estimation where N data points are left out of each sample
Parameters
----------
lNo: int
Number of data points to leave out for parameter estimation
lNo_samples: int
Number of leave-N-out samples. If lNo_samples=None, the maximum
number of combinations will be used
seed: int or None, optional
Random seed
return_samples: bool, optional
Return a list of sample numbers that were left out
Returns
-------
lNo_theta: pd.DataFrame
Theta values for each sample and (if return_samples = True)
the sample numbers left out of each estimation
"""
assert isinstance(lNo, int)
assert isinstance(lNo_samples, (type(None), int))
assert isinstance(seed, (type(None), int))
assert isinstance(return_samples, bool)
samplesize = len(self.callback_data) - lNo
if seed is not None:
np.random.seed(seed)
global_list = self._get_sample_list(samplesize, lNo_samples, replacement=False)
task_mgr = utils.ParallelTaskManager(len(global_list))
local_list = task_mgr.global_to_local_data(global_list)
lNo_theta = list()
for idx, sample in local_list:
objval, thetavals = self._Q_opt(bootlist=list(sample))
lNo_s = list(set(range(len(self.callback_data))) - set(sample))
thetavals['lNo'] = np.sort(lNo_s)
lNo_theta.append(thetavals)
global_bootstrap_theta = task_mgr.allgather_global_data(lNo_theta)
lNo_theta = pd.DataFrame(global_bootstrap_theta)
if not return_samples:
del lNo_theta['lNo']
return lNo_theta
def leaveNout_bootstrap_test(
self, lNo, lNo_samples, bootstrap_samples, distribution, alphas, seed=None
):
"""
Leave-N-out bootstrap test to compare theta values where N data points are
left out to a bootstrap analysis using the remaining data,
results indicate if theta is within a confidence region
determined by the bootstrap analysis
Parameters
----------
lNo: int
Number of data points to leave out for parameter estimation
lNo_samples: int
Leave-N-out sample size. If lNo_samples=None, the maximum number
of combinations will be used
bootstrap_samples: int:
Bootstrap sample size
distribution: string
Statistical distribution used to define a confidence region,
options = 'MVN' for multivariate_normal, 'KDE' for gaussian_kde,
and 'Rect' for rectangular.
alphas: list
List of alpha values used to determine if theta values are inside
or outside the region.
seed: int or None, optional
Random seed
Returns
----------
List of tuples with one entry per lNo_sample:
* The first item in each tuple is the list of N samples that are left
out.
* The second item in each tuple is a DataFrame of theta estimated using
the N samples.
* The third item in each tuple is a DataFrame containing results from
the bootstrap analysis using the remaining samples.
For each DataFrame a column is added for each value of alpha which
indicates if the theta estimate is in (True) or out (False) of the
alpha region for a given distribution (based on the bootstrap results)
"""
assert isinstance(lNo, int)
assert isinstance(lNo_samples, (type(None), int))
assert isinstance(bootstrap_samples, int)
assert distribution in ['Rect', 'MVN', 'KDE']
assert isinstance(alphas, list)
assert isinstance(seed, (type(None), int))
if seed is not None:
np.random.seed(seed)
data = self.callback_data.copy()
global_list = self._get_sample_list(lNo, lNo_samples, replacement=False)
results = []
for idx, sample in global_list:
# Reset callback_data to only include the sample
self.callback_data = [data[i] for i in sample]
obj, theta = self.theta_est()
# Reset callback_data to include all scenarios except the sample
self.callback_data = [data[i] for i in range(len(data)) if i not in sample]
bootstrap_theta = self.theta_est_bootstrap(bootstrap_samples)
training, test = self.confidence_region_test(
bootstrap_theta,
distribution=distribution,
alphas=alphas,
test_theta_values=theta,
)
results.append((sample, test, training))
# Reset callback_data (back to full data set)
self.callback_data = data
return results
def objective_at_theta(self, theta_values=None, initialize_parmest_model=False):
"""
Objective value for each theta
Parameters
----------
theta_values: pd.DataFrame, columns=theta_names
Values of theta used to compute the objective
initialize_parmest_model: boolean
If True: Solve square problem instance, build extensive form of the model for
parameter estimation, and set flag model_initialized to True
Returns
-------
obj_at_theta: pd.DataFrame
Objective value for each theta (infeasible solutions are
omitted).
"""
if len(self.theta_names) == 1 and self.theta_names[0] == 'parmest_dummy_var':
pass # skip assertion if model has no fitted parameters
else:
# create a local instance of the pyomo model to access model variables and parameters
model_temp = self._create_parmest_model(self.callback_data[0])
model_theta_list = [] # list to store indexed and non-indexed parameters
# iterate over original theta_names
for theta_i in self.theta_names:
var_cuid = ComponentUID(theta_i)
var_validate = var_cuid.find_component_on(model_temp)
# check if theta in theta_names are indexed
try:
# get component UID of Set over which theta is defined
set_cuid = ComponentUID(var_validate.index_set())
# access and iterate over the Set to generate theta names as they appear
# in the pyomo model
set_validate = set_cuid.find_component_on(model_temp)
for s in set_validate:
self_theta_temp = repr(var_cuid) + "[" + repr(s) + "]"
# generate list of theta names
model_theta_list.append(self_theta_temp)
# if theta is not indexed, copy theta name to list as-is
except AttributeError:
self_theta_temp = repr(var_cuid)
model_theta_list.append(self_theta_temp)
except:
raise
# if self.theta_names is not the same as temp model_theta_list,
# create self.theta_names_updated
if set(self.theta_names) == set(model_theta_list) and len(
self.theta_names
) == set(model_theta_list):
pass
else:
self.theta_names_updated = model_theta_list
if theta_values is None:
all_thetas = {} # dictionary to store fitted variables
# use appropriate theta names member
theta_names = self._return_theta_names()
else:
assert isinstance(theta_values, pd.DataFrame)
# for parallel code we need to use lists and dicts in the loop
theta_names = theta_values.columns
# # check if theta_names are in model
for theta in list(theta_names):
theta_temp = theta.replace("'", "") # cleaning quotes from theta_names
assert theta_temp in [
t.replace("'", "") for t in model_theta_list
], "Theta name {} in 'theta_values' not in 'theta_names' {}".format(
theta_temp, model_theta_list
)
assert len(list(theta_names)) == len(model_theta_list)
all_thetas = theta_values.to_dict('records')
if all_thetas:
task_mgr = utils.ParallelTaskManager(len(all_thetas))
local_thetas = task_mgr.global_to_local_data(all_thetas)
else:
if initialize_parmest_model:
task_mgr = utils.ParallelTaskManager(
1
) # initialization performed using just 1 set of theta values
# walk over the mesh, return objective function
all_obj = list()
if len(all_thetas) > 0:
for Theta in local_thetas:
obj, thetvals, worststatus = self._Q_at_theta(
Theta, initialize_parmest_model=initialize_parmest_model
)
if worststatus != pyo.TerminationCondition.infeasible:
all_obj.append(list(Theta.values()) + [obj])
# DLW, Aug2018: should we also store the worst solver status?
else:
obj, thetvals, worststatus = self._Q_at_theta(
thetavals={}, initialize_parmest_model=initialize_parmest_model
)
if worststatus != pyo.TerminationCondition.infeasible:
all_obj.append(list(thetvals.values()) + [obj])
global_all_obj = task_mgr.allgather_global_data(all_obj)
dfcols = list(theta_names) + ['obj']
obj_at_theta = pd.DataFrame(data=global_all_obj, columns=dfcols)
return obj_at_theta
def likelihood_ratio_test(
self, obj_at_theta, obj_value, alphas, return_thresholds=False
):
r"""
Likelihood ratio test to identify theta values within a confidence
region using the :math:`\chi^2` distribution
Parameters
----------
obj_at_theta: pd.DataFrame, columns = theta_names + 'obj'
Objective values for each theta value (returned by
objective_at_theta)
obj_value: int or float
Objective value from parameter estimation using all data
alphas: list
List of alpha values to use in the chi2 test
return_thresholds: bool, optional
Return the threshold value for each alpha
Returns
-------
LR: pd.DataFrame
Objective values for each theta value along with True or False for
each alpha
thresholds: pd.Series
If return_threshold = True, the thresholds are also returned.
"""
assert isinstance(obj_at_theta, pd.DataFrame)
assert isinstance(obj_value, (int, float))
assert isinstance(alphas, list)
assert isinstance(return_thresholds, bool)
LR = obj_at_theta.copy()
S = len(self.callback_data)
thresholds = {}
for a in alphas:
chi2_val = scipy.stats.chi2.ppf(a, 2)
thresholds[a] = obj_value * ((chi2_val / (S - 2)) + 1)
LR[a] = LR['obj'] < thresholds[a]
thresholds = pd.Series(thresholds)
if return_thresholds:
return LR, thresholds
else:
return LR
def confidence_region_test(
self, theta_values, distribution, alphas, test_theta_values=None
):
"""
Confidence region test to determine if theta values are within a
rectangular, multivariate normal, or Gaussian kernel density distribution
for a range of alpha values
Parameters
----------
theta_values: pd.DataFrame, columns = theta_names
Theta values used to generate a confidence region
(generally returned by theta_est_bootstrap)
distribution: string
Statistical distribution used to define a confidence region,
options = 'MVN' for multivariate_normal, 'KDE' for gaussian_kde,
and 'Rect' for rectangular.
alphas: list
List of alpha values used to determine if theta values are inside
or outside the region.
test_theta_values: pd.Series or pd.DataFrame, keys/columns = theta_names, optional
Additional theta values that are compared to the confidence region
to determine if they are inside or outside.
Returns
training_results: pd.DataFrame
Theta value used to generate the confidence region along with True
(inside) or False (outside) for each alpha
test_results: pd.DataFrame
If test_theta_values is not None, returns test theta value along
with True (inside) or False (outside) for each alpha
"""
assert isinstance(theta_values, pd.DataFrame)
assert distribution in ['Rect', 'MVN', 'KDE']
assert isinstance(alphas, list)
assert isinstance(
test_theta_values, (type(None), dict, pd.Series, pd.DataFrame)
)
if isinstance(test_theta_values, (dict, pd.Series)):
test_theta_values = pd.Series(test_theta_values).to_frame().transpose()
training_results = theta_values.copy()
if test_theta_values is not None:
test_result = test_theta_values.copy()
for a in alphas:
if distribution == 'Rect':
lb, ub = graphics.fit_rect_dist(theta_values, a)
training_results[a] = (theta_values > lb).all(axis=1) & (
theta_values < ub
).all(axis=1)
if test_theta_values is not None:
# use upper and lower bound from the training set
test_result[a] = (test_theta_values > lb).all(axis=1) & (
test_theta_values < ub
).all(axis=1)
elif distribution == 'MVN':
dist = graphics.fit_mvn_dist(theta_values)
Z = dist.pdf(theta_values)
score = scipy.stats.scoreatpercentile(Z, (1 - a) * 100)
training_results[a] = Z >= score
if test_theta_values is not None:
# use score from the training set
Z = dist.pdf(test_theta_values)
test_result[a] = Z >= score
elif distribution == 'KDE':
dist = graphics.fit_kde_dist(theta_values)
Z = dist.pdf(theta_values.transpose())
score = scipy.stats.scoreatpercentile(Z, (1 - a) * 100)
training_results[a] = Z >= score
if test_theta_values is not None:
# use score from the training set
Z = dist.pdf(test_theta_values.transpose())
test_result[a] = Z >= score
if test_theta_values is not None:
return training_results, test_result
else:
return training_results