comm.Gather and comm.Scatter

Mark Bostock

Mar 15, 2016, 8:50:20 AM
to mpi4py
Hello,

I am trying to parallelise some operations on a large numpy array using mpi4py. I am currently using numpy.array_split to divide the array into chunks, then comm.scatter to send the chunks to different cores, and comm.gather to collect the resulting arrays. A minimal (not) working example is below:

    import numpy as np
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    if rank == 0:
        test = np.random.rand(411, 48, 52, 40)
        test_chunks = np.array_split(test, size, axis=0)
    else:
        test_chunks = None

    test_chunk = comm.scatter(test_chunks, root=0)
    output_chunk = np.zeros([test_chunk.shape[0], 128, 128, 128])

    for i in range(test_chunk.shape[0]):
        print(i)
        output_chunk[i, 0:48, 0:52, 0:40] = test_chunk[i]

    outputData = comm.gather(output_chunk, root=0)

    if rank == 0:
        outputData = np.concatenate(outputData, axis=0)

Running this gives me the error:

      File "test_4d.py", line 23, in <module>
        outputData = comm.gather(output_chunk,root=0)
      File "Comm.pyx", line 869, in mpi4py.MPI.Comm.gather (src/mpi4py.MPI.c:73266)
      File "pickled.pxi", line 614, in mpi4py.MPI.PyMPI_gather (src/mpi4py.MPI.c:33592)
      File "pickled.pxi", line 146, in mpi4py.MPI._p_Pickle.allocv (src/mpi4py.MPI.c:28517)
      File "pickled.pxi", line 95, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:27832)
    SystemError: Negative size passed to PyString_FromStringAndSize

This error seems to result from the large total size of the numpy arrays being collected by gather; since the lowercase scatter and gather serialise the arrays (as a pickled list of arrays), it appears easy to exceed the maximum message size. One suggestion I have come across is to use comm.Scatter and comm.Gather instead. However, I am struggling to find clear documentation for these functions and so far have been unable to implement them successfully. For example:

replacing

    outputData = comm.gather(output_chunk, root=0)

with the line

    outputData = comm.Gather(sendbuf=[test_chunks, MPI.DOUBLE], recvbuf=[output_chunk, MPI.DOUBLE], root=0)

gives the error:

      File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
      File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
      File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
      File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
      File "message.pxi", line 51, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19644)
      File "asbuffer.pxi", line 108, in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:6757)
      File "asbuffer.pxi", line 50, in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:6093)
    TypeError: expected a readable buffer object

or with the line:

    outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)

gives the error:

      File "test_4d_2.py", line 24, in <module>
        outputData = comm.Gather(sendbuf=test_chunks, recvbuf=output_chunk,root=0)
      File "Comm.pyx", line 415, in mpi4py.MPI.Comm.Gather (src/mpi4py.MPI.c:66916)
      File "message.pxi", line 426, in mpi4py.MPI._p_msg_cco.for_gather (src/mpi4py.MPI.c:23559)
      File "message.pxi", line 355, in mpi4py.MPI._p_msg_cco.for_cco_send (src/mpi4py.MPI.c:22959)
      File "message.pxi", line 111, in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:20516)
      File "message.pxi", line 60, in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:19747)
    TypeError: unhashable type: 'numpy.ndarray'

Furthermore, the input array, test, may also grow in size, which could cause similar problems for comm.scatter. Aside from the problems I already have with comm.Gather, I am not sure how to set up comm.Scatter: recvbuf is sized from test_chunk, which is itself the output of comm.scatter, so I cannot specify recvbuf before the scatter happens.
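One thought I had: perhaps each rank could size its own receive buffer by repeating np.array_split's arithmetic locally, without any communication. A sketch (chunk_rows is just a helper I made up for illustration):

```python
import numpy as np

def chunk_rows(n_rows, size, rank):
    # np.array_split gives the first (n_rows % size) chunks one extra row
    base, extra = divmod(n_rows, size)
    return base + (1 if rank < extra else 0)

# e.g. 411 rows over 4 ranks -> [103, 103, 103, 102]
counts = [chunk_rows(411, 4, r) for r in range(4)]
```

But I am not sure whether this is the intended way to use the uppercase methods.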

Any help would be very much appreciated.

Mark

Lisandro Dalcin

Mar 16, 2016, 4:57:07 AM
to mpi4py
On 15 March 2016 at 15:49, Mark Bostock <mark.bo...@gmail.com> wrote:
> Hello,
>
> I am trying to parallelise some operations on a large numpy array using
> mpi4py. I am currently using numpy.array_split to divide the array into
> chunks, followed by comm.scatter to send the array to different cores and
> then comm.gather to collect the resulting arrays.

You are using the lowercase "gather/scatter" methods; you should instead
try the uppercase "Scatter/Gather" ones, or better "Scatterv/Gatherv"
(the 'vector' variants), which are much more efficient for array data.
Take a look here:
http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html#scatterv-and-gatherv

However, even in that case, if the number of entries in your array
exceeds the 2 GB / 32-bit integer limit on MPI counts, your code is
going to fail anyway. One way to alleviate this issue is to use a
user-defined datatype that packs many entries into one, e.g. something
like "large_dtype = MPI.DOUBLE.Create_contiguous(48*52*40).Commit()".

--
Lisandro Dalcin
============
Research Scientist
Computer, Electrical and Mathematical Sciences & Engineering (CEMSE)
Extreme Computing Research Center (ECRC)
King Abdullah University of Science and Technology (KAUST)
http://ecrc.kaust.edu.sa/

4700 King Abdullah University of Science and Technology
al-Khawarizmi Bldg (Bldg 1), Office # 4332
Thuwal 23955-6900, Kingdom of Saudi Arabia
http://www.kaust.edu.sa

Office Phone: +966 12 808-0459