Source code for pyomo.contrib.pynumero.sparse.mpi_block_vector

#  ___________________________________________________________________________
#
#  Pyomo: Python Optimization Modeling Objects
#  Copyright (c) 2008-2025
#  National Technology and Engineering Solutions of Sandia, LLC
#  Under the terms of Contract DE-NA0003525 with National Technology and
#  Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
#  rights in this software.
#  This software is distributed under the 3-clause BSD License.
#  ___________________________________________________________________________

from pyomo.common.dependencies import mpi4py
from pyomo.contrib.pynumero.sparse import BlockVector
from .base_block import (
    BaseBlockVector,
    vec_unary_ufuncs,
    vec_binary_ufuncs,
    vec_associative_reductions,
)
from .block_vector import NotFullyDefinedBlockVectorError
from .block_vector import assert_block_structure as block_vector_assert_block_structure
import numpy as np
import operator


[docs] def assert_block_structure(vec): if vec.has_none: msg = 'Call MPIBlockVector.broadcast_block_sizes() first.' raise NotFullyDefinedBlockVectorError(msg)
[docs] class MPIBlockVector(BaseBlockVector, np.ndarray): """ Parallel structured vector interface. This interface can be used to perform parallel operations on vectors composed by vectors. The main idea is to allocate vectors in different processors and make the corresponding parallel calls when necessary. Attributes ---------- _rank_owner: numpy.ndarray 1D-array with processor ownership of each block. A block can be own by a single processor or by all processors. Blocks own by all processors have ownership -1. Blocks own by a single processor have ownership rank. where rank=MPI.COMM_WORLD.Get_rank() _mpiw: MPI communicator A communicator from the MPI space. Typically MPI.COMM_WORLD _block_vector: BlockVector Internal BlockVector. Blocks that belong to this processor are stored in _block_vector. Blocks that do not belong to this processor are empty and store as numpy.zeros(0) _owned_mask: numpy.ndarray bool 1D-array that indicates if a block belongs to this processor. While _rank_owner tells which processor(s) owns each block, _owned_mask tells if a block is owned by this processor. Blocks that are owned by everyone (i.e. ownership = -1) are True in _owned_mask _owned_blocks: numpy.ndarray 1D-array with block indices owned by this processor. This includes blocks with ownership -1. _unique_owned_blocks: numpy.ndarray 1D-array with block indices owned only by this processor. This does not include blocks with ownership -1. _brow_lengths: numpy.ndarray 1D-Array of size nblocks that specifies the length of each entry in the MPIBlockVector. This is the same across all processors. _undefined_brows: set A set of block indices for which the blocks are still None (i.e., the dimensions have not yet ben set). Operations with BlockVectors require all entries to be different than None. Notes ------ This is the parallel implementation of pyomo.contrib.pynumero.sparse.BlockVector Parameters ------------------- nblocks: int number of blocks contained in the block vector rank_owner: array_like Array_like of size nblocks. Each entry defines ownership of each block. There are two types of ownership. Block that are owned by all processor, and blocks owned by a single processor. If a block is owned by all processors then its ownership is -1. Otherwise, if a block is owned by a single processor, then its ownership is equal to the rank of the processor. mpi_com: MPI communicator An MPI communicator. Tyically MPI.COMM_WORLD """ def __new__(cls, nblocks, rank_owner, mpi_comm, assert_correct_owners=False): assert isinstance(nblocks, int) assert len(rank_owner) == nblocks blocks = [None for i in range(nblocks)] arr = np.asarray(blocks, dtype='object') obj = arr.view(cls) obj._rank_owner = np.array([i for i in rank_owner]) obj._mpiw = mpi_comm obj._block_vector = BlockVector(nblocks) rank = obj._mpiw.Get_rank() comm_size = obj._mpiw.Get_size() assert np.all(obj._rank_owner < comm_size) # Determine which blocks are owned by this processor obj._owned_mask = np.bitwise_or(obj._rank_owner == rank, obj._rank_owner < 0) unique_owned_mask = obj._rank_owner == rank obj._unique_owned_blocks = unique_owned_mask.nonzero()[0] obj._owned_blocks = obj._owned_mask.nonzero()[0] # containers that facilitate looping obj._brow_lengths = np.empty(nblocks, dtype=np.float64) obj._brow_lengths.fill(np.nan) obj._undefined_brows = set(obj._owned_blocks) # make some pointers unmutable. These arrays don't change after # MPIBlockVector has been created obj._rank_owner.flags.writeable = False obj._owned_blocks.flags.writeable = False obj._owned_mask.flags.writeable = False obj._unique_owned_blocks.flags.writeable = False obj._broadcasted = False return obj
[docs] def __init__(self, nblocks, rank_owner, mpi_comm, assert_correct_owners=False): # Note: this requires communication but is disabled when assertions # are turned off if assert_correct_owners: assert ( self._assert_correct_owners() ), 'rank_owner must be the same in all processors'
def __array_prepare__(self, out_arr, context=None): return super(MPIBlockVector, self).__array_prepare__(self, out_arr, context) def __array_wrap__(self, out_arr, context=None): return super(MPIBlockVector, self).__array_wrap__(self, out_arr, context) def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): """Runs ufuncs speciallizations to MPIBlockVector""" outputs = kwargs.pop('out', None) if outputs is not None: raise NotImplementedError( str(ufunc) + ' cannot be used with MPIBlockVector if the out keyword argument is given.' ) if ufunc in vec_unary_ufuncs: results = self._unary_operation(ufunc, method, *inputs, **kwargs) return results elif ufunc in vec_binary_ufuncs: results = self._binary_operation(ufunc, method, *inputs, **kwargs) return results else: raise NotImplementedError(str(ufunc) + "not supported for MPIBlockVector") def _unary_operation(self, ufunc, method, *args, **kwargs): """Run recursion to perform unary_funcs on MPIBlockVector""" # ToDo: deal with out x = args[0] if isinstance(x, MPIBlockVector): rank = self._mpiw.Get_rank() v = x.copy_structure() for i in self._owned_blocks: _args = [x.get_block(i)] + [args[j] for j in range(1, len(args))] v.set_block(i, self._unary_operation(ufunc, method, *_args, **kwargs)) return v elif isinstance(x, BlockVector): v = BlockVector(x.nblocks) for i in range(x.nblocks): _args = [x.get_block(i)] + [args[j] for j in range(1, len(args))] v.set_block(i, self._unary_operation(ufunc, method, *_args, **kwargs)) return v elif type(x) == np.ndarray: return super(MPIBlockVector, self).__array_ufunc__( ufunc, method, *args, **kwargs ) else: raise NotImplementedError() def _binary_operation(self, ufunc, method, *args, **kwargs): """Run recursion to perform binary_funcs on MPIBlockVector""" # ToDo: deal with out x1 = args[0] x2 = args[1] if isinstance(x1, MPIBlockVector) and isinstance(x2, MPIBlockVector): msg = 'BlockVectors must be distributed in same processors' assert ( np.array_equal(x1._rank_owner, x2._rank_owner) or self._mpiw.Get_size() == 1 ), msg assert x1._mpiw == x2._mpiw, 'Need to have same communicator' res = x1.copy_structure() for i in x1._owned_blocks: _args = ( [x1.get_block(i)] + [x2.get_block(i)] + [args[j] for j in range(2, len(args))] ) res.set_block( i, self._binary_operation(ufunc, method, *_args, **kwargs) ) return res elif isinstance(x1, BlockVector) and isinstance(x2, MPIBlockVector): raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(x1, MPIBlockVector) and isinstance(x2, BlockVector): raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(x1, MPIBlockVector) and np.isscalar(x2): res = x1.copy_structure() for i in x1._owned_blocks: _args = ( [x1.get_block(i)] + [x2] + [args[j] for j in range(2, len(args))] ) res.set_block( i, self._binary_operation(ufunc, method, *_args, **kwargs) ) return res elif isinstance(x2, MPIBlockVector) and np.isscalar(x1): res = x2.copy_structure() for i in x2._owned_blocks: _args = ( [x1] + [x2.get_block(i)] + [args[j] for j in range(2, len(args))] ) res.set_block( i, self._binary_operation(ufunc, method, *_args, **kwargs) ) return res elif isinstance(x1, MPIBlockVector) and type(x2) == np.ndarray: raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(x2, MPIBlockVector) and type(x1) == np.ndarray: raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(x1, np.ndarray) and isinstance(x2, np.ndarray): # this will take care of blockvector and ndarrays return self._block_vector.__array_ufunc__(ufunc, method, *args, **kwargs) elif (type(x1) == BlockVector or np.isscalar(x1)) and ( type(x2) == BlockVector or np.isscalar(x2) ): return self._block_vector.__array_ufunc__(ufunc, method, *args, **kwargs) elif (type(x1) == np.ndarray or np.isscalar(x1)) and ( type(x2) == np.ndarray or np.isscalar(x2) ): return super(MPIBlockVector, self).__array_ufunc__( ufunc, method, *args, **kwargs ) else: raise NotImplementedError() @property def nblocks(self): """ Returns the number of blocks. """ return self._block_vector.nblocks @property def bshape(self): """ Returns the number of blocks in this MPIBlockVector in a tuple. """ return (self.nblocks,) @property def shape(self): """ Returns total number of elements in the MPIBlockVector """ return (self.size,) @property def size(self): """ Returns total number of elements in this MPIBlockVector """ assert_block_structure(self) comm = self._mpiw rank = comm.Get_rank() if rank == 0: indices = self._owned_blocks else: indices = self._unique_owned_blocks local_size = np.sum(self._brow_lengths[indices]) size = comm.allreduce(local_size) assert int(size) == size return int(size) @property def ndim(self): """ Returns dimension of this MPIBlockVector """ return 1 @property def has_none(self): """ Returns True if block vector has none entry """ return len(self._undefined_brows) != 0 @property def owned_blocks(self): """ Returns list with indices of blocks owned by this processor. """ return self._owned_blocks @property def shared_blocks(self): """ Returns list with indices of blocks shared by all processors """ return np.array([i for i in range(self.nblocks) if self._rank_owner[i] < 0]) @property def rank_ownership(self): """ Returns 1D-Array with processor ranks that own each block. The ownership of blocks that are owned by all processors is -1. """ return self._rank_owner @property def ownership_mask(self): """ Returns boolean 1D-Array that indicates which blocks are owned by this processor """ return self._owned_mask @property def mpi_comm(self): """Returns MPI communicator""" return self._mpiw def is_broadcasted(self): return self._broadcasted
[docs] def block_sizes(self, copy=True): """ Returns 1D-Array with sizes of individual blocks in this MPIBlockVector """ if not self._broadcasted: self.broadcast_block_sizes() if copy: return self._brow_lengths.copy() return self._brow_lengths
def get_block_size(self, ndx): res = self._brow_lengths[ndx] if np.isnan(res): raise NotFullyDefinedBlockVectorError( 'The dimensions of the requested block are not defined.' ) res = int(res) return res def _set_block_size(self, ndx, size): if ndx in self._undefined_brows: self._undefined_brows.remove(ndx) self._brow_lengths[ndx] = size else: if self._brow_lengths[ndx] != size: raise ValueError( 'Incompatible dimensions for block {ndx}; ' 'got {got}; expected {exp}'.format( ndx=ndx, got=size, exp=self._brow_lengths[ndx] ) ) # Note: this operation requires communication
[docs] def broadcast_block_sizes(self): """ Send sizes of all blocks to all processors. After this method is called this MPIBlockVector knows it's dimensions across all blocks. This method must be called before running any operations with the MPIBlockVector. """ assert_block_structure(self) rank = self._mpiw.Get_rank() num_processors = self._mpiw.Get_size() local_length_data = np.empty(self.nblocks, dtype=np.int64) local_length_data.fill(-1) for ndx in self.owned_blocks: if ndx in self._undefined_brows: raise NotFullyDefinedBlockVectorError( 'Block {ndx} is owned by rank {rank}, ' 'but the dimensions for block {ndx} ' 'have not yet been specified in rank {rank}. ' 'Please specify all owned blocks.'.format(ndx=ndx, rank=rank) ) local_length_data[ndx] = self.get_block_size(ndx) receive_data = np.empty(num_processors * self.nblocks, dtype=np.int64) self._mpiw.Allgather(local_length_data, receive_data) proc_dims = np.split(receive_data, num_processors) for i in range(self.nblocks): block_length = set() for k in range(num_processors): processor_sizes = proc_dims[k] block_length.add(processor_sizes[i]) if len(block_length) > 2: msg = 'Block {} has more than one dimension across processors'.format(i) raise RuntimeError(msg) elif len(block_length) == 2: if -1 not in block_length: msg = ( 'Block {} has more than one dimension across processors'.format( i ) ) raise RuntimeError(msg) block_length.remove(-1) elif -1 in block_length: msg = ( 'The dimension of block {} was not specified in any process'.format( i ) ) # here block_length must only have one element self._brow_lengths[i] = block_length.pop() self._brow_lengths = np.asarray(self._brow_lengths, dtype=np.int64) self._broadcasted = True
[docs] def finalize_block_sizes(self, broadcast=True, block_sizes=None): """ Only set broadcast=False if you know what you are doing! Parameters ---------- broadcast: bool block_sizes: None or np.ndarray """ if broadcast: self.broadcast_block_sizes() else: self._undefined_brows = set() self._brow_lengths = block_sizes self._broadcasted = True
# Note: this requires communication but is only run in __new__ def _assert_correct_owners(self, root=0): rank = self._mpiw.Get_rank() num_processors = self._mpiw.Get_size() if num_processors == 1: return True local_owners = self._rank_owner.copy() receive_data = None if rank == root: receive_data = np.empty(self.nblocks * num_processors, dtype=np.int64) self._mpiw.Gather(local_owners, receive_data, root=root) if rank == root: owners_in_processor = np.split(receive_data, num_processors) root_rank_owners = owners_in_processor[root] for i in range(self.nblocks): for k in range(num_processors): if k != root: if owners_in_processor[k][i] != root_rank_owners[i]: return False return True
[docs] def all(self, axis=None, out=None, keepdims=False): """ Returns True if all elements evaluate to True. """ assert out is None, 'Out keyword not supported' assert_block_structure(self) local = 1 for i in self._owned_blocks: local *= self._block_vector.get_block(i).all() return bool(self._mpiw.allreduce(local, op=mpi4py.MPI.PROD))
[docs] def any(self, axis=None, out=None, keepdims=False): """ Returns True if all elements evaluate to True. """ assert out is None, 'Out keyword not supported' assert_block_structure(self) local = 0 for i in self._owned_blocks: local += self._block_vector.get_block(i).any() return bool(self._mpiw.allreduce(local, op=mpi4py.MPI.SUM))
[docs] def min(self, axis=None, out=None, keepdims=False): """ Returns the smallest value stored in the vector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) local_min = np.inf for i in self._owned_blocks: block = self._block_vector.get_block(i) if block.size > 0: lmin = block.min() if lmin <= local_min: local_min = lmin res = self._mpiw.allreduce(local_min, op=mpi4py.MPI.MIN) if res == np.inf: if self.size == 0: raise ValueError('cannot get the min of a size 0 array') return res
[docs] def max(self, axis=None, out=None, keepdims=False): """ Returns the largest value stored in this MPIBlockVector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) local_max = -np.inf for i in self._owned_blocks: block = self._block_vector.get_block(i) if block.size > 0: lmax = block.max() if lmax >= local_max: local_max = lmax res = self._mpiw.allreduce(local_max, op=mpi4py.MPI.MAX) if res == -np.inf: if self.size == 0: raise ValueError('cannot get the max of a size 0 array') return res
[docs] def sum(self, axis=None, dtype=None, out=None, keepdims=False): """ Returns the sum of all entries in this MPIBlockVector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) rank = self._mpiw.Get_rank() indices = self._unique_owned_blocks if rank != 0 else self._owned_blocks local_sum = 0.0 for i in indices: local_sum += self._block_vector.get_block(i).sum(axis=axis, dtype=dtype) return self._mpiw.allreduce(local_sum, op=mpi4py.MPI.SUM)
[docs] def prod(self, axis=None, dtype=None, out=None, keepdims=False): """ Returns the product of all entries in this MPIBlockVector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) rank = self._mpiw.Get_rank() indices = self._unique_owned_blocks if rank != 0 else self._owned_blocks local_prod = 1.0 for i in indices: local_prod *= self._block_vector.get_block(i).prod(axis=axis, dtype=dtype) return self._mpiw.allreduce(local_prod, op=mpi4py.MPI.PROD)
[docs] def mean(self, axis=None, dtype=None, out=None, keepdims=False): """ Returns the average of all entries in this MPIBlockVector """ return self.sum(out=out) / self.size
[docs] def conj(self): """ Complex-conjugate all elements. """ assert_block_structure(self) result = self.copy_structure() for i in self._owned_blocks: result.set_block(i, self.get_block(i).conj()) return result
[docs] def conjugate(self): """ Complex-conjugate all elements. """ return self.conj()
[docs] def nonzero(self): """ Returns the indices of the elements that are non-zero. """ result = MPIBlockVector( nblocks=self.nblocks, rank_owner=self.rank_ownership, mpi_comm=self.mpi_comm, assert_correct_owners=False, ) assert_block_structure(self) for i in self._owned_blocks: result.set_block(i, self._block_vector.get_block(i).nonzero()[0]) return (result,)
[docs] def round(self, decimals=0, out=None): """ Return MPIBlockVector with each element rounded to the given number of decimals """ assert out is None, 'Out keyword not supported' assert_block_structure(self) result = self.copy_structure() for i in self._owned_blocks: result.set_block( i, self._block_vector.get_block(i).round(decimals=decimals) ) return result
[docs] def clip(self, min=None, max=None, out=None): """ Return MPIBlockVector whose values are limited to [min, max]. One of max or min must be given. Parameters ---------- min: scalar_like, optional Minimum value. If None, clipping is not performed on lower interval edge. max: scalar_like, optional Maximum value. If None, clipping is not performed on upper interval edge. Returns ------- MPIBlockVector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) result = self.copy_structure() for i in self._owned_blocks: result.set_block(i, self._block_vector.get_block(i).clip(min=min, max=max)) return result
[docs] def compress(self, condition, axis=None, out=None): """ Return selected slices of each subblock. Parameters ---------- condition: MPIBlockVector that selects which entries to return. Determines to select (evaluate True in condition) Returns ------- MPIBlockVector """ assert out is None, 'Out keyword not supported' assert_block_structure(self) result = MPIBlockVector( nblocks=self.nblocks, rank_owner=self.rank_ownership, mpi_comm=self.mpi_comm, assert_correct_owners=False, ) if isinstance(condition, MPIBlockVector): # Note: do not need to check same size? this is checked implicitly msg = 'BlockVectors must be distributed in same processors' assert np.array_equal(self._rank_owner, condition._rank_owner), msg assert self._mpiw == condition._mpiw, 'Need to have same communicator' for i in self._owned_blocks: result.set_block(i, self.get_block(i).compress(condition.get_block(i))) return result if isinstance(condition, BlockVector): raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(condition, np.ndarray): raise RuntimeError('Operation not supported by MPIBlockVector') else: raise NotImplementedError('Operation not supported by MPIBlockVector')
[docs] def copyfrom(self, other): """ Copy entries of other into this MPIBlockVector Parameters ---------- other: MPIBlockVector or BlockVector Returns ------- None """ if isinstance(other, MPIBlockVector): assert_block_structure(other) msg = 'Number of blocks mismatch {} != {}'.format( self.nblocks, other.nblocks ) assert self.nblocks == other.nblocks, msg msg = 'BlockVectors must be distributed in same processors' assert np.array_equal(self._rank_owner, other.rank_ownership), msg assert self._mpiw == other._mpiw, 'Need to have same communicator' for i in self._owned_blocks: self.set_block(i, other.get_block(i).copy()) elif isinstance(other, BlockVector): block_vector_assert_block_structure(other) msg = 'Number of blocks mismatch {} != {}'.format( self.nblocks, other.nblocks ) assert self.nblocks == other.nblocks, msg for i in self._owned_blocks: self.set_block(i, other.get_block(i).copy()) elif isinstance(other, np.ndarray): assert_block_structure(self) if not self.is_broadcasted(): self.broadcast_block_sizes() assert self.shape == other.shape, 'Dimension mismatch {} != {}'.format( self.shape, other.shape ) offset = 0 for idx in range(self.nblocks): if self._owned_mask[idx]: subarray = other[offset : offset + self.get_block_size(idx)] if isinstance(self.get_block(idx), BlockVector): self.get_block(idx).copyfrom(subarray) else: np.copyto(self.get_block(idx), subarray) offset += self.get_block_size(idx) else: raise NotImplementedError('Operation not supported by MPIBlockVector')
[docs] def copyto(self, other): """ Copy entries of this MPIBlockVector into other Parameters ---------- other: MPIBlockVector or BlockVector Returns ------- None """ if isinstance(other, MPIBlockVector): other.copyfrom(self) else: raise NotImplementedError('Operation not supported by MPIBlockVector')
[docs] def set_blocks(self, blocks): """ Assigns vectors in blocks Parameters ---------- blocks: list list of vectors Returns ------- None """ raise NotImplementedError('Operation not supported by MPIBlockVector')
[docs] def clone(self, value=None, copy=True): """ Returns a copy of this MPIBlockVector Parameters ---------- value: scalar, optional all entries of the cloned vector are set to this value copy: bool, optional if set to true makes a deepcopy of each block in this vector. default False Returns ------- MPIBlockVector """ result = MPIBlockVector( self.nblocks, self.rank_ownership, self.mpi_comm, assert_correct_owners=False, ) result._block_vector = self._block_vector.clone(copy=copy) result._brow_lengths = self._brow_lengths.copy() result._undefined_brows = set(self._undefined_brows) if value is not None: result.fill(value) return result
[docs] def copy(self, order='C'): """ Returns a copy of the MPIBlockVector """ result = MPIBlockVector( self.nblocks, self.rank_ownership, self.mpi_comm, assert_correct_owners=False, ) result._block_vector = self._block_vector.copy(order=order) result._brow_lengths = self._brow_lengths.copy() result._undefined_brows = set(self._undefined_brows) return result
[docs] def copy_structure(self): """ Returns a copy of the MPIBlockVector structure filled with zeros """ result = MPIBlockVector( self.nblocks, self.rank_ownership, self.mpi_comm, assert_correct_owners=False, ) if self.is_broadcasted(): result.finalize_block_sizes( broadcast=False, block_sizes=self.block_sizes(copy=False) ) for bid in self.owned_blocks: block = self.get_block(bid) if block is not None: if isinstance(block, BlockVector): result.set_block(bid, block.copy_structure()) elif type(block) == np.ndarray: result.set_block(bid, np.zeros(block.size)) else: raise NotImplementedError('Should never get here') return result
[docs] def fill(self, value): """ Fills the MPIBLockVector with a scalar value. Parameters ---------- value : scalar All elements in the vector will be assigned this value Returns ------- None """ assert_block_structure(self) for idx in self.owned_blocks: self.get_block(idx).fill(value)
[docs] def dot(self, other, out=None): """ Returns dot product Parameters ---------- other : MPIBlockVector Returns ------- float """ assert_block_structure(self) assert out is None if isinstance(other, MPIBlockVector): assert_block_structure(other) msg = 'BlockVectors must be distributed in same processors' assert np.array_equal(self.rank_ownership, other.rank_ownership), msg assert self.mpi_comm == other.mpi_comm, 'Need to have same communicator' rank = self._mpiw.Get_rank() indices = self._unique_owned_blocks if rank != 0 else self._owned_blocks local_dot_prod = 0.0 for i in indices: local_dot_prod += self._block_vector.get_block(i).dot( other.get_block(i) ) return self._mpiw.allreduce(local_dot_prod, op=mpi4py.MPI.SUM) elif isinstance(other, BlockVector): assert ( self.nblocks == other.nblocks ), 'Number of blocks mismatch: {} != {}'.format(self.nblocks, other.nblocks) return self.dot( other.toMPIBlockVector( self.rank_ownership, self.mpi_comm, assert_correct_owners=False ) ) elif isinstance(other, np.ndarray): other_bv = self.copy_structure() other_bv.copyfrom(other) return self.dot(other_bv) else: raise NotImplementedError('Operation not supported by MPIBlockVector')
@staticmethod def _serialize_structure(block_vector): """ Parameters ---------- block_vector: BlockVector Returns ------- list """ serialized_structure = list() for ndx in range(block_vector.nblocks): blk = block_vector.get_block(ndx) if isinstance(blk, BlockVector): serialized_structure.append(-1) serialized_structure.append(blk.nblocks) serialized_structure.extend(MPIBlockVector._serialize_structure(blk)) elif isinstance(blk, MPIBlockVector): raise NotImplementedError( 'Operation not supported for MPIBlockVectors containing other MPIBlockVectors' ) elif isinstance(blk, np.ndarray): serialized_structure.append(-2) serialized_structure.append(blk.size) else: raise NotImplementedError('Unrecognized input.') return serialized_structure @staticmethod def _create_from_serialized_structure(serialized_structure, structure_ndx, result): """ Parameters ---------- serialized_structure: np.ndarray structure_ndx: int result: BlockVector Returns ------- structure_ndx: int """ for ndx in range(result.nblocks): if serialized_structure[structure_ndx] == -1: structure_ndx += 1 block = BlockVector(serialized_structure[structure_ndx]) structure_ndx += 1 structure_ndx = MPIBlockVector._create_from_serialized_structure( serialized_structure, structure_ndx, block ) result.set_block(ndx, block) elif serialized_structure[structure_ndx] == -2: structure_ndx += 1 result.set_block(ndx, np.zeros(serialized_structure[structure_ndx])) structure_ndx += 1 else: raise ValueError('Unrecognized structure') return structure_ndx
[docs] def make_local_structure_copy(self): """ Creates a BlockVector with the same structure as the MPIBlockVector Returns ------- BlockVector """ """ We do this by serializing the structure, then gathering it. To serialize the structure, we use an array. The first number indicates if the first block is a numpy array or a BlockVector. We use -1 to indicate a BlockVector and -2 to indicate a numpy array. If the block is a BlockVector, then the next number is a positive integer specifying the number of blocks in the block vector. If the block is a numpy array, then the next number is a positive integer specifying the size of the array. After the number of blocks in a BlockVector is specified, we follow the same procedure to specify the structure of that BlockVector. """ assert_block_structure(self) serialized_structure_by_block = dict() length_per_block = np.zeros(self.nblocks, dtype=np.int64) rank = self._mpiw.Get_rank() if rank == 0: block_indices = self._owned_blocks else: block_indices = self._unique_owned_blocks for ndx in block_indices: blk = self.get_block(ndx) blk_structure = list() if isinstance(blk, BlockVector): blk_structure.append(-1) blk_structure.append(blk.nblocks) blk_structure.extend(self._serialize_structure(blk)) elif isinstance(blk, MPIBlockVector): raise NotImplementedError( 'Operation not supported for MPIBlockVectors containing other MPIBlockVectors' ) elif isinstance(blk, np.ndarray): blk_structure.append(-2) blk_structure.append(blk.size) else: raise NotImplementedError('Unrecognized input.') length_per_block[ndx] = len(blk_structure) serialized_structure_by_block[ndx] = np.asarray( blk_structure, dtype=np.int64 ) global_length_per_block = np.zeros(self.nblocks, dtype=np.int64) self._mpiw.Allreduce(length_per_block, global_length_per_block) local_serialized_structure = np.zeros( global_length_per_block.sum(), dtype=np.int64 ) offset = 0 block_indices_set = set(block_indices) for ndx in range(self.nblocks): if ndx in block_indices_set: local_serialized_structure[ offset : offset + global_length_per_block[ndx] ] = serialized_structure_by_block[ndx] offset += global_length_per_block[ndx] global_serialized_structure = np.zeros( global_length_per_block.sum(), dtype=np.int64 ) self._mpiw.Allreduce(local_serialized_structure, global_serialized_structure) result = BlockVector(self.nblocks) structure_ndx = 0 self._create_from_serialized_structure( global_serialized_structure, structure_ndx, result ) return result
[docs] def make_local_copy(self): """ Copies the MPIBlockVector into a BlockVector Returns ------- BlockVector """ assert_block_structure(self) if not self.is_broadcasted(): self.broadcast_block_sizes() result = self.make_local_structure_copy() local_data = np.zeros(self.size) global_data = np.zeros(self.size) offset = 0 rank = self._mpiw.Get_rank() if rank == 0: block_indices = set(self._owned_blocks) else: block_indices = set(self._unique_owned_blocks) for ndx in range(self.nblocks): if ndx in block_indices: blk = self.get_block(ndx) if isinstance(blk, BlockVector): local_data[offset : offset + self.get_block_size(ndx)] = ( blk.flatten() ) elif isinstance(blk, np.ndarray): local_data[offset : offset + self.get_block_size(ndx)] = blk else: raise ValueError('Unrecognized block type') offset += self.get_block_size(ndx) self._mpiw.Allreduce(local_data, global_data) result.copyfrom(global_data) return result
def _binary_operation_helper(self, other, operation): assert_block_structure(self) result = self.copy_structure() if isinstance(other, MPIBlockVector) or isinstance(other, BlockVector): assert ( self.nblocks == other.nblocks ), 'Number of blocks mismatch: {} != {}'.format(self.nblocks, other.nblocks) if isinstance(other, MPIBlockVector): assert ( np.array_equal(self._rank_owner, other._rank_owner) or self._mpiw.Get_size() == 1 ), 'MPIBlockVectors must be distributed in same processors' assert self._mpiw == other._mpiw, 'Need to have same communicator' for i in self._owned_blocks: result.set_block(i, operation(self.get_block(i), other.get_block(i))) return result elif isinstance(other, np.ndarray): _tmp = self.copy_structure() _tmp.copyfrom(other) return self._binary_operation_helper(_tmp, operation) elif np.isscalar(other): for i in self._owned_blocks: result.set_block(i, operation(self.get_block(i), other)) return result else: raise NotImplementedError('Operation not supported by MPIBlockVector') def _reverse_binary_operation_helper(self, other, operation): assert_block_structure(self) result = self.copy_structure() if isinstance(other, BlockVector): raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(other, np.ndarray): raise RuntimeError('Operation not supported by MPIBlockVector') elif np.isscalar(other): for i in self._owned_blocks: result.set_block(i, operation(other, self.get_block(i))) return result else: raise NotImplementedError('Operation not supported by MPIBlockVector') def _inplace_binary_operation_helper(self, other, operation): assert_block_structure(self) if isinstance(other, MPIBlockVector) or isinstance(other, BlockVector): assert ( self.nblocks == other.nblocks ), 'Number of blocks mismatch: {} != {}'.format(self.nblocks, other.nblocks) if isinstance(other, MPIBlockVector): assert ( np.array_equal(self._rank_owner, other._rank_owner) or self._mpiw.Get_size() == 1 ), 'MPIBlockVectors must be distributed in same processors' assert self._mpiw == other._mpiw, 'Need to have same communicator' assert_block_structure(other) else: block_vector_assert_block_structure(other) for i in self._owned_blocks: blk = self.get_block(i) operation(blk, other.get_block(i)) self.set_block(i, blk) return self elif isinstance(other, np.ndarray): _tmp = self.copy_structure() _tmp.copyfrom(other) return self._inplace_binary_operation_helper(_tmp, operation) elif np.isscalar(other): for i in self._owned_blocks: blk = self.get_block(i) operation(blk, other) self.set_block(i, blk) return self else: raise NotImplementedError('Operation not supported by MPIBlockVector') def __add__(self, other): return self._binary_operation_helper(other, operator.add) def __radd__(self, other): return self.__add__(other) def __sub__(self, other): return self._binary_operation_helper(other, operator.sub) def __rsub__(self, other): return self._reverse_binary_operation_helper(other, operator.sub) def __mul__(self, other): return self._binary_operation_helper(other, operator.mul) def __rmul__(self, other): return self.__mul__(other) def __truediv__(self, other): return self._binary_operation_helper(other, operator.truediv) def __rtruediv__(self, other): return self._reverse_binary_operation_helper(other, operator.truediv) def __floordiv__(self, other): return self._binary_operation_helper(other, operator.floordiv) def __rfloordiv__(self, other): return self._reverse_binary_operation_helper(other, operator.floordiv) def __neg__(self): assert_block_structure(self) result = self.copy_structure() for ndx in self._owned_blocks: result.set_block(ndx, -self.get_block(ndx)) return result def __iadd__(self, other): return self._inplace_binary_operation_helper(other, operator.iadd) def __isub__(self, other): return self._inplace_binary_operation_helper(other, operator.isub) def __imul__(self, other): return self._inplace_binary_operation_helper(other, operator.imul) def __itruediv__(self, other): return self._inplace_binary_operation_helper(other, operator.itruediv) def __div__(self, other): return self.__truediv__(other) def __rdiv__(self, other): return self.__rtruediv__(other) def __idiv__(self, other): return self.__itruediv__(other) def _comparison_helper(self, other, operation): assert_block_structure(self) result = self.copy_structure() if isinstance(other, MPIBlockVector): assert_block_structure(other) assert ( self.nblocks == other.nblocks ), 'Number of blocks mismatch: {} != {}'.format(self.nblocks, other.nblocks) assert ( np.array_equal(self._rank_owner, other._rank_owner) or self._mpiw.Get_size() == 1 ), 'MPIBlockVectors must be distributed in same processors' assert self._mpiw == other._mpiw, 'Need to have same communicator' for i in self._owned_blocks: result.set_block(i, operation(self.get_block(i), other.get_block(i))) return result elif isinstance(other, BlockVector): raise RuntimeError('Operation not supported by MPIBlockVector') elif isinstance(other, np.ndarray): raise RuntimeError('Operation not supported by MPIBlockVector') elif np.isscalar(other): for i in self._owned_blocks: result.set_block(i, operation(self.get_block(i), other)) return result else: raise NotImplementedError('Operation not supported by MPIBlockVector') def __le__(self, other): return self._comparison_helper(other, operator.le) def __lt__(self, other): return self._comparison_helper(other, operator.lt) def __ge__(self, other): return self._comparison_helper(other, operator.ge) def __gt__(self, other): return self._comparison_helper(other, operator.gt) def __eq__(self, other): return self._comparison_helper(other, operator.eq) def __ne__(self, other): return self._comparison_helper(other, operator.ne) def __contains__(self, item): other = item assert_block_structure(self) if np.isscalar(other): contains = False for i in self._owned_blocks: if other in self.get_block(i): contains = True return bool(self._mpiw.allreduce(contains, op=mpi4py.MPI.SUM)) else: raise NotImplementedError('Operation not supported by MPIBlockVector') def get_block(self, key): owner = self._rank_owner[key] rank = self._mpiw.Get_rank() assert owner == rank or owner < 0, 'Block {} not own by processor {}'.format( key, rank ) return self._block_vector.get_block(key) def set_block(self, key, value): owner = self._rank_owner[key] rank = self._mpiw.Get_rank() assert owner == rank or owner < 0, 'Block {} not owned by processor {}'.format( key, rank ) self._block_vector.set_block(key, value) self._set_block_size(key, value.size) def _has_equal_structure(self, other): if not (isinstance(other, MPIBlockVector) or isinstance(other, BlockVector)): return False if self.nblocks != other.nblocks: return False if isinstance(other, MPIBlockVector): if ( self.owned_blocks != other.owned_blocks ).any() and self._mpiw.Get_size() != 1: return False for ndx in self.owned_blocks: block1 = self.get_block(ndx) block2 = other.get_block(ndx) if isinstance(block1, BlockVector): if not isinstance(block2, BlockVector): return False if not block1._has_equal_structure(block2): return False elif isinstance(block2, BlockVector): return False return True def __getitem__(self, item): if not self._has_equal_structure(item): raise ValueError( 'MIPBlockVector.__getitem__ only accepts slices in the form of MPIBlockVectors of the same structure' ) res = self.copy_structure() for ndx in self.owned_blocks: block = self.get_block(ndx) res.set_block(ndx, block[item.get_block(ndx)]) def __setitem__(self, key, value): if not ( self._has_equal_structure(key) and (self._has_equal_structure(value) or np.isscalar(value)) ): raise ValueError( 'MPIBlockVector.__setitem__ only accepts slices in the form of MPIBlockVectors of the same structure' ) if np.isscalar(value): for ndx in self.owned_blocks: block = self.get_block(ndx) block[key.get_block(ndx)] = value else: for ndx in self.owned_blocks: block = self.get_block(ndx) block[key.get_block(ndx)] = value.get_block(ndx) def __str__(self): msg = '{}{}:\n'.format(self.__class__.__name__, self.bshape) for idx in range(self.nblocks): msg += '{}: Owned by processor {}\n'.format(idx, self._rank_owner[idx]) return msg def __repr__(self): return '{}{}'.format(self.__class__.__name__, self.bshape)
[docs] def pprint(self, root=0): """Prints BlockVector in pretty format""" assert_block_structure(self) msg = self.__repr__() + '\n' num_processors = self._mpiw.Get_size() local_mask = self._owned_mask.flatten() receive_data = np.empty(num_processors * self.nblocks, dtype=bool) self._mpiw.Allgather(local_mask, receive_data) processor_to_mask = np.split(receive_data, num_processors) global_mask = np.zeros(self.nblocks, dtype=bool) for bid in range(self.nblocks): owner = self._rank_owner[bid] if owner >= 0: global_mask[bid] = processor_to_mask[owner][bid] else: # checks only the mask of one of them since all must have the same global_mask[bid] = processor_to_mask[0][bid] disp_owner = self._rank_owner[bid] if self._rank_owner[bid] >= 0 else 'All' is_none = '' if global_mask[bid] else 'None' repn = 'Owned by {} Shape({},){}'.format( disp_owner, self._brow_lengths[bid], is_none ) msg += '{}: {}\n'.format(bid, repn) if self._mpiw.Get_rank() == root: print(msg)
def __len__(self): raise NotImplementedError('Use size or nblocks') def __iter__(self): raise NotImplementedError('Not supported by MPIBlockVector')
[docs] def std(self, axis=None, dtype=None, out=None, ddof=0, keepdims=False): raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def var(self, axis=None, dtype=None, out=None, ddof=0, keepdims=False): raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def cumprod(self, axis=None, dtype=None, out=None): raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def cumsum(self, axis=None, dtype=None, out=None): raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def tolist(self): """ Disable `np.ndarray.tolist` as it is not supported. """ raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def flatten(self, order='C'): raise RuntimeError('Operation not supported by MPIBlockVector')
[docs] def ravel(self, order='C'): raise RuntimeError('Operation not supported by MPIBlockVector')