Source code for pyomo.contrib.sensitivity_toolbox.sens

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

# ______________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2024
#  National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License
# ______________________________________________________________________________
from pyomo.environ import (
    Param,
    Var,
    Block,
    ComponentMap,
    Objective,
    Constraint,
    ConstraintList,
    Suffix,
    value,
    ComponentUID,
)

from pyomo.common.sorting import sorted_robust
from pyomo.core.expr import ExpressionReplacementVisitor

from pyomo.common.modeling import unique_component_name
from pyomo.common.deprecation import deprecated
from pyomo.common.tempfiles import TempfileManager
from pyomo.opt import SolverFactory, SolverStatus
from pyomo.contrib.sensitivity_toolbox.k_aug import K_augInterface, InTempDir
import logging
import os
import io
import shutil
from pyomo.common.dependencies import numpy as np, numpy_available
from pyomo.common.dependencies import scipy, scipy_available

logger = logging.getLogger('pyomo.contrib.sensitivity_toolbox')


@deprecated(
    "The sipopt function has been deprecated. Use the sensitivity_calculation() "
    "function with method='sipopt' to access this functionality.",
    logger='pyomo.contrib.sensitivity_toolbox',
    version='6.1',
)
def sipopt(
    instance,
    paramSubList,
    perturbList,
    cloneModel=True,
    tee=False,
    keepfiles=False,
    streamSoln=False,
):
    m = sensitivity_calculation(
        'sipopt',
        instance,
        paramSubList,
        perturbList,
        cloneModel,
        tee,
        keepfiles,
        solver_options=None,
    )

    return m


@deprecated(
    "The kaug function has been deprecated. Use the sensitivity_calculation() "
    "function with method='k_aug' to access this functionality.",
    logger='pyomo.contrib.sensitivity_toolbox',
    version='6.1',
)
def kaug(
    instance,
    paramSubList,
    perturbList,
    cloneModel=True,
    tee=False,
    keepfiles=False,
    solver_options=None,
    streamSoln=False,
):
    m = sensitivity_calculation(
        'k_aug',
        instance,
        paramSubList,
        perturbList,
        cloneModel,
        tee,
        keepfiles,
        solver_options,
    )

    return m


_SIPOPT_SUFFIXES = {
    'sens_state_0': Suffix.EXPORT,
    # ^ Not sure what this suffix does -RBP
    'sens_state_1': Suffix.EXPORT,
    'sens_state_value_1': Suffix.EXPORT,
    'sens_init_constr': Suffix.EXPORT,
    'sens_sol_state_1': Suffix.IMPORT,
    'sens_sol_state_1_z_L': Suffix.IMPORT,
    'sens_sol_state_1_z_U': Suffix.IMPORT,
}

_K_AUG_SUFFIXES = {
    'ipopt_zL_out': Suffix.IMPORT,
    'ipopt_zU_out': Suffix.IMPORT,
    'ipopt_zL_in': Suffix.EXPORT,
    'ipopt_zU_in': Suffix.EXPORT,
    'dual': Suffix.IMPORT_EXPORT,
    'dcdp': Suffix.EXPORT,
    'DeltaP': Suffix.EXPORT,
}


def _add_sensitivity_suffixes(block):
    suffix_dict = {}
    suffix_dict.update(_SIPOPT_SUFFIXES)
    suffix_dict.update(_K_AUG_SUFFIXES)
    for name, direction in suffix_dict.items():
        if block.component(name) is None:
            # Only add suffix if it doesn't already exist.
            # If something of this name does already exist, just
            # assume it is the proper suffix and move on.
            block.add_component(name, Suffix(direction=direction))


class _NotAnIndex(object):
    pass


def _generate_component_items(components):
    if type(components) not in {list, tuple}:
        components = (components,)
    for comp in components:
        if comp.is_indexed():
            for idx in sorted_robust(comp):
                yield idx, comp[idx]
        else:
            yield _NotAnIndex, comp


