Use custom-MPI-datatype on custom-MPI-operator with Reduce


Yiyang Li

Aug 27, 2020, 5:49:56 PM
to mpi4py
Hi,

This question follows up on my mistaken bug report at 

In short, I am trying to use a custom MPI datatype with a custom MPI operator. The custom MPI datatype is defined as follows:

---------------------------------------------------------------------
import numpy as np
from mpi4py import MPI

def mpi_custom_datatype(char_code):
    """ User-defined data type for MPI. """
    cc = char_code.split(',')
    for _ in cc:
        assert _[0] in 'iuf'

    np_dtype = np.dtype(char_code)  # NumPy structured datatype

    nbytes = [np.dtype(_).itemsize for _ in cc]
    nbytes = np.hstack(([0], np.add.accumulate(nbytes)[:-1]))

    mpi_dtype = MPI.Datatype.Create_struct(
        list(range(len(cc))),      # block lengths for fields of struct
        nbytes,                            # displacements/offsets in bytes for fields of struct
        [MPI.LONG_LONG, MPI.DOUBLE])   # hard-coded for simplicity

    extent = np.zeros(1, dtype=np_dtype).nbytes
    if mpi_dtype.extent != extent:
        mpi_dtype = mpi_dtype.Create_resized(0, extent)

    mpi_dtype.Commit()
    return np_dtype, mpi_dtype
---------------------------------------------------------------------
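The hand-accumulated offsets above assume the fields are packed back to back. A minimal sketch, assuming exactly one scalar per field, that instead reads the block lengths, byte displacements and extent directly from the NumPy structured dtype (the helper name `struct_layout` is hypothetical, not part of NumPy or mpi4py):

```python
import numpy as np

def struct_layout(np_dtype):
    """Hypothetical helper: (blocklengths, displacements, extent) for a flat
    NumPy structured dtype in which every field holds a single scalar."""
    blocklengths = [1] * len(np_dtype.names)   # one element per field
    displacements = [np_dtype.fields[name][1]  # byte offset of each field
                     for name in np_dtype.names]
    extent = np_dtype.itemsize                 # includes any trailing padding
    return blocklengths, displacements, extent

print(struct_layout(np.dtype('i8,f8')))  # ([1, 1], [0, 8], 16)

# With align=True NumPy inserts padding, which a running sum of field
# sizes would miss (it would give [0, 4] and 12 here).
print(struct_layout(np.dtype([('a', 'i4'), ('b', 'f8')], align=True)))
```

The three values correspond to the block-length and displacement arguments of `MPI.Datatype.Create_struct` and to the extent passed to `Create_resized(0, extent)` in the function above; reading them from `fields` keeps the MPI layout in step with whatever padding NumPy chose.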

Using the custom datatype with the lowercase reduce is straightforward, as in the following example (here a1 and a2 are np.ndarray objects):

---------------------------------------------------------------------
def find_min(a1, a2, datatype):
    """ Custom MPI operator to get the one with the smaller 2nd term. """
    if a1[0][1] > a2[0][1]:
        return a2
    else:
        return a1

comm = MPI.COMM_WORLD
rank = comm.rank
RS   = np.random.RandomState(rank)

char_code = 'i8,f8'
np_dtype, mpi_dtype = mpi_custom_datatype(char_code)

a = np.array([(RS.randint(0,int(1e12)), RS.random())], dtype=np_dtype)
print(rank, a)

FIND_MIN = MPI.Op.Create(find_min, commute=True)
amin = comm.reduce(a, op=FIND_MIN, root=0)

if rank == 0:
    print(amin)
---------------------------------------------------------------------

This gives the following (correct) output: the lowercase reduce returns the entry with the smallest second field.

---------------------------------------------------------------------
1 [(163196666091, 0.72032449)]
2 [(722349427215, 0.02592623)]
3 [(455570294424, 0.70814782)]
0 [(741280623151, 0.71518937)]

[(722349427215, 0.02592623)]
---------------------------------------------------------------------
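The lowercase operator above is a pure function that returns one of its two arguments, so its logic can be checked serially with `functools.reduce` over the four per-rank values printed above. This is a sketch with no MPI involved; the `datatype=None` default is an addition so that `functools.reduce` can call it with two arguments. By contrast, an operator for the uppercase Reduce receives raw buffers and must write its result into the second one.

```python
import functools
import numpy as np

np_dtype = np.dtype('i8,f8')

def find_min(a1, a2, datatype=None):
    """Return whichever one-element array has the smaller second field."""
    return a2 if a1[0][1] > a2[0][1] else a1

# Per-rank values copied from the output above (ranks 0..3).
per_rank = [
    np.array([(741280623151, 0.71518937)], dtype=np_dtype),
    np.array([(163196666091, 0.72032449)], dtype=np_dtype),
    np.array([(722349427215, 0.02592623)], dtype=np_dtype),
    np.array([(455570294424, 0.70814782)], dtype=np_dtype),
]

# Pairwise left-to-right reduction; the op is commutative, so order
# does not change the result.
amin = functools.reduce(find_min, per_rank)
print(amin)
```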

However, I am not sure how to use the custom MPI operator with the uppercase Reduce function. Here is my test code (a1 and a2 are MPI.memory objects):

---------------------------------------------------------------------
import numpy as np
from typing import Tuple
from mpi4py import MPI

def find_min(a1, a2, datatype):
    """ Custom MPI operator to get the one with the smaller 2nd term.
    """
    dim = (1,)       # Hard-coded

    a1p = to_ndarray(a1, datatype, dim)
    a2p = to_ndarray(a2, datatype, dim)

    if a1p[0][1] < a2p[0][1]:
        a2p[0][0] = a1p[0][0]
        a2p[0][1] = a1p[0][1]

def to_ndarray(a:MPI.memory, dtype:type, dim:Tuple[int,...]):
    """ Convert a mpi4py.MPI.memory object to a numpy ndarray. """
    buf = np.array(a, dtype='B', copy=False)
    return np.ndarray(buffer=buf, dtype=dtype, shape=dim)
---------------------------------------------------------------------

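It raises the following error:

---------------------------------------------------------------------
    return np.ndarray(buffer=buf, dtype=dtype, shape=dim)
TypeError: data type not understood
---------------------------------------------------------------------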

The error persists even if I explicitly pass the custom MPI datatype as the default argument in the function definition:
def find_min(a1, a2, datatype=mpi_dtype)

Any help would be much appreciated. Thank you.

Lisandro Dalcin

Aug 28, 2020, 3:11:26 AM
to mpi...@googlegroups.com
On Fri, 28 Aug 2020 at 00:49, Yiyang Li <liyiy...@gmail.com> wrote:

---------------------------------------------------------------------
def find_min(a1, a2, datatype):
    """ Custom MPI operator to get the one with the smaller 2nd term.
    """
    dim = (1,)       # Hard-coded

    a1p = to_ndarray(a1, datatype, dim)
    a2p = to_ndarray(a2, datatype, dim)

    if a1p[0][1] < a2p[0][1]:
        a2p[0][0] = a1p[0][0]
        a2p[0][1] = a1p[0][1]


When MPI/mpi4py calls back the above function, `datatype` is an instance of `MPI.Datatype`, which NumPy does not understand, so you cannot use it to create NumPy arrays.

Simply start your `find_min()` callback this way:

def find_min(a1, a2, datatype):
    a1p = np.frombuffer(a1, 'i8,f8')
    a2p = np.frombuffer(a2, 'i8,f8')
    ... # continue from here with your reduce logic
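Filling in that outline, a minimal sketch of a complete callback, assuming the `'i8,f8'` layout used throughout this thread (the module-level `NPY_DTYPE` is an assumption, not something mpi4py provides). It compares every element rather than only the first, and writes the winners into `a2` in place, since MPI takes the result of a user-defined operation from its second (in/out) buffer. A `bytearray` stands in for the `MPI.memory` objects so the function can be exercised without MPI.

```python
import numpy as np

NPY_DTYPE = np.dtype('i8,f8')  # assumption: must match the committed MPI datatype

def find_min(a1, a2, datatype):
    """User reduction op: element by element, keep the record whose second
    field ('f1') is smaller. The result is written into a2 in place."""
    x = np.frombuffer(a1, NPY_DTYPE)  # view over the input buffer
    y = np.frombuffer(a2, NPY_DTYPE)  # writable view over the in/out buffer
    smaller = x['f1'] < y['f1']       # boolean mask, one entry per element
    y[smaller] = x[smaller]           # copy whole records where a1 wins

# Stand-in buffers (assumption: MPI would pass MPI.memory objects instead).
a1 = bytearray(np.array([(1, 0.5), (2, 0.9)], dtype=NPY_DTYPE).tobytes())
a2 = bytearray(np.array([(3, 0.7), (4, 0.1)], dtype=NPY_DTYPE).tobytes())
find_min(a1, a2, None)
print(np.frombuffer(a2, NPY_DTYPE))
```

Under MPI the function would be registered as before with `MPI.Op.Create(find_min, commute=True)`; the `datatype` argument is simply ignored because the layout is fixed by `NPY_DTYPE`.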


 
def to_ndarray(a:MPI.memory, dtype:type, dim:Tuple[int,...]):
    """ Convert a mpi4py.MPI.memory object to a numpy ndarray. """
    buf = np.array(a, dtype='B', copy=False)
    return np.ndarray(buffer=buf, dtype=dtype, shape=dim)
---------------------------------------------------------------------


And get rid of `to_ndarray()`; using `np.frombuffer()` is a one-liner.

Please note that the `MPI.memory` class is a stripped-down version of Python's built-in `memoryview`; its only purpose is to provide common ground across Python versions (particularly Python 2). Other than that, `MPI.memory` is like any other object exposing the (old and new) buffer protocol, so you can use `np.frombuffer()` to convert it, in a memory-sharing way, into a more computationally convenient NumPy array.
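To illustrate the memory sharing, the sketch below uses Python's built-in `memoryview` as a stand-in for `MPI.memory` (an assumption for illustration; the real object comes from mpi4py). Writes through the array returned by `np.frombuffer()` land in the underlying bytes, while a read-only source such as `bytes` yields a read-only array.

```python
import numpy as np

raw = bytearray(16)                # room for one 'i8,f8' record
mem = memoryview(raw)              # stand-in for an MPI.memory buffer
arr = np.frombuffer(mem, 'i8,f8')  # no copy: arr is a view onto raw

arr[0] = (42, 1.5)                           # write through the NumPy view
copy = np.frombuffer(bytes(raw), 'i8,f8')    # re-read raw via a fresh copy
print(copy)                                  # the write is visible in raw

ro = np.frombuffer(bytes(16), 'i8,f8')  # immutable bytes -> read-only array
print(ro.flags.writeable)               # False
```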





 


--
Lisandro Dalcin
============
Research Scientist
Extreme Computing Research Center (ECRC)
King Abdullah University of Science and Technology (KAUST)
http://ecrc.kaust.edu.sa/

Yiyang Li

Aug 28, 2020, 5:36:36 PM
to mpi4py
It is weird that

    a1p = np.frombuffer(a1, 'i8,f8')
    a2p = np.frombuffer(a2, 'i8,f8')

gives incorrect integer values. The output is:

---------------------------------------------------------------------
0 [(741280623151, 0.71518937)]
1 [(163196666091, 0.72032449)]
2 [(722349427215, 0.02592623)]
3 [(455570294424, 0.70814782)]

[(33, 0.02592623)] [(0, 0.70814782)]
[(33, 0.72032449)] [(33, 0.02592623)]
[(741280623151, 0.71518937)] [(33, 0.02592623)]

[(33, 0.02592623)] 
---------------------------------------------------------------------

Oddly, one integer is converted correctly, and the floating-point numbers are all correct.

Lisandro Dalcin

Aug 28, 2020, 6:20:51 PM
to mpi...@googlegroups.com
Either your MPI is broken, or you have a bug somewhere. Here is a complete example:

from mpi4py import MPI
import numpy as np

npy_dtype = np.dtype('i8,f8')

mpi_dtype = MPI.Datatype.Create_struct(
    [1, 1],
    [0, 8],
    [MPI.LONG_LONG, MPI.DOUBLE]
).Commit()

def find_min(a1, a2, datatype):
    a1 = np.frombuffer(a1, npy_dtype)
    a2 = np.frombuffer(a2, npy_dtype)

    if a1[0][1] < a2[0][1]:
        a2[...] = a1

FIND_MIN = MPI.Op.Create(find_min, commute=True)

comm = MPI.COMM_WORLD

rank = comm.rank

RS = np.random.RandomState(rank)
index = RS.randint(0, 10**12)
value = RS.random()

a = np.array([(index , value)], dtype=npy_dtype)
amin = np.empty(1, dtype=npy_dtype) if rank == 0 else None

print(f"[{rank}]: {a}")
comm.Reduce([a, mpi_dtype], [amin, mpi_dtype], op=FIND_MIN, root=0)
if rank == 0:
    print(f'min: {amin}')

$ mpiexec -n 9 python test-find-min.py
[1]: [(163196666091, 0.72032449)]
[5]: [(425438759118, 0.87073231)]
[7]: [(752595690692, 0.77991879)]
[6]: [(596774865609, 0.33197981)]
[4]: [(527854149806, 0.54723225)]
[3]: [(455570294424, 0.70814782)]
[2]: [(722349427215, 0.02592623)]
[8]: [(837566358868, 0.96854066)]
[0]: [(741280623151, 0.71518937)]
min: [(722349427215, 0.02592623)]



Yiyang Li

Aug 29, 2020, 7:50:00 PM
to mpi4py
Yes, Lisandro, you are correct. It was caused by a typo in my function mpi_custom_datatype. I had written

mpi_dtype = MPI.Datatype.Create_struct(
            list(range(len(cc))),         
            nbytes,   
            [MPI.LONG_LONG, MPI.DOUBLE])

which should actually be

mpi_dtype = MPI.Datatype.Create_struct(
            [1]*len(cc), 
            nbytes,  
            [MPI.LONG_LONG, MPI.DOUBLE])

So the MPI datatype was constructed incorrectly, and that wrong datatype was passed to

comm.Reduce([a,mpi_dtype], [amin,mpi_dtype], op=FIND_MIN, root=0)

which broke the transfer of a (I am not sure of the details), so in the end

np.frombuffer(a1, 'i8,f8')
np.frombuffer(a2, 'i8,f8')

returned incorrect data, because the a1 and a2 buffers passed to them were wrong.
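The missing detail can be illustrated without MPI. With `list(range(len(cc)))` the block lengths are `[0, 1]`, so the committed datatype describes only the `f8` field: the `i8` field is never transferred, and whatever bytes happen to sit in the receiving buffer show up as the integer, while the root's own contribution stays intact. The sketch below mimics this with a hypothetical helper `transfer` that copies only the bytes covered by each (block length, displacement) pair; it is a simplified model, not what mpi4py or MPI actually execute.

```python
import numpy as np

NPY_DTYPE = np.dtype('i8,f8')
FIELD_SIZES = [8, 8]  # byte sizes of the i8 and f8 fields

def transfer(src, dst, blocklengths, displacements):
    """Hypothetical model of a datatype-driven copy: only the bytes named
    by each (blocklength, displacement) pair are moved from src to dst."""
    for count, disp, size in zip(blocklengths, displacements, FIELD_SIZES):
        nbytes = count * size
        dst[disp:disp + nbytes] = src[disp:disp + nbytes]

sent = np.array([(722349427215, 0.02592623)], dtype=NPY_DTYPE).tobytes()

results = {}
for blocklengths in ([0, 1], [1, 1]):  # buggy list(range(2)) vs fixed [1]*2
    # Zero-filled here; in a real run this memory is uninitialized,
    # which is where values such as 33 come from.
    received = bytearray(16)
    transfer(sent, received, blocklengths, [0, 8])
    results[tuple(blocklengths)] = np.frombuffer(received, NPY_DTYPE)[0]
    print(blocklengths, np.frombuffer(received, NPY_DTYPE))
```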

Thank you for explaining everything so clearly. I was in a hurry yesterday; it would have turned out better if I had waited.

Best,
Yiyang