# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
#
# Pyomo.DoE was produced under the Department of Energy Carbon Capture Simulation
# Initiative (CCSI), and is copyright (c) 2022 by the software owners:
# TRIAD National Security, LLC., Lawrence Livermore National Security, LLC.,
# Lawrence Berkeley National Laboratory, Pacific Northwest National Laboratory,
# Battelle Memorial Institute, University of Notre Dame,
# The University of Pittsburgh, The University of Texas at Austin,
# University of Toledo, West Virginia University, et al. All rights reserved.
#
# NOTICE. This Software was developed under funding from the
# U.S. Department of Energy and the U.S. Government consequently retains
# certain rights. As such, the U.S. Government has been granted for itself
# and others acting on its behalf a paid-up, nonexclusive, irrevocable,
# worldwide license in the Software to reproduce, distribute copies to the
# public, prepare derivative works, and perform publicly and display
# publicly, and to permit other to do so.
# ___________________________________________________________________________
from enum import Enum
from itertools import permutations, product
import json
import logging
import math
from pathlib import Path
from pyomo.common.dependencies import (
numpy as np,
numpy_available,
pandas as pd,
matplotlib as plt,
)
from pyomo.common.modeling import unique_component_name
from pyomo.common.timing import TicTocTimer
from pyomo.contrib.sensitivity_toolbox.sens import get_dsdp
import pyomo.environ as pyo
from pyomo.opt import SolverStatus
[docs]
class ObjectiveLib(Enum):
determinant = "determinant"
trace = "trace"
minimum_eigenvalue = "minimum_eigenvalue"
zero = "zero"
[docs]
class FiniteDifferenceStep(Enum):
forward = "forward"
central = "central"
backward = "backward"
[docs]
class DesignOfExperiments:
[docs]
def __init__(
self,
experiment=None,
fd_formula="central",
step=1e-3,
objective_option="determinant",
scale_constant_value=1.0,
scale_nominal_param_value=False,
prior_FIM=None,
jac_initial=None,
fim_initial=None,
L_diagonal_lower_bound=1e-7,
solver=None,
tee=False,
get_labeled_model_args=None,
logger_level=logging.WARNING,
_Cholesky_option=True,
_only_compute_fim_lower=True,
):
"""
This package enables model-based design of experiments analysis with Pyomo.
Both direct optimization and enumeration modes are supported.
The package has been refactored from its original form as of August 24. See
the documentation for more information.
Parameters
----------
experiment:
Experiment object that holds the model and labels all the components. The object
should have a ``get_labeled_model`` where a model is returned with the following
labeled sets: ``unknown_parameters``, ``experimental_inputs``, ``experimental_outputs``
fd_formula:
Finite difference formula for computing the sensitivity matrix. Must be one of
[``central``, ``forward``, ``backward``], default: ``central``
step:
Relative step size for the finite difference formula.
default: 1e-3
objective_option:
String representation of the objective option. Current available options are:
``determinant`` (for determinant, or D-optimality) and ``trace`` (for trace or
A-optimality)
scale_constant_value:
Constant scaling for the sensitivity matrix. Every element will be multiplied by this
scaling factor.
default: 1
scale_nominal_param_value:
Boolean for whether or not to scale the sensitivity matrix by the nominal parameter
values. Every column of the sensitivity matrix will be divided by the respective
nominal parameter value.
default: False
prior_FIM:
2D numpy array representing information from prior experiments. If no value is given,
the assumed prior will be a matrix of zeros. This matrix will be assumed to be scaled
as the user has specified (i.e., if scale_nominal_param_value is true, we will assume
the FIM provided here has been scaled by the parameter values)
jac_initial:
2D numpy array as the initial values for the sensitivity matrix.
fim_initial:
2D numpy array as the initial values for the FIM.
L_diagonal_lower_bound:
Lower bound for the values of the lower triangular Cholesky factorization matrix.
default: 1e-7
solver:
A ``solver`` object specified by the user, default=None.
If not specified, default solver is set to IPOPT with MA57.
tee:
Solver option to be passed for verbose output.
get_labeled_model_args:
Additional arguments for the ``get_labeled_model`` function on the Experiment object.
_Cholesky_option:
Boolean value of whether or not to use the cholesky factorization to compute the
determinant for the D-optimality criteria. This parameter should not be changed
unless the user intends to make performance worse (i.e., compare an existing tool
that uses the full FIM to this algorithm)
_only_compute_fim_lower:
If True, only the lower triangle of the FIM is computed. This parameter should not
be changed unless the user intends to make performance worse (i.e., compare an
existing tool that uses the full FIM to this algorithm)
logger_level:
Specify the level of the logger. Change to logging.DEBUG for all messages.
"""
if experiment is None:
raise ValueError("Experiment object must be provided to perform DoE.")
# Check if the Experiment object has callable ``get_labeled_model`` function
if not hasattr(experiment, "get_labeled_model"):
raise ValueError(
"The experiment object must have a ``get_labeled_model`` function"
)
# Set the experiment object from the user
self.experiment = experiment
# Set the finite difference and subsequent step size
self.fd_formula = FiniteDifferenceStep(fd_formula)
self.step = step
# Set the objective type and scaling options:
self.objective_option = ObjectiveLib(objective_option)
self.scale_constant_value = scale_constant_value
self.scale_nominal_param_value = scale_nominal_param_value
# Set the prior FIM (will be checked upon model construction)
self.prior_FIM = prior_FIM
# Set the initial values for the jacobian, fim, and L matrices
self.jac_initial = jac_initial
self.fim_initial = fim_initial
# Set the lower bound on the Cholesky lower triangular matrix
self.L_diagonal_lower_bound = L_diagonal_lower_bound
# check if user-defined solver is given
if solver:
self.solver = solver
# if not given, use default solver
else:
solver = pyo.SolverFactory("ipopt")
solver.options["linear_solver"] = "ma57"
solver.options["halt_on_ampl_error"] = "yes"
solver.options["max_iter"] = 3000
self.solver = solver
self.tee = tee
# Set get_labeled_model_args as an empty dict if no arguments are passed
if get_labeled_model_args is None:
get_labeled_model_args = {}
self.get_labeled_model_args = get_labeled_model_args
# Revtrieve logger and set logging level
self.logger = logging.getLogger(__name__)
self.logger.setLevel(level=logger_level)
# Set the private options if passed (only developers should pass these)
self.Cholesky_option = _Cholesky_option
self.only_compute_fim_lower = _only_compute_fim_lower
# model attribute to avoid rebuilding models
self.model = pyo.ConcreteModel() # Build empty model
# Empty results object
self.results = {}
# May need this attribute for more complicated structures?
# (i.e., no model rebuilding for large models with sequential)
self._built_scenarios = False
# Perform doe
[docs]
def run_doe(self, model=None, results_file=None):
"""
Runs DoE for a single experiment estimation. Can save results in
a file based on the flag.
Parameters
----------
model: model to run the DoE, default: None (self.model)
results_file: string name of the file path to save the results
to in the form of a .json file
default: None --> don't save
"""
# Check results file name
if results_file is not None:
if type(results_file) not in [Path, str]:
raise ValueError(
"``results_file`` must be either a Path object or a string."
)
# Start timer
sp_timer = TicTocTimer()
sp_timer.tic(msg=None)
self.logger.info("Beginning experimental optimization.")
# Model is none, set it to self.model
if model is None:
model = self.model
else:
# TODO: Add safe naming when a model is passed by the user.
# doe_block = pyo.Block()
# doe_block_name = unique_component_name(model, "design_of_experiments_block")
# model.add_component(doe_block_name, doe_block)
pass
# ToDo: potentially work with this for more complicated models
# Create the full DoE model (build scenarios for F.D. scheme)
if not self._built_scenarios:
self.create_doe_model(model=model)
# Add the objective function to the model
self.create_objective_function(model=model)
# Track time required to build the DoE model
build_time = sp_timer.toc(msg=None)
self.logger.info(
"Successfully built the DoE model.\nBuild time: %0.1f seconds" % build_time
)
# Solve the square problem first to initialize the fim and
# sensitivity constraints
# Deactivate objective expression and objective constraints (on a block), and fix design variables
model.objective.deactivate()
model.obj_cons.deactivate()
for comp in model.scenario_blocks[0].experiment_inputs:
comp.fix()
# TODO: safeguard solver call to see if solver terminated successfully
# see below commented code:
# res = self.solver.solve(model, tee=self.tee, load_solutions=False)
# if pyo.check_optimal_termination(res):
# model.load_solution(res)
# else:
# # The solver was unsuccessful, might want to warn the user or terminate gracefully, etc.
model.dummy_obj = pyo.Objective(expr=0, sense=pyo.minimize)
self.solver.solve(model, tee=self.tee)
# Track time to initialize the DoE model
initialization_time = sp_timer.toc(msg=None)
self.logger.info(
"Successfully initialized the DoE model.\nInitialization time: %0.1f seconds"
% initialization_time
)
model.dummy_obj.deactivate()
# Reactivate objective and unfix experimental design decisions
for comp in model.scenario_blocks[0].experiment_inputs:
comp.unfix()
model.objective.activate()
model.obj_cons.activate()
# If the model has L, initialize it with the solved FIM
if hasattr(model, "L"):
# Get the FIM values
fim_vals = [
pyo.value(model.fim[i, j])
for i in model.parameter_names
for j in model.parameter_names
]
fim_np = np.array(fim_vals).reshape(
(len(model.parameter_names), len(model.parameter_names))
)
L_vals_sq = np.linalg.cholesky(fim_np)
for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
model.L[c, d].value = L_vals_sq[i, j]
if hasattr(model, "determinant"):
model.determinant.value = np.linalg.det(np.array(self.get_FIM()))
# Solve the full model, which has now been initialized with the square solve
res = self.solver.solve(model, tee=self.tee)
# Track time used to solve the DoE model
solve_time = sp_timer.toc(msg=None)
self.logger.info(
"Successfully optimized experiment.\nSolve time: %0.1f seconds" % solve_time
)
self.logger.info(
"Total time for build, initialization, and solve: %0.1f seconds"
% (build_time + initialization_time + solve_time)
)
#
fim_local = self.get_FIM()
# Make sure stale results don't follow the DoE object instance
self.results = {}
self.results["Solver Status"] = res.solver.status
self.results["Termination Condition"] = res.solver.termination_condition
# Important quantities for optimal design
self.results["FIM"] = fim_local
self.results["Sensitivity Matrix"] = self.get_sensitivity_matrix()
self.results["Experiment Design"] = self.get_experiment_input_values()
self.results["Experiment Design Names"] = [
str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
for k in model.scenario_blocks[0].experiment_inputs
]
self.results["Experiment Outputs"] = self.get_experiment_output_values()
self.results["Experiment Output Names"] = [
str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
for k in model.scenario_blocks[0].experiment_outputs
]
self.results["Unknown Parameters"] = self.get_unknown_parameter_values()
self.results["Unknown Parameter Names"] = [
str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
for k in model.scenario_blocks[0].unknown_parameters
]
self.results["Measurement Error"] = self.get_measurement_error_values()
self.results["Measurement Error Names"] = [
str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
for k in model.scenario_blocks[0].measurement_error
]
self.results["Prior FIM"] = [list(row) for row in list(self.prior_FIM)]
# Saving some stats on the FIM for convenience
self.results["Objective expression"] = str(self.objective_option).split(".")[-1]
self.results["log10 A-opt"] = np.log10(np.trace(fim_local))
self.results["log10 D-opt"] = np.log10(np.linalg.det(fim_local))
self.results["log10 E-opt"] = np.log10(min(np.linalg.eig(fim_local)[0]))
self.results["FIM Condition Number"] = np.linalg.cond(fim_local)
# Solve timing stats
self.results["Build Time"] = build_time
self.results["Initialization Time"] = initialization_time
self.results["Solve Time"] = solve_time
self.results["Wall-clock Time"] = build_time + initialization_time + solve_time
# Settings used to generate the optimal DoE
self.results["Finite Difference Scheme"] = str(self.fd_formula).split(".")[-1]
self.results["Finite Difference Step"] = self.step
self.results["Nominal Parameter Scaling"] = self.scale_nominal_param_value
# ToDo: Add more useful fields to the results object?
# ToDo: Add MetaData from the user to the results object? Or leave to the user?
# If the user specifies to save the file, do it here as a json
if results_file is not None:
with open(results_file, "w") as file:
json.dump(self.results, file)
# Perform multi-experiment doe (sequential, or ``greedy`` approach)
def run_multi_doe_sequential(self, N_exp=1):
raise NotImplementedError("Multiple experiment optimization not yet supported.")
# Perform multi-experiment doe (simultaneous, optimal approach)
def run_multi_doe_simultaneous(self, N_exp=1):
raise NotImplementedError("Multiple experiment optimization not yet supported.")
# Compute FIM for the DoE object
[docs]
def compute_FIM(self, model=None, method="sequential"):
"""
Computes the FIM for the experimental design that is
initialized from the experiment`s ``get_labeled_model()``
function.
Parameters
----------
model: model to compute FIM, default: None, (self.compute_FIM_model)
method: string to specify which method should be used
options are ``kaug`` and ``sequential``
Returns
-------
computed FIM: 2D numpy array of the FIM
"""
if model is None:
self.compute_FIM_model = self.experiment.get_labeled_model(
**self.get_labeled_model_args
).clone()
model = self.compute_FIM_model
else:
# TODO: Add safe naming when a model is passed by the user.
# doe_block = pyo.Block()
# doe_block_name = unique_component_name(model, "design_of_experiments_block")
# model.add_component(doe_block_name, doe_block)
# self.compute_FIM_model = model
pass
self.check_model_labels(model=model)
# Set length values for the model features
self.n_parameters = len(model.unknown_parameters)
self.n_measurement_error = len(model.measurement_error)
self.n_experiment_inputs = len(model.experiment_inputs)
self.n_experiment_outputs = len(model.experiment_outputs)
# Check FIM input, if it exists. Otherwise, set the prior_FIM attribute
if self.prior_FIM is None:
self.prior_FIM = np.zeros(
(len(model.unknown_parameters), len(model.unknown_parameters))
)
else:
self.check_model_FIM(FIM=self.prior_FIM)
# TODO: Add a check to see if the model has an objective and deactivate it.
# This solve should only be a square solve without any obj function.
if method == "sequential":
self._sequential_FIM(model=model)
self._computed_FIM = self.seq_FIM
elif method == "kaug":
self._kaug_FIM(model=model)
self._computed_FIM = self.kaug_FIM
else:
raise ValueError(
"The method provided, {}, must be either `sequential` or `kaug`".format(
method
)
)
return self._computed_FIM
# Use a sequential method to get the FIM
def _sequential_FIM(self, model=None):
"""
Used to compute the FIM using a sequential approach,
solving the model consecutively under each of the
finite difference scenarios to build the sensitivity
matrix to subsequently compute the FIM.
"""
# Build a single model instance
if model is None:
self.compute_FIM_model = self.experiment.get_labeled_model(
**self.get_labeled_model_args
).clone()
model = self.compute_FIM_model
# Create suffix to keep track of parameter scenarios
if hasattr(model, "parameter_scenarios"):
model.del_component(model.parameter_scenarios)
model.parameter_scenarios = pyo.Suffix(direction=pyo.Suffix.LOCAL)
# Populate parameter scenarios, and scenario inds based on finite difference scheme
if self.fd_formula == FiniteDifferenceStep.central:
model.parameter_scenarios.update(
(2 * ind, k) for ind, k in enumerate(model.unknown_parameters.keys())
)
model.parameter_scenarios.update(
(2 * ind + 1, k)
for ind, k in enumerate(model.unknown_parameters.keys())
)
model.scenarios = range(len(model.unknown_parameters) * 2)
elif self.fd_formula in [
FiniteDifferenceStep.forward,
FiniteDifferenceStep.backward,
]:
model.parameter_scenarios.update(
(ind + 1, k) for ind, k in enumerate(model.unknown_parameters.keys())
)
model.scenarios = range(len(model.unknown_parameters) + 1)
else:
raise AttributeError(
"Finite difference option not recognized. Please contact the developers as you should not see this error."
)
# Fix design variables
for comp in model.experiment_inputs:
comp.fix()
measurement_vals = []
# In a loop.....
# Calculate measurement values for each scenario
for s in model.scenarios:
# Perturbation to be (1 + diff) * param_value
if self.fd_formula == FiniteDifferenceStep.central:
diff = self.step * (
(-1) ** s
) # Positive perturbation, even; negative, odd
elif self.fd_formula == FiniteDifferenceStep.backward:
diff = (
self.step * -1 * (s != 0)
) # Backward always negative perturbation; 0 at s = 0
elif self.fd_formula == FiniteDifferenceStep.forward:
diff = self.step * (s != 0) # Forward always positive; 0 at s = 0
# If we are doing forward/backward, no change for s=0
skip_param_update = (
self.fd_formula
in [FiniteDifferenceStep.forward, FiniteDifferenceStep.backward]
) and (s == 0)
if not skip_param_update:
param = model.parameter_scenarios[s]
# Update parameter values for the given finite difference scenario
param.set_value(model.unknown_parameters[param] * (1 + diff))
else:
continue
# Simulate the model
try:
res = self.solver.solve(model)
pyo.assert_optimal_termination(res)
except:
# TODO: Make error message more verbose, i.e., add unknown parameter values so the
# user can try to solve the model instance outside of the pyomo.DoE framework.
raise RuntimeError(
"Model from experiment did not solve appropriately. Make sure the model is well-posed."
)
# Extract the measurement values for the scenario and append
measurement_vals.append(
[pyo.value(k) for k, v in model.experiment_outputs.items()]
)
# Use the measurement outputs to make the Q matrix
measurement_vals_np = np.array(measurement_vals).T
self.seq_jac = np.zeros(
(
len(model.experiment_outputs.items()),
len(model.unknown_parameters.items()),
)
)
# Counting variable for loop
i = 0
# Loop over parameter values and grab correct columns for finite difference calculation
for k, v in model.unknown_parameters.items():
curr_step = v * self.step
if self.fd_formula == FiniteDifferenceStep.central:
col_1 = 2 * i
col_2 = 2 * i + 1
curr_step *= 2
elif self.fd_formula == FiniteDifferenceStep.forward:
col_1 = i
col_2 = 0
elif self.fd_formula == FiniteDifferenceStep.backward:
col_1 = 0
col_2 = i
# If scale_nominal_param_value is active, scale by nominal parameter value (v)
scale_factor = (1.0 / curr_step) * self.scale_constant_value
if self.scale_nominal_param_value:
scale_factor *= v
# Calculate the column of the sensitivity matrix
self.seq_jac[:, i] = (
measurement_vals_np[:, col_1] - measurement_vals_np[:, col_2]
) * scale_factor
# Increment the count
i += 1
# ToDo: As more complex measurement error schemes are put in place, this needs to change
# Add independent (non-correlated) measurement error for FIM calculation
cov_y = np.zeros((len(model.measurement_error), len(model.measurement_error)))
count = 0
for k, v in model.measurement_error.items():
cov_y[count, count] = 1 / v
count += 1
# Compute and record FIM
self.seq_FIM = self.seq_jac.T @ cov_y @ self.seq_jac + self.prior_FIM
# Use kaug to get FIM
def _kaug_FIM(self, model=None):
"""
Used to compute the FIM using kaug, a sensitivity-based
approach that directly computes the FIM.
Parameters
----------
model: model to compute FIM, default: None, (self.compute_FIM_model)
"""
# Remake compute_FIM_model if model is None.
# compute_FIM_model needs to be the right version for function to work.
if model is None:
self.compute_FIM_model = self.experiment.get_labeled_model(
**self.get_labeled_model_args
).clone()
model = self.compute_FIM_model
# add zero (dummy/placeholder) objective function
if not hasattr(model, "objective"):
model.objective = pyo.Objective(expr=0, sense=pyo.minimize)
# Fix design variables to make the problem square
for comp in model.experiment_inputs:
comp.fix()
self.solver.solve(model, tee=self.tee)
# Probe the solved model for dsdp results (sensitivities s.t. parameters)
params_dict = {k.name: v for k, v in model.unknown_parameters.items()}
params_names = list(params_dict.keys())
dsdp_re, col = get_dsdp(model, params_names, params_dict, tee=self.tee)
# analyze result
dsdp_array = dsdp_re.toarray().T
# store dsdp returned
dsdp_extract = []
# get right lines from results
measurement_index = []
# loop over measurement variables and their time points
for k, v in model.experiment_outputs.items():
name = k.name
try:
kaug_no = col.index(name)
measurement_index.append(kaug_no)
# get right line of dsdp
dsdp_extract.append(dsdp_array[kaug_no])
except:
# k_aug does not provide value for fixed variables
self.logger.debug("The variable is fixed: %s", name)
# produce the sensitivity for fixed variables
zero_sens = np.zeros(len(params_names))
# for fixed variables, the sensitivity are a zero vector
dsdp_extract.append(zero_sens)
# Extract and calculate sensitivity if scaled by constants or parameters.
jac = [[] for k in params_names]
for d in range(len(dsdp_extract)):
for k, v in model.unknown_parameters.items():
p = params_names.index(k.name) # Index of parameter in np array
# if scaled by parameter value or constant value
sensi = dsdp_extract[d][p] * self.scale_constant_value
if self.scale_nominal_param_value:
sensi *= v
jac[p].append(sensi)
# record kaug jacobian
self.kaug_jac = np.array(jac).T
# Compute FIM
if self.prior_FIM is None:
self.prior_FIM = np.zeros((len(params_names), len(params_names)))
else:
self.check_model_FIM(FIM=self.prior_FIM)
# Constructing the Covariance of the measurements for the FIM calculation
# The following assumes independent measurement error.
cov_y = np.zeros((len(model.measurement_error), len(model.measurement_error)))
count = 0
for k, v in model.measurement_error.items():
cov_y[count, count] = 1 / v
count += 1
# ToDo: need to add a covariance matrix for measurements (sigma inverse)
# i.e., cov_y = self.cov_y or model.cov_y
# Still deciding where this would be best.
self.kaug_FIM = self.kaug_jac.T @ cov_y @ self.kaug_jac + self.prior_FIM
# Create the DoE model (with ``scenarios`` from finite differencing scheme)
[docs]
def create_doe_model(self, model=None):
"""
Add equations to compute sensitivities, FIM, and objective.
Builds the DoE model. Adds the scenarios, the sensitivity matrix
Q, the FIM, as well as the objective function to the model.
The function alters the ``model`` input.
In the single experiment case, ``model`` will be self.model. In the
multi-experiment case, ``model`` will be one experiment to be enumerated.
Parameters
----------
model: model to add finite difference scenarios
"""
if model is None:
model = self.model
else:
# TODO: Add safe naming when a model is passed by the user.
# doe_block = pyo.Block()
# doe_block_name = unique_component_name(model, "design_of_experiments_block")
# model.add_component(doe_block_name, doe_block)
pass
# Developer recommendation: use the Cholesky decomposition for D-optimality
# The explicit formula is available for benchmarking purposes and is NOT recommended
if (
self.only_compute_fim_lower
and self.objective_option == ObjectiveLib.determinant
and not self.Cholesky_option
):
raise ValueError(
"Cannot compute determinant with explicit formula if only_compute_fim_lower is True."
)
# Generate scenarios for finite difference formulae
self._generate_scenario_blocks(model=model)
# Set names for indexing sensitivity matrix (jacobian) and FIM
scen_block_ind = min(
[
k.name.split(".").index("scenario_blocks[0]")
for k in model.scenario_blocks[0].unknown_parameters.keys()
]
)
model.parameter_names = pyo.Set(
initialize=[
".".join(k.name.split(".")[(scen_block_ind + 1) :])
for k in model.scenario_blocks[0].unknown_parameters.keys()
]
)
model.output_names = pyo.Set(
initialize=[
".".join(k.name.split(".")[(scen_block_ind + 1) :])
for k in model.scenario_blocks[0].experiment_outputs.keys()
]
)
def identity_matrix(m, i, j):
if i == j:
return 1
else:
return 0
### Initialize the Jacobian if provided by the user
# If the user provides an initial Jacobian, convert it to a dictionary
if self.jac_initial is not None:
dict_jac_initialize = {}
for i, bu in enumerate(model.output_names):
for j, un in enumerate(model.parameter_names):
# Jacobian is a numpy array, rows are experimental outputs, columns are unknown parameters
dict_jac_initialize[(bu, un)] = self.jac_initial[i][j]
# Initialize the Jacobian matrix
def initialize_jac(m, i, j):
# If provided by the user, use the values now stored in the dictionary
if self.jac_initial is not None:
return dict_jac_initialize[(i, j)]
# Otherwise initialize to 0.1 (which is an arbitrary non-zero value)
else:
raise AttributeError(
"Jacobian being initialized when the jac_initial attribute is None. Please contact the developers as you should not see this error."
)
model.sensitivity_jacobian = pyo.Var(
model.output_names, model.parameter_names, initialize=initialize_jac
)
# Initialize the FIM
if self.fim_initial is not None:
dict_fim_initialize = {
(bu, un): self.fim_initial[i][j]
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
}
def initialize_fim(m, j, d):
return dict_fim_initialize[(j, d)]
if self.fim_initial is not None:
model.fim = pyo.Var(
model.parameter_names, model.parameter_names, initialize=initialize_fim
)
else:
model.fim = pyo.Var(
model.parameter_names, model.parameter_names, initialize=identity_matrix
)
# To-Do: Look into this functionality.....
# if cholesky, define L elements as variables
if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant:
model.L = pyo.Var(
model.parameter_names, model.parameter_names, initialize=identity_matrix
)
# loop over parameter name
for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
# fix the 0 half of L matrix to be 0.0
if i < j:
model.L[c, d].fix(0.0)
# Give LB to the diagonal entries
if self.L_diagonal_lower_bound:
if c == d:
model.L[c, d].setlb(self.L_diagonal_lower_bound)
# jacobian rule
def jacobian_rule(m, n, p):
"""
m: Pyomo model
n: experimental output
p: unknown parameter
"""
fd_step_mult = 1
cuid = pyo.ComponentUID(n)
param_ind = m.parameter_names.data().index(p)
# Different FD schemes lead to different scenarios for the computation
if self.fd_formula == FiniteDifferenceStep.central:
s1 = param_ind * 2
s2 = param_ind * 2 + 1
fd_step_mult = 2
elif self.fd_formula == FiniteDifferenceStep.forward:
s1 = param_ind + 1
s2 = 0
elif self.fd_formula == FiniteDifferenceStep.backward:
s1 = 0
s2 = param_ind + 1
var_up = cuid.find_component_on(m.scenario_blocks[s1])
var_lo = cuid.find_component_on(m.scenario_blocks[s2])
param = m.parameter_scenarios[max(s1, s2)]
param_loc = pyo.ComponentUID(param).find_component_on(m.scenario_blocks[0])
param_val = m.scenario_blocks[0].unknown_parameters[param_loc]
param_diff = param_val * fd_step_mult * self.step
if self.scale_nominal_param_value:
return (
m.sensitivity_jacobian[n, p]
== (var_up - var_lo)
/ param_diff
* param_val
* self.scale_constant_value
)
else:
return (
m.sensitivity_jacobian[n, p]
== (var_up - var_lo) / param_diff * self.scale_constant_value
)
# A constraint to calculate elements in Hessian matrix
# transfer prior FIM to be Expressions
fim_initial_dict = {
(bu, un): self.prior_FIM[i][j]
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
}
def read_prior(m, i, j):
return fim_initial_dict[(i, j)]
model.prior_FIM = pyo.Expression(
model.parameter_names, model.parameter_names, rule=read_prior
)
# Off-diagonal elements are symmetric, so only half of the off-diagonal elements need to be specified.
def fim_rule(m, p, q):
"""
m: Pyomo model
p: unknown parameter
q: unknown parameter
"""
p_ind = list(m.parameter_names).index(p)
q_ind = list(m.parameter_names).index(q)
# If the row is less than the column, skip the constraint
# This logic is consistent with making the FIM a lower
# triangular matrix (as is done later in this function)
if p_ind < q_ind:
if self.only_compute_fim_lower:
return pyo.Constraint.Skip
else:
return m.fim[p, q] == m.fim[q, p]
else:
return (
m.fim[p, q]
== sum(
1
/ m.scenario_blocks[0].measurement_error[
pyo.ComponentUID(n).find_component_on(m.scenario_blocks[0])
]
* m.sensitivity_jacobian[n, p]
* m.sensitivity_jacobian[n, q]
for n in m.output_names
)
+ m.prior_FIM[p, q]
)
model.jacobian_constraint = pyo.Constraint(
model.output_names, model.parameter_names, rule=jacobian_rule
)
model.fim_constraint = pyo.Constraint(
model.parameter_names, model.parameter_names, rule=fim_rule
)
if self.only_compute_fim_lower:
# Fix the upper half of the FIM matrix elements to be 0.0.
# This eliminates extra variables and ensures the expected number of
# degrees of freedom in the optimization problem.
for ind_p, p in enumerate(model.parameter_names):
for ind_q, q in enumerate(model.parameter_names):
if ind_p < ind_q:
model.fim[p, q].fix(0.0)
# Create scenario block structure
def _generate_scenario_blocks(self, model=None):
"""
Generates the modeling blocks corresponding to the scenarios for
the finite differencing scheme to compute the sensitivity jacobian
to compute the FIM.
The function alters the ``model`` input.
In the single experiment case, ``model`` will be self.model. In the
multi-experiment case, ``model`` will be one experiment to be enumerated.
Parameters
----------
model: model to add finite difference scenarios
"""
# If model is none, assume it is self.model
if model is None:
model = self.model
# Generate initial scenario to populate unknown parameter values
model.base_model = self.experiment.get_labeled_model(
**self.get_labeled_model_args
).clone()
# Check the model that labels are correct
self.check_model_labels(model=model.base_model)
# Gather lengths of label structures for later use in the model build process
self.n_parameters = len(model.base_model.unknown_parameters)
self.n_measurement_error = len(model.base_model.measurement_error)
self.n_experiment_inputs = len(model.base_model.experiment_inputs)
self.n_experiment_outputs = len(model.base_model.experiment_outputs)
if self.n_measurement_error != self.n_experiment_outputs:
raise ValueError(
"Number of experiment outputs, {}, and length of measurement error, {}, do not match. Please check model labeling.".format(
self.n_experiment_outputs, self.n_measurement_error
)
)
self.logger.info("Experiment output and measurement error lengths match.")
# Check that the user input FIM and Jacobian are the correct dimension
if self.prior_FIM is not None:
self.check_model_FIM(FIM=self.prior_FIM)
else:
self.prior_FIM = np.zeros((self.n_parameters, self.n_parameters))
if self.fim_initial is not None:
self.check_model_FIM(FIM=self.fim_initial)
else:
self.fim_initial = np.eye(self.n_parameters) + self.prior_FIM
if self.jac_initial is not None:
self.check_model_jac(self.jac_initial)
else:
self.jac_initial = np.eye(self.n_experiment_outputs, self.n_parameters)
# Make a new Suffix to hold which scenarios are associated with parameters
model.parameter_scenarios = pyo.Suffix(direction=pyo.Suffix.LOCAL)
# Populate parameter scenarios, and scenario inds based on finite difference scheme
if self.fd_formula == FiniteDifferenceStep.central:
model.parameter_scenarios.update(
(2 * ind, k)
for ind, k in enumerate(model.base_model.unknown_parameters.keys())
)
model.parameter_scenarios.update(
(2 * ind + 1, k)
for ind, k in enumerate(model.base_model.unknown_parameters.keys())
)
model.scenarios = range(len(model.base_model.unknown_parameters) * 2)
elif self.fd_formula in [
FiniteDifferenceStep.forward,
FiniteDifferenceStep.backward,
]:
model.parameter_scenarios.update(
(ind + 1, k)
for ind, k in enumerate(model.base_model.unknown_parameters.keys())
)
model.scenarios = range(len(model.base_model.unknown_parameters) + 1)
else:
raise AttributeError(
"Finite difference option not recognized. Please contact the developers as you should not see this error."
)
# TODO: Allow Params for `unknown_parameters` and `experiment_inputs`
# May need to make a new converter Param to Var that allows non-string names/references to be passed
# Waiting on updates to the parmest params_to_vars utility function.....
# Run base model to get initialized model and check model function
for comp in model.base_model.experiment_inputs:
comp.fix()
try:
res = self.solver.solve(model.base_model, tee=self.tee)
assert res.solver.termination_condition == "optimal"
self.logger.info("Model from experiment solved.")
except:
raise RuntimeError(
"Model from experiment did not solve appropriately. Make sure the model is well-posed."
)
for comp in model.base_model.experiment_inputs:
comp.unfix()
# Generate blocks for finite difference scenarios
def build_block_scenarios(b, s):
# Generate model for the finite difference scenario
m = b.model()
b.transfer_attributes_from(m.base_model.clone())
# Forward/Backward difference have a stationary case (s == 0), no parameter to perturb
if self.fd_formula in [
FiniteDifferenceStep.forward,
FiniteDifferenceStep.backward,
]:
if s == 0:
return
param = m.parameter_scenarios[s]
# Perturbation to be (1 + diff) * param_value
if self.fd_formula == FiniteDifferenceStep.central:
diff = self.step * (
(-1) ** s
) # Positive perturbation, even; negative, odd
elif self.fd_formula == FiniteDifferenceStep.backward:
diff = self.step * -1 # Backward always negative perturbation
elif self.fd_formula == FiniteDifferenceStep.forward:
diff = self.step # Forward always positive
else:
# To-Do: add an error message for this as not being implemented yet
diff = 0
pass
# Update parameter values for the given finite difference scenario
pyo.ComponentUID(param, context=m.base_model).find_component_on(
b
).set_value(m.base_model.unknown_parameters[param] * (1 + diff))
res = self.solver.solve(b, tee=self.tee)
model.scenario_blocks = pyo.Block(model.scenarios, rule=build_block_scenarios)
# To-Do: this might have to change if experiment inputs have
# a different value in the Suffix (currently it is the CUID)
design_vars = [k for k, v in model.scenario_blocks[0].experiment_inputs.items()]
# Add constraints to equate block design with global design:
for ind, d in enumerate(design_vars):
con_name = "global_design_eq_con_" + str(ind)
# Constraint rule for global design constraints
def global_design_fixing(m, s):
if s == 0:
return pyo.Constraint.Skip
block_design_var = pyo.ComponentUID(
d, context=m.scenario_blocks[0]
).find_component_on(m.scenario_blocks[s])
return d == block_design_var
model.add_component(
con_name, pyo.Constraint(model.scenarios, rule=global_design_fixing)
)
# Clean up the base model used to generate the scenarios
model.del_component(model.base_model)
# ToDo: consider this logic? Multi-block systems need something more fancy
self._built_scenarios = True
# Create objective function
[docs]
def create_objective_function(self, model=None):
"""
Generates the objective function as an expression and as a
Pyomo Objective object
The function alters the ``model`` input.
In the single experiment case, ``model`` will be self.model. In the
multi-experiment case, ``model`` will be one experiment to be enumerated.
Parameters
----------
model: model to add finite difference scenarios
"""
if model is None:
model = self.model
if self.objective_option not in [
ObjectiveLib.determinant,
ObjectiveLib.trace,
ObjectiveLib.zero,
]:
raise AttributeError(
"Objective option not recognized. Please contact the developers as you should not see this error."
)
if not hasattr(model, "fim"):
raise RuntimeError(
"Model provided does not have variable `fim`. Please make sure the model is built properly before creating the objective."
)
small_number = 1e-10
# Make objective block for constraints connected to objective
model.obj_cons = pyo.Block()
# Assemble the FIM matrix. This is helpful for initialization!
fim_vals = [
model.fim[bu, un].value
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
]
fim = np.array(fim_vals).reshape(
len(model.parameter_names), len(model.parameter_names)
)
### Initialize the Cholesky decomposition matrix
if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant:
# Calculate the eigenvalues of the FIM matrix
eig = np.linalg.eigvals(fim)
# If the smallest eigenvalue is (practically) negative, add a diagonal matrix to make it positive definite
small_number = 1e-10
if min(eig) < small_number:
fim = fim + np.eye(len(model.parameter_names)) * (
small_number - min(eig)
)
# Compute the Cholesky decomposition of the FIM matrix
L = np.linalg.cholesky(fim)
# Initialize the Cholesky matrix
for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
model.L[c, d].value = L[i, j]
def cholesky_imp(b, c, d):
"""
Calculate Cholesky L matrix using algebraic constraints
"""
# If the row is greater than or equal to the column, we are in the
# lower triangle region of the L and FIM matrices.
# This region is where our equations are well-defined.
m = b.model()
if list(m.parameter_names).index(c) >= list(m.parameter_names).index(d):
return m.fim[c, d] == sum(
m.L[c, m.parameter_names.at(k + 1)]
* m.L[d, m.parameter_names.at(k + 1)]
for k in range(list(m.parameter_names).index(d) + 1)
)
else:
# This is the empty half of L above the diagonal
return pyo.Constraint.Skip
def trace_calc(b):
"""
Calculate FIM elements. Can scale each element with 1000 for performance
"""
m = b.model()
return m.trace == sum(m.fim[j, j] for j in m.parameter_names)
def determinant_general(b):
r"""Calculate determinant. Can be applied to FIM of any size.
det(A) = \sum_{\sigma in \S_n} (sgn(\sigma) * \Prod_{i=1}^n a_{i,\sigma_i})
Use permutation() to get permutations, sgn() to get signature
"""
m = b.model()
r_list = list(range(len(m.parameter_names)))
# get all permutations
object_p = permutations(r_list)
list_p = list(object_p)
# generate a name_order to iterate \sigma_i
det_perm = 0
for i in range(len(list_p)):
name_order = []
x_order = list_p[i]
# sigma_i is the value in the i-th position after the reordering \sigma
for x in range(len(x_order)):
for y, element in enumerate(m.parameter_names):
if x_order[x] == y:
name_order.append(element)
# det(A) = sum_{\sigma \in \S_n} (sgn(\sigma) * \Prod_{i=1}^n a_{i,\sigma_i})
det_perm = sum(
self._sgn(list_p[d])
* math.prod(
m.fim[m.parameter_names.at(val + 1), m.parameter_names.at(ind + 1)]
for ind, val in enumerate(list_p[d])
)
for d in range(len(list_p))
)
return m.determinant == det_perm
if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant:
model.obj_cons.cholesky_cons = pyo.Constraint(
model.parameter_names, model.parameter_names, rule=cholesky_imp
)
model.objective = pyo.Objective(
expr=2 * sum(pyo.log10(model.L[j, j]) for j in model.parameter_names),
sense=pyo.maximize,
)
elif self.objective_option == ObjectiveLib.determinant:
# if not cholesky but determinant, calculating det and evaluate the OBJ with det
model.determinant = pyo.Var(
initialize=np.linalg.det(fim), bounds=(small_number, None)
)
model.obj_cons.determinant_rule = pyo.Constraint(rule=determinant_general)
model.objective = pyo.Objective(
expr=pyo.log10(model.determinant + 1e-6), sense=pyo.maximize
)
elif self.objective_option == ObjectiveLib.trace:
# if not determinant or cholesky, calculating the OBJ with trace
model.trace = pyo.Var(initialize=np.trace(fim), bounds=(small_number, None))
model.obj_cons.trace_rule = pyo.Constraint(rule=trace_calc)
model.objective = pyo.Objective(
expr=pyo.log10(model.trace), sense=pyo.maximize
)
elif self.objective_option == ObjectiveLib.zero:
# add dummy objective function
model.objective = pyo.Objective(expr=0)
# Check to see if the model has all the required suffixes
[docs]
def check_model_labels(self, model=None):
"""
Checks if the model contains the necessary suffixes for the
DoE model to be constructed automatically.
Parameters
----------
model: model for suffix checking
"""
# Check that experimental outputs exist
try:
outputs = [k.name for k, v in model.experiment_outputs.items()]
except:
raise RuntimeError(
"Experiment model does not have suffix " + '"experiment_outputs".'
)
# Check that experimental inputs exist
try:
outputs = [k.name for k, v in model.experiment_inputs.items()]
except:
raise RuntimeError(
"Experiment model does not have suffix " + '"experiment_inputs".'
)
# Check that unknown parameters exist
try:
outputs = [k.name for k, v in model.unknown_parameters.items()]
except:
raise RuntimeError(
"Experiment model does not have suffix " + '"unknown_parameters".'
)
# Check that measurement errors exist
try:
outputs = [k.name for k, v in model.measurement_error.items()]
except:
raise RuntimeError(
"Experiment model does not have suffix " + '"measurement_error".'
)
self.logger.info("Model has expected labels.")
# Check the FIM shape against what is expected from the model.
[docs]
def check_model_FIM(self, model=None, FIM=None):
"""
Checks if the specified matrix, FIM, matches the shape expected
from the model. This method should only be called after the
model has been probed for the length of the unknown parameter,
experiment input, experiment output, and measurement error
has been stored to the object.
Parameters
----------
model: model for suffix checking, Default: None, (self.model)
FIM: FIM value to check on the model
"""
if model is None:
model = self.model
if FIM.shape != (self.n_parameters, self.n_parameters):
raise ValueError(
"Shape of FIM provided should be n parameters by n parameters, or {} by {}, FIM provided has shape {} by {}".format(
self.n_parameters, self.n_parameters, FIM.shape[0], FIM.shape[1]
)
)
self.logger.info("FIM provided matches expected dimensions from model.")
# Check the jacobian shape against what is expected from the model.
def check_model_jac(self, jac=None):
if jac.shape != (self.n_experiment_outputs, self.n_parameters):
raise ValueError(
"Shape of Jacobian provided should be n experiment outputs by n parameters, or {} by {}, Jacobian provided has shape {} by {}".format(
self.n_experiment_outputs,
self.n_parameters,
jac.shape[0],
jac.shape[1],
)
)
self.logger.info("Jacobian provided matches expected dimensions from model.")
# Update the FIM for the specified model
[docs]
def update_FIM_prior(self, model=None, FIM=None):
"""
Updates the prior FIM on the model object. This may be useful when
running a loop and the user doesn't want to rebuild the model
because it is expensive to build/initialize.
Parameters
----------
model: model where FIM prior is to be updated, Default: None, (self.model)
FIM: 2D np array to be the new FIM prior, Default: None
"""
if model is None:
model = self.model
# Check FIM input
if FIM is None:
raise ValueError(
"FIM input for update_FIM_prior must be a 2D, square numpy array."
)
if not hasattr(model, "fim"):
raise RuntimeError(
"``fim`` is not defined on the model provided. Please build the model first."
)
self.check_model_FIM(model=model, FIM=FIM)
# Update FIM prior
for ind1, p1 in enumerate(model.parameter_names):
for ind2, p2 in enumerate(model.parameter_names):
model.prior_FIM[p1, p2].set_value(FIM[ind1, ind2])
self.logger.info("FIM prior has been updated.")
# ToDo: Add an update function for the parameter values? --> closed loop parameter estimation?
# Or leave this to the user?????
def update_unknown_parameter_values(self, model=None, param_vals=None):
raise NotImplementedError(
"Updating unknown parameter values not yet supported."
)
# Evaluates FIM and statistics for a full factorial space (same as run_grid_search)
[docs]
def compute_FIM_full_factorial(
self, model=None, design_ranges=None, method="sequential"
):
"""
Will run a simulation-based full factorial exploration of
the experimental input space (i.e., a ``grid search`` or
``parameter sweep``) to understand how the FIM metrics
change as a function of the experimental design space.
Parameters
----------
model: model to perform the full factorial exploration on
design_ranges: dict of lists, of the form {<var_name>: [start, stop, numsteps]}
method: string to specify which method should be used
options are ``kaug`` and ``sequential``
"""
# Start timer
sp_timer = TicTocTimer()
sp_timer.tic(msg=None)
self.logger.info("Beginning Full Factorial Design.")
# Make new model for factorial design
self.factorial_model = self.experiment.get_labeled_model(
**self.get_labeled_model_args
).clone()
model = self.factorial_model
# Permute the inputs to be aligned with the experiment input indices
design_ranges_enum = {k: np.linspace(*v) for k, v in design_ranges.items()}
design_map = {
ind: (k[0].name, k[0])
for ind, k in enumerate(model.experiment_inputs.items())
}
# Make the full space
try:
valid_inputs = 0
des_ranges = []
for k, v in design_map.items():
if v[0] in design_ranges_enum.keys():
des_ranges.append(design_ranges_enum[v[0]])
valid_inputs += 1
assert valid_inputs > 0
factorial_points = product(*des_ranges)
except:
raise ValueError(
"Design ranges keys must be a subset of experimental design names."
)
# ToDo: Add more objective types? i.e., modified-E; G-opt; V-opt; etc?
# ToDo: Also, make this a result object, or more user friendly.
fim_factorial_results = {k.name: [] for k, v in model.experiment_inputs.items()}
fim_factorial_results.update(
{
"log10 D-opt": [],
"log10 A-opt": [],
"log10 E-opt": [],
"log10 ME-opt": [],
"solve_time": [],
}
)
successes = 0
failures = 0
total_points = np.prod(
np.array([len(v) for k, v in design_ranges_enum.items()])
)
time_set = []
curr_point = 1 # Initial current point
for design_point in factorial_points:
# Fix design variables at fixed experimental design point
for i in range(len(design_point)):
design_map[i][1].fix(design_point[i])
# Timing and logging objects
self.logger.info("=======Iteration Number: %s =====", curr_point)
iter_timer = TicTocTimer()
iter_timer.tic(msg=None)
# Compute FIM with given options
try:
curr_point = successes + failures + 1
# Logging information for each run
self.logger.info("This is run %s out of %s.", curr_point, total_points)
# Attempt the FIM computation
self.compute_FIM(model=model, method=method)
successes += 1
# iteration time
iter_t = iter_timer.toc(msg=None)
time_set.append(iter_t)
# More logging
self.logger.info(
"The code has run for %s seconds.", round(sum(time_set), 2)
)
self.logger.info(
"Estimated remaining time: %s seconds",
round(
sum(time_set) / (curr_point) * (total_points - curr_point + 1),
2,
),
)
except:
self.logger.warning(
":::::::::::Warning: Cannot converge this run.::::::::::::"
)
failures += 1
self.logger.warning("failed count:", failures)
self._computed_FIM = np.zeros(self.prior_FIM.shape)
iter_t = iter_timer.toc(msg=None)
time_set.append(iter_t)
FIM = self._computed_FIM
# Compute and record metrics on FIM
D_opt = np.log10(np.linalg.det(FIM))
A_opt = np.log10(np.trace(FIM))
E_vals, E_vecs = np.linalg.eig(FIM) # Grab eigenvalues
E_ind = np.argmin(E_vals.real) # Grab index of minima to check imaginary
# Warn the user if there is a ``large`` imaginary component (should not be)
if abs(E_vals.imag[E_ind]) > 1e-8:
self.logger.warning(
"Eigenvalue has imaginary component greater than 1e-6, contact developers if this issue persists."
)
# If the real value is less than or equal to zero, set the E_opt value to nan
if E_vals.real[E_ind] <= 0:
E_opt = np.nan
else:
E_opt = np.log10(E_vals.real[E_ind])
ME_opt = np.log10(np.linalg.cond(FIM))
# Append the values for each of the experiment inputs
for k, v in model.experiment_inputs.items():
fim_factorial_results[k.name].append(pyo.value(k))
fim_factorial_results["log10 D-opt"].append(D_opt)
fim_factorial_results["log10 A-opt"].append(A_opt)
fim_factorial_results["log10 E-opt"].append(E_opt)
fim_factorial_results["log10 ME-opt"].append(ME_opt)
fim_factorial_results["solve_time"].append(time_set[-1])
self.fim_factorial_results = fim_factorial_results
return self.fim_factorial_results
# TODO: Overhaul plotting functions to not use strings
# TODO: Make the plotting functionalities work for >2 design features
def _curve1D(
self,
title_text,
xlabel_text,
font_axes=16,
font_tick=14,
figure_file_name=None,
log_scale=True,
):
"""
Draw 1D sensitivity curves for all design criteria
Parameters
----------
title_text: name of the figure, a string
xlabel_text: x label title, a string.
In a 1D sensitivity curve, it is the design variable by which the curve is drawn.
font_axes: axes label font size
font_tick: tick label font size
figure_file_name: string or Path, path to save the figure as
log_scale: if True, the result matrix will be scaled by log10
Returns
--------
4 Figures of 1D sensitivity curves for each criteria
"""
if figure_file_name is not None:
show_fig = False
else:
show_fig = True
# extract the range of the DOF design variable
x_range = self.figure_result_data[self.figure_sens_des_vars[0]].values.tolist()
# decide if the results are log scaled
if log_scale:
y_range_A = np.log10(self.figure_result_data["log10 A-opt"].values.tolist())
y_range_D = np.log10(self.figure_result_data["log10 D-opt"].values.tolist())
y_range_E = np.log10(self.figure_result_data["log10 E-opt"].values.tolist())
y_range_ME = np.log10(
self.figure_result_data["log10 ME-opt"].values.tolist()
)
else:
y_range_A = self.figure_result_data["log10 A-opt"].values.tolist()
y_range_D = self.figure_result_data["log10 D-opt"].values.tolist()
y_range_E = self.figure_result_data["log10 E-opt"].values.tolist()
y_range_ME = self.figure_result_data["log10 ME-opt"].values.tolist()
# Draw A-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
# plt.rcParams.update(params)
ax.plot(x_range, y_range_A)
ax.scatter(x_range, y_range_A)
ax.set_ylabel("$log_{10}$ Trace")
ax.set_xlabel(xlabel_text)
plt.pyplot.title(title_text + ": A-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_A_opt.png"), format="png", dpi=450
)
# Draw D-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
# plt.rcParams.update(params)
ax.plot(x_range, y_range_D)
ax.scatter(x_range, y_range_D)
ax.set_ylabel("$log_{10}$ Determinant")
ax.set_xlabel(xlabel_text)
plt.pyplot.title(title_text + ": D-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_D_opt.png"), format="png", dpi=450
)
# Draw E-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
# plt.rcParams.update(params)
ax.plot(x_range, y_range_E)
ax.scatter(x_range, y_range_E)
ax.set_ylabel("$log_{10}$ Minimal eigenvalue")
ax.set_xlabel(xlabel_text)
plt.pyplot.title(title_text + ": E-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_E_opt.png"), format="png", dpi=450
)
# Draw Modified E-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
# plt.rcParams.update(params)
ax.plot(x_range, y_range_ME)
ax.scatter(x_range, y_range_ME)
ax.set_ylabel("$log_{10}$ Condition number")
ax.set_xlabel(xlabel_text)
plt.pyplot.title(title_text + ": Modified E-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_ME_opt.png"), format="png", dpi=450
)
def _heatmap(
self,
title_text,
xlabel_text,
ylabel_text,
font_axes=16,
font_tick=14,
figure_file_name=None,
log_scale=True,
):
"""
Draw 2D heatmaps for all design criteria
Parameters
----------
title_text: name of the figure, a string
xlabel_text: x label title, a string.
In a 2D heatmap, it should be the second design variable in the design_ranges
ylabel_text: y label title, a string.
In a 2D heatmap, it should be the first design variable in the dv_ranges
font_axes: axes label font size
font_tick: tick label font size
figure_file_name: string or Path, path to save the figure as
log_scale: if True, the result matrix will be scaled by log10
Returns
--------
4 Figures of 2D heatmap for each criteria
"""
if figure_file_name is not None:
show_fig = False
else:
show_fig = True
des_names = [k for k, v in self.figure_fixed_des_vars.items()]
sens_ranges = {}
for i in self.figure_sens_des_vars:
sens_ranges[i] = list(self.figure_result_data[i].unique())
x_range = sens_ranges[self.figure_sens_des_vars[0]]
y_range = sens_ranges[self.figure_sens_des_vars[1]]
# extract the design criteria values
A_range = self.figure_result_data["log10 A-opt"].values.tolist()
D_range = self.figure_result_data["log10 D-opt"].values.tolist()
E_range = self.figure_result_data["log10 E-opt"].values.tolist()
ME_range = self.figure_result_data["log10 ME-opt"].values.tolist()
# reshape the design criteria values for heatmaps
cri_a = np.asarray(A_range).reshape(len(x_range), len(y_range))
cri_d = np.asarray(D_range).reshape(len(x_range), len(y_range))
cri_e = np.asarray(E_range).reshape(len(x_range), len(y_range))
cri_e_cond = np.asarray(ME_range).reshape(len(x_range), len(y_range))
self.cri_a = cri_a
self.cri_d = cri_d
self.cri_e = cri_e
self.cri_e_cond = cri_e_cond
# decide if log scaled
if log_scale:
hes_a = np.log10(self.cri_a)
hes_e = np.log10(self.cri_e)
hes_d = np.log10(self.cri_d)
hes_e2 = np.log10(self.cri_e_cond)
else:
hes_a = self.cri_a
hes_e = self.cri_e
hes_d = self.cri_d
hes_e2 = self.cri_e_cond
# set heatmap x,y ranges
xLabel = x_range
yLabel = y_range
# A-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
plt.pyplot.rcParams.update(params)
ax.set_yticks(range(len(yLabel)))
ax.set_yticklabels(yLabel)
ax.set_ylabel(ylabel_text)
ax.set_xticks(range(len(xLabel)))
ax.set_xticklabels(xLabel)
ax.set_xlabel(xlabel_text)
im = ax.imshow(hes_a.T, cmap=plt.pyplot.cm.hot_r)
ba = plt.pyplot.colorbar(im)
ba.set_label("log10(trace(FIM))")
plt.pyplot.title(title_text + ": A-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_A_opt.png"), format="png", dpi=450
)
# D-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
plt.pyplot.rcParams.update(params)
ax.set_yticks(range(len(yLabel)))
ax.set_yticklabels(yLabel)
ax.set_ylabel(ylabel_text)
ax.set_xticks(range(len(xLabel)))
ax.set_xticklabels(xLabel)
ax.set_xlabel(xlabel_text)
im = ax.imshow(hes_d.T, cmap=plt.pyplot.cm.hot_r)
ba = plt.pyplot.colorbar(im)
ba.set_label("log10(det(FIM))")
plt.pyplot.title(title_text + ": D-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_D_opt.png"), format="png", dpi=450
)
# E-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
plt.pyplot.rcParams.update(params)
ax.set_yticks(range(len(yLabel)))
ax.set_yticklabels(yLabel)
ax.set_ylabel(ylabel_text)
ax.set_xticks(range(len(xLabel)))
ax.set_xticklabels(xLabel)
ax.set_xlabel(xlabel_text)
im = ax.imshow(hes_e.T, cmap=plt.pyplot.cm.hot_r)
ba = plt.pyplot.colorbar(im)
ba.set_label("log10(minimal eig(FIM))")
plt.pyplot.title(title_text + ": E-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_E_opt.png"), format="png", dpi=450
)
# Modified E-optimality
fig = plt.pyplot.figure()
plt.pyplot.rc("axes", titlesize=font_axes)
plt.pyplot.rc("axes", labelsize=font_axes)
plt.pyplot.rc("xtick", labelsize=font_tick)
plt.pyplot.rc("ytick", labelsize=font_tick)
ax = fig.add_subplot(111)
params = {"mathtext.default": "regular"}
plt.pyplot.rcParams.update(params)
ax.set_yticks(range(len(yLabel)))
ax.set_yticklabels(yLabel)
ax.set_ylabel(ylabel_text)
ax.set_xticks(range(len(xLabel)))
ax.set_xticklabels(xLabel)
ax.set_xlabel(xlabel_text)
im = ax.imshow(hes_e2.T, cmap=plt.pyplot.cm.hot_r)
ba = plt.pyplot.colorbar(im)
ba.set_label("log10(cond(FIM))")
plt.pyplot.title(title_text + ": Modified E-optimality")
if show_fig:
plt.pyplot.show()
else:
plt.pyplot.savefig(
Path(figure_file_name + "_ME_opt.png"), format="png", dpi=450
)
# Gets the FIM from an existing model
[docs]
def get_FIM(self, model=None):
"""
Gets the FIM values from the model specified
Parameters
----------
model: model to grab FIM from, Default: None, (self.model)
Returns
-------
FIM: 2D list representation of the FIM (can be cast to numpy)
"""
if model is None:
model = self.model
if not hasattr(model, "fim"):
raise RuntimeError(
"Model provided does not have variable `fim`. Please make sure the model is built properly before calling `get_FIM`"
)
fim_vals = [
pyo.value(model.fim[i, j])
for i in model.parameter_names
for j in model.parameter_names
]
fim_np = np.array(fim_vals).reshape(
(len(model.parameter_names), len(model.parameter_names))
)
# FIM is a lower triangular matrix for the optimal DoE problem.
# Exploit symmetry to fill in the zeros.
for i in range(len(model.parameter_names)):
for j in range(len(model.parameter_names)):
if j < i:
fim_np[j, i] = fim_np[i, j]
return [list(row) for row in list(fim_np)]
# Gets the sensitivity matrix from an existing model
[docs]
def get_sensitivity_matrix(self, model=None):
"""
Gets the sensitivity matrix (Q) values from the model specified.
Parameters
----------
model: model to grab Q from, Default: None, (self.model)
Returns
-------
Q: 2D list representation of the sensitivity matrix (can be cast to numpy)
"""
if model is None:
model = self.model
if not hasattr(model, "sensitivity_jacobian"):
raise RuntimeError(
"Model provided does not have variable `sensitivity_jacobian`. Please make sure the model is built properly before calling `get_sensitivity_matrix`"
)
Q_vals = [
pyo.value(model.sensitivity_jacobian[i, j])
for i in model.output_names
for j in model.parameter_names
]
Q_np = np.array(Q_vals).reshape(
(len(model.output_names), len(model.parameter_names))
)
return [list(row) for row in list(Q_np)]
# Gets the experiment input values from an existing model
# Gets the unknown parameter values from an existing model
[docs]
def get_unknown_parameter_values(self, model=None):
"""
Gets the unknown parameter values (theta)
from the model specified.
Parameters
----------
model: model to grab theta from,
default: None, (self.model)
Returns
-------
theta: 1D list of unknown parameter values at which this experiment was designed
"""
if model is None:
model = self.model
if not hasattr(model, "unknown_parameters"):
if not hasattr(model, "scenario_blocks"):
raise RuntimeError(
"Model provided does not have expected structure. Please make sure model is built properly before calling `get_experiment_input_values`"
)
theta_vals = [
pyo.value(k)
for k, v in model.scenario_blocks[0].unknown_parameters.items()
]
else:
theta_vals = [pyo.value(k) for k, v in model.unknown_parameters.items()]
return theta_vals
# Gets the experiment output values from an existing model
[docs]
def get_experiment_output_values(self, model=None):
"""
Gets the experiment output values (y hat)
from the model specified.
Parameters
----------
model: model to grab y hat from,
default: None, (self.model)
Returns
-------
y_hat: 1D list of experiment output values from the design experiment
"""
if model is None:
model = self.model
if not hasattr(model, "experiment_outputs"):
if not hasattr(model, "scenario_blocks"):
raise RuntimeError(
"Model provided does not have expected structure. Please make sure model is built properly before calling `get_experiment_input_values`"
)
y_hat_vals = [
pyo.value(k)
for k, v in model.scenario_blocks[0].measurement_error.items()
]
else:
y_hat_vals = [pyo.value(k) for k, v in model.measurement_error.items()]
return y_hat_vals
# ToDo: For more complicated error structures, this should become
# get cov_y, or so, and this method will be deprecated
# Gets the measurement error values from an existing model
[docs]
def get_measurement_error_values(self, model=None):
"""
Gets the experiment output values (sigma)
from the model specified.
Parameters
----------
model: model to grab sigma values from,
default: None, (self.model)
Returns
-------
sigma_diag: 1D list of measurement errors used to design the experiment
"""
if model is None:
model = self.model
if not hasattr(model, "measurement_error"):
if not hasattr(model, "scenario_blocks"):
raise RuntimeError(
"Model provided does not have expected structure. Please make sure model is built properly before calling `get_experiment_input_values`"
)
sigma_vals = [
pyo.value(k)
for k, v in model.scenario_blocks[0].measurement_error.items()
]
else:
sigma_vals = [pyo.value(k) for k, v in model.measurement_error.items()]
return sigma_vals
# Helper function for determinant calculation
def _sgn(self, p):
"""
This is a helper function for when constructing the determinant formula
without the Cholesky factorization.
Parameters
-----------
p: the permutation (a list)
Returns
-------
1 if the number of exchange is an even number
-1 if the number is an odd number
"""
if len(p) == 1:
return 1
trans = 0
for i in range(0, len(p)):
j = i + 1
for j in range(j, len(p)):
if p[i] > p[j]:
trans = trans + 1
if (trans % 2) == 0:
return 1
else:
return -1