[docs]def sensitivity_calculation( method, instance, paramList, perturbList, cloneModel=True, tee=False, keepfiles=False, solver_options=None, ): """This function accepts a Pyomo ConcreteModel, a list of parameters, and their corresponding perturbation list. The model is then augmented with dummy constraints required to call sipopt or k_aug to get an approximation of the perturbed solution. Parameters ---------- method: string 'sipopt' or 'k_aug' instance: Block pyomo block or model object paramSubList: list list of mutable parameters or fixed variables perturbList: list list of perturbed parameter values cloneModel: bool, optional indicator to clone the model. If set to False, the original model will be altered tee: bool, optional indicator to stream solver log keepfiles: bool, optional preserve solver interface files solver_options: dict, optional Provides options to the solver (also the name of an attribute) Returns ------- The model that was manipulated by the sensitivity interface """ sens = SensitivityInterface(instance, clone_model=cloneModel) sens.setup_sensitivity(paramList) m = sens.model_instance if method not in {"k_aug", "sipopt"}: raise ValueError("Only methods 'k_aug' and 'sipopt' are supported'") if method == 'k_aug': k_aug = SolverFactory('k_aug', solver_io='nl') dot_sens = SolverFactory('dot_sens', solver_io='nl') ipopt = SolverFactory('ipopt', solver_io='nl') k_aug_interface = K_augInterface(k_aug=k_aug, dot_sens=dot_sens) ipopt.solve(m, tee=tee) m.ipopt_zL_in.update(m.ipopt_zL_out) #: important! m.ipopt_zU_in.update(m.ipopt_zU_out) #: important! k_aug.options['dsdp_mode'] = "" #: sensitivity mode! k_aug_interface.k_aug(m, tee=tee) sens.perturb_parameters(perturbList) if method == 'sipopt': ipopt_sens = SolverFactory('ipopt_sens', solver_io='nl') ipopt_sens.options['run_sens'] = 'yes' if solver_options is not None: ipopt_sens.options['linear_solver'] = solver_options # Send the model to ipopt_sens and collect the solution results = ipopt_sens.solve(m, keepfiles=keepfiles, tee=tee) elif method == 'k_aug': dot_sens.options["dsdp_mode"] = "" k_aug_interface.dot_sens(m, tee=tee) return m
def get_dsdp(model, theta_names, theta, tee=False): """This function calculates gradient vector of the variables with respect to the parameters (theta_names). e.g) min f: p1*x1+ p2*(x2^2) + p1*p2 s.t c1: x1 + x2 = p1 c2: x2 + x3 = p2 0 <= x1, x2, x3 <= 10 p1 = 10 p2 = 5 the function returns dx/dp and dp/dp, and column orders. The following terms are used to define the output dimensions: Ncon = number of constraints Nvar = number of variables (Nx + Ntheta) Nx = number of decision (primal) variables Ntheta = number of uncertain parameters. Parameters ---------- model: Pyomo ConcreteModel model should include an objective function theta_names: list of strings List of Var names theta: dict Estimated parameters e.g) from parmest tee: bool, optional Indicates that ef solver output should be teed Returns ------- dsdp: scipy.sparse.csr.csr_matrix Ntheta by Nvar size sparse matrix. A Jacobian matrix of the (decision variables, parameters) with respect to parameters (theta_names). number of rows = len(theta_name), number of columns = len(col) col: list List of variable names """ # Get parameters from names. In SensitivityInterface, we expect # these to be parameters on the original model. param_list = [] for name in theta_names: comp = model.find_component(name) if comp is None: raise RuntimeError("Cannot find component %s on model" % name) if comp.ctype is Var: # If theta_names correspond to Vars in the model, these vars # need to be fixed. comp.fix() param_list.append(comp) sens = SensitivityInterface(model, clone_model=True) m = sens.model_instance # Setup model and calculate sensitivity matrix with k_aug sens.setup_sensitivity(param_list) k_aug = K_augInterface() k_aug.k_aug(m, tee=tee) # Write row and col files in a temp dir, then immediately # read into a Python data structure. nl_data = {} with InTempDir(): base_fname = "col_row" nl_file = ".".join((base_fname, "nl")) row_file = ".".join((base_fname, "row")) col_file = ".".join((base_fname, "col")) m.write(nl_file, io_options={"symbolic_solver_labels": True}) for fname in [nl_file, row_file, col_file]: with open(fname, "r") as fp: nl_data[fname] = fp.read() # Create more useful data structures from strings dsdp = np.fromstring(k_aug.data["dsdp_in_.in"], sep="\n\t") col = nl_data[col_file].strip("\n").split("\n") row = nl_data[row_file].strip("\n").split("\n") dsdp = dsdp.reshape((len(theta_names), int(len(dsdp) / len(theta_names)))) dsdp = dsdp[: len(theta_names), : len(col)] col = [i for i in col if sens.get_default_block_name() not in i] dsdp_out = np.zeros((len(theta_names), len(col))) for i in range(len(theta_names)): for j in range(len(col)): if sens.get_default_block_name() not in col[j]: dsdp_out[i, j] = -dsdp[i, j] # e.g) k_aug dsdp returns -dx1/dx1 = -1.0 return scipy.sparse.csr_matrix(dsdp_out), col def get_dfds_dcds(model, theta_names, tee=False, solver_options=None): """This function calculates gradient vector of the objective function and constraints with respect to the variables and parameters. e.g) min f: p1*x1+ p2*(x2^2) + p1*p2 s.t c1: x1 + x2 = p1 c2: x2 + x3 = p2 0 <= x1, x2, x3 <= 10 p1 = 10 p2 = 5 - Variables = (x1, x2, x3, p1, p2) - Fix p1 and p2 with estimated values The following terms are used to define the output dimensions: Ncon = number of constraints Nvar = number of variables (Nx + Ntheta) Nx = number of decision (primal) variables Ntheta = number of uncertain parameters. Parameters ---------- model: Pyomo ConcreteModel model should include an objective function theta_names: list of strings List of Var names tee: bool, optional Indicates that ef solver output should be teed solver_options: dict, optional Provides options to the solver (also the name of an attribute) Returns ------- gradient_f: numpy.ndarray Length Nvar array. A gradient vector of the objective function with respect to the (decision variables, parameters) at the optimal solution gradient_c: scipy.sparse.csr.csr_matrix Ncon by Nvar size sparse matrix. A Jacobian matrix of the constraints with respect to the (decision variables, parameters) at the optimal solution. Each row contains [row number, column number, and value], column order follows variable order in col and index starts from 0. Note that it follows k_aug. If no constraint exists, return [] col: list Size Nvar list of variable names row: list Size Ncon+1 list of constraints and objective function names. The final element is the objective function name. line_dic: dict column numbers of the theta_names in the model. Index starts from 1 Raises ------ RuntimeError When ipopt or k_aug or dotsens is not available Exception When ipopt fails """ # Create the solver plugin using the ASL interface ipopt = SolverFactory('ipopt', solver_io='nl') if solver_options is not None: ipopt.options = solver_options k_aug = SolverFactory('k_aug', solver_io='nl') if not ipopt.available(False): raise RuntimeError('ipopt is not available') if not k_aug.available(False): raise RuntimeError('k_aug is not available') # Declare Suffixes _add_sensitivity_suffixes(model) # K_AUG SUFFIXES model.dof_v = Suffix(direction=Suffix.EXPORT) #: SUFFIX FOR K_AUG model.rh_name = Suffix(direction=Suffix.IMPORT) #: SUFFIX FOR K_AUG AS WELL k_aug.options["print_kkt"] = "" results = ipopt.solve(model, tee=tee) # Raise exception if ipopt fails if results.solver.status == SolverStatus.warning: raise Exception(results.solver.Message) for o in model.component_objects(Objective, active=True): f_mean = value(o) model.ipopt_zL_in.update(model.ipopt_zL_out) model.ipopt_zU_in.update(model.ipopt_zU_out) # run k_aug k_aug_interface = K_augInterface(k_aug=k_aug) k_aug_interface.k_aug(model, tee=tee) #: always call k_aug AFTER ipopt. nl_data = {} with InTempDir(): base_fname = "col_row" nl_file = ".".join((base_fname, "nl")) row_file = ".".join((base_fname, "row")) col_file = ".".join((base_fname, "col")) model.write(nl_file, io_options={"symbolic_solver_labels": True}) for fname in [nl_file, row_file, col_file]: with open(fname, "r") as fp: nl_data[fname] = fp.read() col = nl_data[col_file].strip("\n").split("\n") row = nl_data[row_file].strip("\n").split("\n") # get the column numbers of "parameters" line_dic = {name: col.index(name) for name in theta_names} grad_f_file = os.path.join("GJH", "gradient_f_print.txt") grad_f_string = k_aug_interface.data[grad_f_file] gradient_f = np.fromstring(grad_f_string, sep="\n\t") col = [i for i in col if SensitivityInterface.get_default_block_name() not in i] grad_c_file = os.path.join("GJH", "A_print.txt") grad_c_string = k_aug_interface.data[grad_c_file] gradient_c = np.fromstring(grad_c_string, sep="\n\t") # Jacobian file is in "COO format," i.e. an nnz-by-3 array. # Reshape to a numpy array that matches this format. gradient_c = gradient_c.reshape((-1, 3)) num_constraints = len(row) - 1 # Objective is included as a row if num_constraints > 0: row_idx = gradient_c[:, 1] - 1 col_idx = gradient_c[:, 0] - 1 data = gradient_c[:, 2] gradient_c = scipy.sparse.csr_matrix( (data, (row_idx, col_idx)), shape=(num_constraints, len(col)) ) else: gradient_c = np.array([]) return gradient_f, gradient_c, col, row, line_dic def line_num(file_name, target): """This function returns the line number that contains 'target' in the file_name. This function identifies constraints that have variables in theta_names. Parameters ---------- file_name: string file includes the variable order (i.e. col file) target: string variable name to check Returns ------- count: int line number of target in the file Raises ------ Exception When file does not include target """ with open(file_name) as f: count = int(1) for line in f: if line.strip() == target: return int(count) count += 1 raise Exception(file_name + " does not include " + target) class SensitivityInterface(object): def __init__(self, instance, clone_model=True): """Constructor clones model if necessary and attaches to this object. """ self._original_model = instance if clone_model: # Note that we are not "cloning" the user's parameters # or perturbations. self.model_instance = instance.clone() else: self.model_instance = instance @classmethod def get_default_block_name(self): return '_SENSITIVITY_TOOLBOX_DATA' @staticmethod def get_default_var_name(name): # return '_'.join(('sens_var', name)) return name @staticmethod def get_default_param_name(name): # return '_'.join(('sens_param', name)) return name def _process_param_list(self, paramList): # We need to translate the components in paramList into # components in our possibly cloned model. orig = self._original_model instance = self.model_instance if orig is not instance: paramList = list( ComponentUID(param, context=orig).find_component_on(instance) for param in paramList ) return paramList def _add_data_block(self, existing_block=None): # If a sensitivity block already exists, and we have not done # any expression replacement, we delete the old block, re-fix the # sensitivity variables, and start again. # # Don't do this in the constructor as we could want to call # the constructor once, then perform multiple sensitivity # calculations with the same model instance. if existing_block is not None: if ( hasattr(existing_block, '_has_replaced_expressions') and not existing_block._has_replaced_expressions ): for var, _, _, _ in existing_block._sens_data_list: # Re-fix variables that the previous block was # treating as parameters. var.fix() self.model_instance.del_component(existing_block) else: msg = ( "Re-using sensitivity interface is not supported " "when calculating sensitivity for mutable parameters. " "Used fixed vars instead if you want to do this." ) raise RuntimeError(msg) # Add a block to keep track of model components necessary for this # sensitivity calculation. block = Block() self.model_instance.add_component(self.get_default_block_name(), block) self.block = block # If the user tells us they will perturb a set of parameters, we will # need to replace these parameters in the user's model's constraints. # This affects what we can do with the model later, so we add a flag. block._has_replaced_expressions = False # This is the main data structure for keeping track of "sensitivity # vars" and their associated params. It will be a list of tuples of # (vardata, paramdata, list_index, comp_index) # where: # vardata is the "sensitivity variable" data object, # paramdata is the associated parameter, # list_index is its index in the user-provided list, and # comp_index is its index in the component provided by the user. block._sens_data_list = [] # This will hold the user-provided list of # variables and/or parameters to perturb block._paramList = None # This will hold any constraints where we have replaced # parameters with variables. block.constList = ConstraintList() return block def _add_sensitivity_data(self, param_list): block = self.block sens_data_list = block._sens_data_list for i, comp in enumerate(param_list): if comp.ctype is Param: parent = comp.parent_component() if not parent.mutable: raise ValueError( "Parameters within paramList must be mutable. " "Got %s, which is not mutable." % comp.name ) # Add a Var: if comp.is_indexed(): d = {k: value(comp[k]) for k in comp.index_set()} var = Var(comp.index_set(), initialize=d) else: d = value(comp) var = Var(initialize=d) name = self.get_default_var_name(parent.local_name) name = unique_component_name(block, name) block.add_component(name, var) if comp.is_indexed(): sens_data_list.extend( (var[idx], param, i, idx) for idx, param in _generate_component_items(comp) ) else: sens_data_list.append((var, comp, i, _NotAnIndex)) elif comp.ctype is Var: parent = comp.parent_component() for _, data in _generate_component_items(comp): if not data.fixed: raise ValueError( "Specified \"parameter\" variables must be " "fixed. Got %s, which is not fixed." % comp.name ) # Add a Param: if comp.is_indexed(): d = {k: value(comp[k]) for k in comp.index_set()} param = Param(comp.index_set(), mutable=True, initialize=d) else: d = value(comp) param = Param(mutable=True, initialize=d) name = self.get_default_param_name(parent.local_name) name = unique_component_name(block, name) block.add_component(name, param) if comp.is_indexed(): sens_data_list.extend( (var, param[idx], i, idx) for idx, var in _generate_component_items(comp) ) else: sens_data_list.append((comp, param, i, _NotAnIndex)) def _replace_parameters_in_constraints(self, variableSubMap): instance = self.model_instance block = self.block # Visitor that we will use to replace user-provided parameters # in the objective and the constraints. param_replacer = ExpressionReplacementVisitor( substitute=variableSubMap, remove_named_expressions=True ) # TODO: Flag to ExpressionReplacementVisitor to only replace # named expressions if a node has been replaced within that # expression. new_old_comp_map = ComponentMap() # clone Objective, add to Block, and update any Expressions for obj in list( instance.component_data_objects(Objective, active=True, descend_into=True) ): tempName = unique_component_name(block, obj.local_name) new_expr = param_replacer.walk_expression(obj.expr) block.add_component(tempName, Objective(expr=new_expr)) new_old_comp_map[block.component(tempName)] = obj obj.deactivate() # clone Constraints, add to Block, and update any Expressions # # Unfortunate that this deactivates and replaces constraints # even if they don't contain the parameters. # old_con_list = list( instance.component_data_objects(Constraint, active=True, descend_into=True) ) last_idx = 0 for con in old_con_list: if con.equality or con.lower is None or con.upper is None: new_expr = param_replacer.walk_expression(con.expr) block.constList.add(expr=new_expr) last_idx += 1 new_old_comp_map[block.constList[last_idx]] = con else: # Constraint must be a ranged inequality, break into # separate constraints new_body = param_replacer.walk_expression(con.body) new_lower = param_replacer.walk_expression(con.lower) new_upper = param_replacer.walk_expression(con.upper) # Add constraint for lower bound block.constList.add(expr=(new_lower <= new_body)) last_idx += 1 new_old_comp_map[block.constList[last_idx]] = con # Add constraint for upper bound block.constList.add(expr=(new_body <= new_upper)) last_idx += 1 new_old_comp_map[block.constList[last_idx]] = con con.deactivate() return new_old_comp_map def setup_sensitivity(self, paramList): instance = self.model_instance paramList = self._process_param_list(paramList) existing_block = instance.component(self.get_default_block_name()) block = self._add_data_block(existing_block=existing_block) block._sens_data_list = [] block._paramList = paramList self._add_sensitivity_data(paramList) sens_data_list = block._sens_data_list for var, _, _, _ in sens_data_list: # This unfixes all variables, not just those the user added. var.unfix() # Map used to replace user-provided parameters. variableSubMap = dict( (id(param), var) for var, param, list_idx, _ in sens_data_list if paramList[list_idx].ctype is Param ) if variableSubMap: # We now replace the provided parameters in the user's # expressions. Only do this if we have to, i.e. the # user provided some parameters rather than all vars. block._replaced_map = self._replace_parameters_in_constraints( variableSubMap ) # Assume that we just replaced some params block._has_replaced_expressions = True block.paramConst = ConstraintList() for var, param, _, _ in sens_data_list: # block.paramConst.add(param - var == 0) block.paramConst.add(var - param == 0) # Declare Suffixes _add_sensitivity_suffixes(instance) for i, (var, _, _, _) in enumerate(sens_data_list): idx = i + 1 con = block.paramConst[idx] # sipopt instance.sens_state_0[var] = idx instance.sens_state_1[var] = idx instance.sens_init_constr[con] = idx # k_aug instance.dcdp[con] = idx def perturb_parameters(self, perturbList): # Note that entries of perturbList need not be components # of the cloned model. All we need are the values. instance = self.model_instance sens_data_list = self.block._sens_data_list paramConst = self.block.paramConst if len(self.block._paramList) != len(perturbList): raise ValueError( "Length of paramList argument does not equal length of perturbList" ) for i, (var, param, list_idx, comp_idx) in enumerate(sens_data_list): con = paramConst[i + 1] if comp_idx is _NotAnIndex: ptb = value(perturbList[list_idx]) else: try: ptb = value(perturbList[list_idx][comp_idx]) except TypeError: # If the user provided a scalar value to perturb # an indexed component. ptb = value(perturbList[list_idx]) # sipopt instance.sens_state_value_1[var] = ptb # k_aug # instance.DeltaP[con] = value(ptb - var) instance.DeltaP[con] = value(var - ptb) # FIXME: ^ This is incorrect. DeltaP should be (ptb - current). # But at least one test doesn't pass unless I use (current - ptb).