# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________
from pyomo.common.collections import ComponentSet, ComponentMap
from pyomo.common.timing import HierarchicalTimer
from pyomo.common.dependencies import attempt_import, numpy as np
from pyomo.core.base.objective import Objective
from pyomo.core.base.suffix import Suffix
from pyomo.core.expr.visitor import identify_variables
from pyomo.util.calc_var_value import calculate_variable_from_constraint
from pyomo.util.subsystems import (
TemporarySubsystemManager,
create_subsystem_block,
generate_subsystem_blocks,
)
# Use attempt_import here due to unguarded NumPy import in these files
pyomo_nlp = attempt_import('pyomo.contrib.pynumero.interfaces.pyomo_nlp')[0]
nlp_proj = attempt_import('pyomo.contrib.pynumero.interfaces.nlp_projections')[0]
from pyomo.contrib.pynumero.algorithms.solvers.cyipopt_solver import CyIpoptSolver
from pyomo.contrib.pynumero.interfaces.cyipopt_interface import CyIpoptNLP
from pyomo.contrib.pynumero.algorithms.solvers.scipy_solvers import (
FsolveNlpSolver,
NewtonNlpSolver,
SecantNewtonNlpSolver,
)
from pyomo.contrib.incidence_analysis import IncidenceGraphInterface
from pyomo.contrib.incidence_analysis.scc_solver import (
generate_strongly_connected_components,
)
[docs]
class NlpSolverBase(object):
"""A base class that solves an NLP object
Subclasses should implement this interface for compatibility with
ImplicitFunctionSolver objects.
"""
[docs]
def __init__(self, nlp, options=None, timer=None):
raise NotImplementedError(
"%s has not implemented the __init__ method" % type(self)
)
def solve(self, **kwds):
raise NotImplementedError(
"%s has not implemented the solve method" % type(self)
)
[docs]
class CyIpoptSolverWrapper(NlpSolverBase):
"""A wrapper for CyIpoptNLP and CyIpoptSolver that implements the
NlpSolverBase API
"""
[docs]
def __init__(self, nlp, options=None, timer=None):
self._cyipopt_nlp = CyIpoptNLP(nlp)
self._cyipopt_solver = CyIpoptSolver(self._cyipopt_nlp, options=options)
def solve(self, **kwds):
return self._cyipopt_solver.solve(**kwds)
[docs]
class ScipySolverWrapper(NlpSolverBase):
"""A wrapper for SciPy NLP solvers that implements the NlpSolverBase API
This solver uses scipy.optimize.fsolve for "vector-valued" NLPs (with more
than one primal variable and equality constraint) and the Secant-Newton
hybrid for "scalar-valued" NLPs.
"""
[docs]
def __init__(self, nlp, timer=None, options=None):
if options is None:
options = {}
for key in options:
if (
key not in SecantNewtonNlpSolver.OPTIONS
and key not in FsolveNlpSolver.OPTIONS
):
raise ValueError(
"Option %s is invalid for both SecantNewtonNlpSolver and"
" FsolveNlpSolver" % key
)
# Note that options currently contain the options for both solvers.
# There is currently no way to specify, e.g., different tolerances
# for the two solvers. This can be updated if there is demand for it.
newton_options = {
key: value
for key, value in options.items()
if key in SecantNewtonNlpSolver.OPTIONS
}
fsolve_options = {
key: value
for key, value in options.items()
if key in FsolveNlpSolver.OPTIONS
}
if nlp.n_primals() == 1:
solver = SecantNewtonNlpSolver(nlp, timer=timer, options=newton_options)
else:
solver = FsolveNlpSolver(nlp, timer=timer, options=fsolve_options)
self._nlp = nlp
self._options = options
self._solver = solver
def solve(self, x0=None):
res = self._solver.solve(x0=x0)
return res
[docs]
class PyomoImplicitFunctionBase(object):
"""A base class defining an API for implicit functions defined using
Pyomo components. In particular, this is the API required by
ExternalPyomoModel.
Implicit functions are defined by two lists of Pyomo VarData and
one list of Pyomo ConstraintData. The first list of VarData corresponds
to "variables" defining the outputs of the implicit function.
The list of ConstraintData are equality constraints that are converged
to evaluate the implicit function. The second list of VarData are
variables to be treated as "parameters" or inputs to the implicit
function.
"""
[docs]
def __init__(self, variables, constraints, parameters):
"""
Arguments
---------
variables: List of VarData
Variables to be treated as outputs of the implicit function
constraints: List of ConstraintData
Constraints that are converged to evaluate the implicit function
parameters: List of VarData
Variables to be treated as inputs to the implicit function
"""
self._variables = variables
self._constraints = constraints
self._parameters = parameters
self._block_variables = variables + parameters
self._block = create_subsystem_block(constraints, self._block_variables)
def get_variables(self):
return self._variables
def get_constraints(self):
return self._constraints
def get_parameters(self):
return self._parameters
def get_block(self):
return self._block
[docs]
def set_parameters(self, values):
"""Sets the parameters of the system that defines the implicit
function.
This method does not necessarily need to update values of the Pyomo
variables, as long as the next evaluation of this implicit function
is consistent with these inputs.
Arguments
---------
values: NumPy array
Array of values to set for the "parameter variables" in the order
they were specified in the constructor
"""
raise NotImplementedError()
[docs]
def evaluate_outputs(self):
"""Returns the values of the variables that are treated as outputs
of the implicit function
The returned values do not necessarily need to be the values stored
in the Pyomo variables, as long as they are consistent with the
latest parameters that have been set.
Returns
-------
NumPy array
Array with values corresponding to the "output variables" in
the order they were specified in the constructor
"""
raise NotImplementedError()
[docs]
def update_pyomo_model(self):
"""Sets values of "parameter variables" and "output variables"
to the most recent values set or computed in this implicit function
"""
raise NotImplementedError()
[docs]
class ImplicitFunctionSolver(PyomoImplicitFunctionBase):
"""A basic implicit function solver that uses a ProjectedNLP to solve
the parameterized system without repeated file writes when parameters
are updated
"""
[docs]
def __init__(
self,
variables,
constraints,
parameters,
solver_class=None,
solver_options=None,
timer=None,
):
if timer is None:
timer = HierarchicalTimer()
self._timer = timer
if solver_options is None:
solver_options = {}
self._timer.start("__init__")
super().__init__(variables, constraints, parameters)
block = self.get_block()
# PyomoNLP requires an objective
block._obj = Objective(expr=0.0)
# CyIpoptSolver requires a non-empty scaling factor
block.scaling_factor = Suffix(direction=Suffix.EXPORT)
block.scaling_factor[block._obj] = 1.0
self._timer.start("PyomoNLP")
self._nlp = pyomo_nlp.PyomoNLP(block)
self._timer.stop("PyomoNLP")
primals_ordering = [var.name for var in variables]
self._proj_nlp = nlp_proj.ProjectedExtendedNLP(self._nlp, primals_ordering)
self._timer.start("NlpSolver")
if solver_class is None:
self._solver = ScipySolverWrapper(
self._proj_nlp, options=solver_options, timer=timer
)
else:
self._solver = solver_class(
self._proj_nlp, options=solver_options, timer=timer
)
self._timer.stop("NlpSolver")
vars_in_cons = []
_seen = set()
for con in constraints:
for var in identify_variables(con.expr, include_fixed=False):
if id(var) not in _seen:
_seen.add(id(var))
vars_in_cons.append(var)
self._active_var_set = ComponentSet(vars_in_cons)
# It is possible (and fairly common) for variables specified as
# parameters to not appear in any of the specified constraints.
# We will fail if we try to get their coordinates in the NLP.
#
# Technically, this could happen for the variables as well. However,
# this would guarantee that the Jacobian is singular. I will worry
# about this when I encounter such a case.
self._active_param_mask = np.array(
[(p in self._active_var_set) for p in parameters]
)
self._active_parameters = [
p for i, p in enumerate(parameters) if self._active_param_mask[i]
]
if any((var not in self._active_var_set) for var in variables):
raise RuntimeError(
"Invalid model. All variables must appear in specified constraints."
)
# These are coordinates in the original NLP
self._variable_coords = self._nlp.get_primal_indices(variables)
self._active_parameter_coords = self._nlp.get_primal_indices(
self._active_parameters
)
# NOTE: With this array, we are storing the same data in two locations.
# Once here, and once in the NLP. We do this because parameters do not
# *need* to exist in the NLP. However, we still need to be able to
# update the "parameter variables" in the Pyomo model. If we only stored
# the parameters in the NLP, we would lose the values for parameters
# that don't appear in the active constraints.
self._parameter_values = np.array([var.value for var in parameters])
self._timer.start("__init__")
[docs]
def set_parameters(self, values, **kwds):
self._timer.start("set_parameters")
# I am not 100% sure the values will always be an array (as opposed to
# list), so explicitly convert here.
values = np.array(values)
self._parameter_values = values
values = np.compress(self._active_param_mask, values)
primals = self._nlp.get_primals()
# Will it cause a problem that _active_parameter_coords is a list
# rather than array?
primals[self._active_parameter_coords] = values
self._nlp.set_primals(primals)
self._timer.start("solve")
results = self._solver.solve(**kwds)
self._timer.stop("solve")
self._timer.stop("set_parameters")
return results
[docs]
def evaluate_outputs(self):
primals = self._nlp.get_primals()
outputs = primals[self._variable_coords]
return outputs
[docs]
def update_pyomo_model(self):
primals = self._nlp.get_primals()
for i, var in enumerate(self.get_variables()):
var.set_value(primals[self._variable_coords[i]], skip_validation=True)
for var, value in zip(self._parameters, self._parameter_values):
var.set_value(value, skip_validation=True)
[docs]
class DecomposedImplicitFunctionBase(PyomoImplicitFunctionBase):
"""A base class for an implicit function that applies a partition
to its variables and constraints and converges the system by solving
subsets sequentially
Subclasses should implement the partition_system method, which
determines how variables and constraints are partitioned into subsets.
"""
[docs]
def __init__(
self,
variables,
constraints,
parameters,
solver_class=None,
solver_options=None,
timer=None,
use_calc_var=True,
):
if timer is None:
timer = HierarchicalTimer()
self._timer = timer
self._timer.start("__init__")
if solver_class is None:
solver_class = ScipySolverWrapper
self._solver_class = solver_class
if solver_options is None:
solver_options = {}
self._solver_options = solver_options
self._calc_var_cutoff = 1 if use_calc_var else 0
# NOTE: This super call is only necessary so the get_* methods work
super().__init__(variables, constraints, parameters)
subsystem_list = [
# Switch order in list for compatibility with generate_subsystem_blocks
(cons, vars)
for vars, cons in self.partition_system(variables, constraints)
]
var_param_set = ComponentSet(variables + parameters)
# We will treat variables that are neither variables nor parameters
# as "constants". These are usually things like area, volume, or some
# other global parameter that is treated as a variable and "fixed" with
# an equality constraint.
constants = []
constant_set = ComponentSet()
for con in constraints:
for var in identify_variables(con.expr, include_fixed=False):
if var not in constant_set and var not in var_param_set:
# If this is a newly encountered variable that is neither
# a var nor param, treat it as a "constant"
constant_set.add(var)
constants.append(var)
with TemporarySubsystemManager(to_fix=constants):
# Temporarily fix "constant" variables so (a) they don't show
# up in the local inputs of the subsystem blocks and (b) so
# they don't appear as additional columns in the NLPs and
# ProjectedNLPs.
self._subsystem_list = list(generate_subsystem_blocks(subsystem_list))
# These are subsystems that need an external solver, rather than
# calculate_variable_from_constraint. _calc_var_cutoff should be either
# 0 or 1.
self._solver_subsystem_list = [
(block, inputs)
for block, inputs in self._subsystem_list
if len(block.vars) > self._calc_var_cutoff
]
# Need a dummy objective to create an NLP
for block, inputs in self._solver_subsystem_list:
block._obj = Objective(expr=0.0)
# I need scaling_factor so Pyomo NLPs I create from these blocks
# don't break when ProjectedNLP calls get_primals_scaling
block.scaling_factor = Suffix(direction=Suffix.EXPORT)
# HACK: scaling_factor just needs to be nonempty
block.scaling_factor[block._obj] = 1.0
# Original PyomoNLP for each subset in the partition
# Since we are creating these NLPs with "constants" fixed, these
# variables will not show up in the NLPs
self._timer.start("PyomoNLP")
self._solver_subsystem_nlps = [
pyomo_nlp.PyomoNLP(block)
for block, inputs in self._solver_subsystem_list
]
self._timer.stop("PyomoNLP")
# "Output variable" names are required to construct ProjectedNLPs.
# Ideally, we can eventually replace these with variable indices.
self._solver_subsystem_var_names = [
[var.name for var in block.vars.values()]
for block, inputs in self._solver_subsystem_list
]
self._solver_proj_nlps = [
nlp_proj.ProjectedExtendedNLP(nlp, names)
for nlp, names in zip(
self._solver_subsystem_nlps, self._solver_subsystem_var_names
)
]
# We will solve the ProjectedNLPs rather than the original NLPs
self._timer.start("NlpSolver")
self._nlp_solvers = [
self._solver_class(nlp, timer=self._timer, options=self._solver_options)
for nlp in self._solver_proj_nlps
]
self._timer.stop("NlpSolver")
self._solver_subsystem_input_coords = [
# Coordinates in the NLP, not ProjectedNLP
nlp.get_primal_indices(inputs)
for nlp, (subsystem, inputs) in zip(
self._solver_subsystem_nlps, self._solver_subsystem_list
)
]
self._n_variables = len(variables)
self._n_constraints = len(constraints)
self._n_parameters = len(parameters)
# This is a global (wrt individual subsystems) array that stores
# the current values of variables and parameters. This is useful
# for updating values in between subsystem solves.
#
# NOTE: This could also be implemented as a tuple of
# (subsystem_coord, primal_coord), which would eliminate the need to
# store data in two locations. The current implementation is easier,
# however.
self._global_values = np.array([var.value for var in variables + parameters])
self._global_indices = ComponentMap(
(var, i) for i, var in enumerate(variables + parameters)
)
# Cache the global array-coordinates of each subset of "input"
# variables. These are used for updating before each solve.
self._local_input_global_coords = [
# If I do not fix "constants" above, I get errors here
# that only show up in the CLC models.
# TODO: Add a test that covers this edge case.
np.array([self._global_indices[var] for var in inputs], dtype=int)
for (_, inputs) in self._solver_subsystem_list
]
# Cache the global array-coordinates of each subset of "output"
# variables. These are used for updating after each solve.
self._output_coords = [
np.array(
[self._global_indices[var] for var in block.vars.values()], dtype=int
)
for (block, _) in self._solver_subsystem_list
]
self._timer.stop("__init__")
[docs]
def n_subsystems(self):
"""Returns the number of subsystems in the partition of variables
and equations used to converge the system defining the implicit
function
"""
return len(self._subsystem_list)
[docs]
def partition_system(self, variables, constraints):
"""Partitions the systems of equations defined by the provided
variables and constraints
Each subset of the partition should have an equal number of variables
and equations. These subsets, or "subsystems", will be solved
sequentially in the order provided by this method instead of solving
the entire system simultaneously. Subclasses should implement this
method to define the partition that their implicit function solver
will use. Partitions are defined as a list of tuples of lists.
Each tuple has two entries, the first a list of variables, and the
second a list of constraints. These inner lists should have the
same number of entries.
Arguments
---------
variables: list
List of VarData in the system to be partitioned
constraints: list
List of ConstraintData (equality constraints) defining the
equations of the system to be partitioned
Returns
-------
List of tuples
List of tuples describing the ordered partition. Each tuple
contains equal-length subsets of variables and constraints.
"""
# Subclasses should implement this method, which returns an ordered
# partition (two lists-of-lists) of variables and constraints.
raise NotImplementedError(
"%s has not implemented the partition_system method" % type(self)
)
[docs]
def set_parameters(self, values):
self._timer.start("set_parameters")
values = np.array(values)
#
# Set parameter values
#
# NOTE: Here I rely on the fact that the "global array" is in the
# order (variables, parameters)
self._global_values[self._n_variables :] = values
#
# Solve subsystems one-by-one
#
# The basic procedure is: update local information from the global
# array, solve the subsystem, then update the global array with
# new values.
solver_subsystem_idx = 0
for block, inputs in self._subsystem_list:
if len(block.vars) <= self._calc_var_cutoff:
# Update model values from global array.
for var in inputs:
idx = self._global_indices[var]
var.set_value(self._global_values[idx], skip_validation=True)
# Solve using calculate_variable_from_constraint
var = block.vars[0]
con = block.cons[0]
self._timer.start("solve")
self._timer.start("calc_var")
calculate_variable_from_constraint(var, con)
self._timer.stop("calc_var")
self._timer.stop("solve")
# Update global array with values from solve
self._global_values[self._global_indices[var]] = var.value
else:
# Transfer variable values into the projected NLP, solve,
# and extract values.
i = solver_subsystem_idx
nlp = self._solver_subsystem_nlps[i]
proj_nlp = self._solver_proj_nlps[i]
input_coords = self._solver_subsystem_input_coords[i]
input_global_coords = self._local_input_global_coords[i]
output_global_coords = self._output_coords[i]
nlp_solver = self._nlp_solvers[solver_subsystem_idx]
# Get primals, load potentially new input values into primals,
# then load primals into NLP
primals = nlp.get_primals()
primals[input_coords] = self._global_values[input_global_coords]
# Set primals in the original NLP. This is necessary so the
# parameters get updated.
nlp.set_primals(primals)
# Get initial guess in the space of variables we solve for
x0 = proj_nlp.get_primals()
self._timer.start("solve")
self._timer.start("solve_nlp")
nlp_solver.solve(x0=x0)
self._timer.stop("solve_nlp")
self._timer.stop("solve")
# Set values in global array. Here we rely on the fact that
# the projected NLP's primals are in the order that variables
# were initially specified.
self._global_values[output_global_coords] = proj_nlp.get_primals()
solver_subsystem_idx += 1
self._timer.stop("set_parameters")
[docs]
def evaluate_outputs(self):
return self._global_values[: self._n_variables]
[docs]
def update_pyomo_model(self):
# NOTE: Here we rely on the fact that global_values is in the
# order (variables, parameters)
for i, var in enumerate(self.get_variables() + self.get_parameters()):
var.set_value(self._global_values[i], skip_validation=True)
[docs]
class SccImplicitFunctionSolver(DecomposedImplicitFunctionBase):
[docs]
def partition_system(self, variables, constraints):
self._timer.start("partition")
igraph = IncidenceGraphInterface()
var_blocks, con_blocks = igraph.block_triangularize(variables, constraints)
self._timer.stop("partition")
return zip(var_blocks, con_blocks)