# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
#
# Development of this module was conducted as part of the Institute for
# the Design of Advanced Energy Systems (IDAES) with support through the
# Simulation-Based Engineering, Crosscutting Research Program within the
# U.S. Department of Energy’s Office of Fossil Energy and Carbon Management.
#
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
import logging
from pyomo.common.collections import ComponentMap, ComponentSet
from pyomo.common.modeling import unique_component_name
from pyomo.contrib.trustregion.util import minIgnoreNone, maxIgnoreNone
from pyomo.core import (
Block,
Param,
VarList,
Constraint,
Objective,
value,
Set,
ExternalFunction,
maximize,
minimize,
)
from pyomo.core.expr.calculus.derivatives import differentiate
from pyomo.core.expr.visitor import identify_variables, ExpressionReplacementVisitor
from pyomo.core.expr.numeric_expr import ExternalFunctionExpression
from pyomo.core.expr.numvalue import native_types
from pyomo.opt import SolverFactory, check_optimal_termination
logger = logging.getLogger('pyomo.contrib.trustregion')
[docs]
class EFReplacement(ExpressionReplacementVisitor):
"""
This class is a subclass of ExpressionReplacementVisitor.
It replaces an external function expression in an expression tree with a
"holder" variable (recorded in a ComponentMap) and sets the initial value
of the new node on the tree to that of the original node, if it can.
NOTE: We use an empty substitution map. The EFs to be substituted are
identified as part of exitNode.
"""
[docs]
def __init__(self, trfData, efSet):
super().__init__(
descend_into_named_expressions=True, remove_named_expressions=False
)
self.trfData = trfData
self.efSet = efSet
def beforeChild(self, node, child, child_idx):
# We want to capture all of the variables on the model.
# If we reject a step, we need to know all the vars to reset.
descend, result = super().beforeChild(node, child, child_idx)
if (
not descend
and result.__class__ not in native_types
and result.is_variable_type()
):
self.trfData.all_variables.add(result)
return descend, result
def exitNode(self, node, data):
# This is where the replacement happens.
new_node = super().exitNode(node, data)
if new_node.__class__ is not ExternalFunctionExpression:
return new_node
if self.efSet is not None and new_node._fcn not in self.efSet:
# SKIP: efSet is provided and this node is not in it.
return new_node
_output = self.trfData.ef_outputs.add()
# Set the value for the new node to the evaluated value of the
# original node, if possible.
try:
_output.set_value(value(node))
except:
_output.set_value(0)
# Preserve the new node as a truth model.
# self.TRF.truth_models is a ComponentMap.
self.trfData.truth_models[_output] = new_node
return _output
[docs]
class TRFInterface(object):
"""
Pyomo interface for Trust Region algorithm.
"""
[docs]
def __init__(self, model, decision_variables, ext_fcn_surrogate_map_rule, config):
self.original_model = model
tmp_name = unique_component_name(self.original_model, 'tmp')
setattr(self.original_model, tmp_name, decision_variables)
self.config = config
self.model = self.original_model.clone()
self.decision_variables = getattr(self.model, tmp_name)
delattr(self.original_model, tmp_name)
self.data = Block()
self.model.add_component(
unique_component_name(self.model, 'trf_data'), self.data
)
self.basis_expression_rule = ext_fcn_surrogate_map_rule
self.efSet = None
self.solver = SolverFactory(self.config.solver)
# TODO: Provide an API for users to set this only to substitute
# a subset of identified external functions.
# Also rename to "efFilterSet" or something similar.
[docs]
def replaceEF(self, expr):
"""
Replace an External Function.
Arguments:
expr : a Pyomo expression. We will search this expression tree
This function returns an expression after removing any
ExternalFunction in the set efSet from the expression tree
`expr` and replacing them with variables.
New variables are declared on the `TRF` block.
TODO: Future work - investigate direct substitution of basis or
surrogate models using Expression objects instead of new variables.
"""
return EFReplacement(self.data, self.efSet).walk_expression(expr)
def _remove_ef_from_expr(self, component):
"""
This method takes a component and looks at its expression.
If the expression contains an external function (EF), a new expression
with the EF replaced with a "holder" variable is added to the component
and the basis expression for the new "holder" variable is updated.
"""
expr = component.expr
next_ef_id = len(self.data.ef_outputs)
new_expr = self.replaceEF(expr)
if new_expr is not expr:
component.set_value(new_expr)
new_output_vars = list(
self.data.ef_outputs[i + 1]
for i in range(next_ef_id, len(self.data.ef_outputs))
)
for v in new_output_vars:
self.data.basis_expressions[v] = self.basis_expression_rule(
component, self.data.truth_models[v]
)
[docs]
def replaceExternalFunctionsWithVariables(self):
"""
This method sets up essential data objects on the new trf_data block
on the model as well as triggers the replacement of external functions
in expressions trees.
Data objects created:
self.data.all_variables : ComponentSet
A set of all variables on the model, including "holder"
variables from the EF replacement
self.data.truth_models : ComponentMap
A component map for replaced nodes that keeps track of
the truth model for that replacement.
self.data.basis_expressions : ComponentMap
A component map for the Pyomo expressions for basis functions
as they apply to each variable
self.data.ef_inputs : Dict
A dictionary that tracks the input variables for each EF
self.data.ef_outputs : VarList
A list of the "holder" variables which replaced the original
External Function expressions
"""
self.data.all_variables = ComponentSet()
self.data.truth_models = ComponentMap()
self.data.basis_expressions = ComponentMap()
self.data.ef_inputs = {}
self.data.ef_outputs = VarList()
number_of_equality_constraints = 0
for con in self.model.component_data_objects(Constraint, active=True):
if con.lb == con.ub and con.lb is not None:
number_of_equality_constraints += 1
self._remove_ef_from_expr(con)
self.degrees_of_freedom = (
len(list(self.data.all_variables)) - number_of_equality_constraints
)
if self.degrees_of_freedom != len(self.decision_variables):
raise ValueError(
"replaceExternalFunctionsWithVariables: "
"The degrees of freedom %d do not match the number of decision "
"variables supplied %d."
% (self.degrees_of_freedom, len(self.decision_variables))
)
for var in self.decision_variables:
if var not in self.data.all_variables:
raise ValueError(
"replaceExternalFunctionsWithVariables: "
f"The supplied decision variable {var.name} cannot "
"be found in the model variables."
)
self.data.objs = list(self.model.component_data_objects(Objective, active=True))
# HACK: This is a hack that we will want to remove once the NL writer
# has been corrected to not send unused EFs to the solver
for ef in self.model.component_objects(ExternalFunction):
ef.parent_block().del_component(ef)
if len(self.data.objs) != 1:
raise ValueError(
"replaceExternalFunctionsWithVariables: "
"TrustRegion only supports models with a single active Objective."
)
if self.data.objs[0].sense == maximize:
self.data.objs[0].expr = -1 * self.data.objs[0].expr
self.data.objs[0].sense = minimize
self._remove_ef_from_expr(self.data.objs[0])
for i in self.data.ef_outputs:
self.data.ef_inputs[i] = list(
identify_variables(
self.data.truth_models[self.data.ef_outputs[i]], include_fixed=False
)
)
self.data.all_variables.update(self.data.ef_outputs.values())
self.data.all_variables = list(self.data.all_variables)
[docs]
def createConstraints(self):
"""
Create the basis constraint y = b(w) (equation 3) and the
surrogate model constraint y = r_k(w) (equation 5)
Both constraints are immediately deactivated after creation and
are activated later as necessary.
"""
b = self.data
# This implements: y = b(w) from Yoshio/Biegler (2020)
@b.Constraint(b.ef_outputs.index_set())
def basis_constraint(b, i):
ef_output_var = b.ef_outputs[i]
return ef_output_var == b.basis_expressions[ef_output_var]
b.basis_constraint.deactivate()
b.INPUT_OUTPUT = Set(
initialize=(
(i, j)
for i in b.ef_outputs.index_set()
for j in range(len(b.ef_inputs[i]))
)
)
b.basis_model_output = Param(b.ef_outputs.index_set(), mutable=True)
b.grad_basis_model_output = Param(b.INPUT_OUTPUT, mutable=True)
b.truth_model_output = Param(b.ef_outputs.index_set(), mutable=True)
b.grad_truth_model_output = Param(b.INPUT_OUTPUT, mutable=True)
b.value_of_ef_inputs = Param(b.INPUT_OUTPUT, mutable=True)
# This implements: y = r_k(w)
@b.Constraint(b.ef_outputs.index_set())
def sm_constraint_basis(b, i):
ef_output_var = b.ef_outputs[i]
return ef_output_var == b.basis_expressions[
ef_output_var
] + b.truth_model_output[i] - b.basis_model_output[i] + sum(
(b.grad_truth_model_output[i, j] - b.grad_basis_model_output[i, j])
* (w - b.value_of_ef_inputs[i, j])
for j, w in enumerate(b.ef_inputs[i])
)
b.sm_constraint_basis.deactivate()
[docs]
def getCurrentDecisionVariableValues(self):
"""
Return current decision variable values
"""
decision_values = {}
for var in self.decision_variables:
decision_values[var.name] = value(var)
return decision_values
[docs]
def updateDecisionVariableBounds(self, radius):
"""Update the TRSP_k decision variable bounds
This corresponds to:
.. math::
|| E^{-1} (u - u_k) || <= trust_radius
We omit :math:`E^{-1}` because we assume that the users have
correctly scaled their variables.
"""
for var in self.decision_variables:
var.setlb(
maxIgnoreNone(
value(var) - radius, self.initial_decision_bounds[var.name][0]
)
)
var.setub(
minIgnoreNone(
value(var) + radius, self.initial_decision_bounds[var.name][1]
)
)
[docs]
def updateSurrogateModel(self):
"""
The parameters needed for the surrogate model are the values of:
b(w_k) : basis_model_output
d(w_k) : truth_model_output
grad b(w_k) : grad_basis_model_output
grad d(w_k) : grad_truth_model_output
"""
b = self.data
for i, y in b.ef_outputs.items():
b.basis_model_output[i] = value(b.basis_expressions[y])
b.truth_model_output[i] = value(b.truth_models[y])
# Basis functions are Pyomo expressions (in theory)
gradBasis = differentiate(b.basis_expressions[y], wrt_list=b.ef_inputs[i])
# These, however, are external functions
gradTruth = differentiate(b.truth_models[y], wrt_list=b.ef_inputs[i])
for j, w in enumerate(b.ef_inputs[i]):
b.grad_basis_model_output[i, j] = gradBasis[j]
b.grad_truth_model_output[i, j] = gradTruth[j]
b.value_of_ef_inputs[i, j] = value(w)
[docs]
def getCurrentModelState(self):
"""
Return current state of all model variables.
This is necessary if we need to reject a step and move backwards.
"""
return list(value(v, exception=False) for v in self.data.all_variables)
[docs]
def calculateFeasibility(self):
"""
Feasibility measure (theta(x)) is:
|| y - d(w) ||_1
"""
b = self.data
return sum(
abs(value(y) - value(b.truth_models[y])) for i, y in b.ef_outputs.items()
)
[docs]
def calculateStepSizeInfNorm(self, original_values, new_values):
"""
Taking original and new values, calculate the step-size norm ||s_k||:
|| u - u_k ||_inf
We assume that the user has correctly scaled their variables.
"""
original_vals = []
new_vals = []
for var, val in original_values.items():
original_vals.append(val)
new_vals.append(new_values[var])
return max([abs(new - old) for new, old in zip(new_vals, original_vals)])
[docs]
def initializeProblem(self):
"""
Initializes appropriate constraints, values, etc. for TRF problem
Returns
-------
objective_value : Initial objective
feasibility : Initial feasibility measure
STEPS:
1. Create and solve PMP (eq. 3) and set equal to "x_0"
2. Evaluate d(w_0)
3. Evaluate initial feasibility measure (theta(x_0))
4. Create initial SM (difference btw. low + high fidelity models)
"""
self.replaceExternalFunctionsWithVariables()
self.initial_decision_bounds = {}
for var in self.decision_variables:
self.initial_decision_bounds[var.name] = [var.lb, var.ub]
self.createConstraints()
self.data.basis_constraint.activate()
objective_value, _, _ = self.solveModel()
self.data.basis_constraint.deactivate()
self.updateSurrogateModel()
feasibility = self.calculateFeasibility()
self.data.sm_constraint_basis.activate()
return objective_value, feasibility
[docs]
def solveModel(self):
"""
Call the specified solver to solve the problem.
Returns
-------
self.data.objs[0] : Current objective value
step_norm : Current step size inf norm
feasibility : Current feasibility measure
This also caches the previous values of the vars, just in case
we need to access them later if a step is rejected
"""
current_decision_values = self.getCurrentDecisionVariableValues()
self.data.previous_model_state = self.getCurrentModelState()
results = self.solver.solve(
self.model, keepfiles=self.config.keepfiles, tee=self.config.tee
)
if not check_optimal_termination(results):
raise ArithmeticError(
'EXIT: Model solve failed with status {} and termination'
' condition(s) {}.'.format(
str(results.solver.status),
str(results.solver.termination_condition),
)
)
self.model.solutions.load_from(results)
new_decision_values = self.getCurrentDecisionVariableValues()
step_norm = self.calculateStepSizeInfNorm(
current_decision_values, new_decision_values
)
feasibility = self.calculateFeasibility()
return self.data.objs[0](), step_norm, feasibility
[docs]
def rejectStep(self):
"""
If a step is rejected, we reset the model variables values back
to their cached state - which we set in solveModel
"""
for var, val in zip(self.data.all_variables, self.data.previous_model_state):
var.set_value(val, skip_validation=True)