How to implement an MPI worker pool for an expensive function with expensive initialization, with timeouts and dynamic dispatch?

chengd...@gmail.com

Feb 9, 2018, 8:03:07 AM
to mpi4py
Hi everyone,

I am a little confused about how to implement the master/worker design pattern with MPI. I have a function f(x) that takes ~10s to initialize (via an initializer init()) and ~10s to compute a value, so I want to set up an MPI worker pool that serves a master process running an optimization.

However, the worker pool needs a `Queue` and `while True` loops, and I am having difficulty writing the dispatch code. Do you have any examples of this?

I can see the MPIPool and concurrent.futures pools, but I do not know how to separate the initialization from the function call. I also need to dispatch the workload dynamically and set a timeout, because the tasks are not of equal size and may fail in certain parameter ranges.




Lisandro Dalcin

Feb 14, 2018, 11:16:26 AM
to mpi4py
On 9 February 2018 at 15:36, <chengd...@gmail.com> wrote:
> Hi everyone,
>
> I am a little confused about how to implement the master/worker design
> pattern with MPI. I have a function f(x) that takes ~10s to initialize (via
> an initializer init()) and ~10s to compute a value, so I want to set up an
> MPI worker pool that serves a master process running an optimization.
>
> However, the worker pool needs a `Queue` and `while True` loops, and I am
> having difficulty writing the dispatch code. Do you have any examples of this?
>

Do you know that the latest mpi4py release comes with mpi4py.futures, an
almost drop-in replacement for concurrent.futures? I would really
recommend using that rather than implementing your own
master/worker tool.

Here is a working example for your use case:


# Run with: mpiexec -n 1 python script.py
import time

from mpi4py.futures import MPIPoolExecutor

def square(i):
    # Lazy, one-time initialization in each worker process.
    global initialized
    try:
        initialized
    except NameError:
        initialized = False
    if not initialized:
        print("expensive initialization")
        time.sleep(2)
        initialized = True
    return i**2

if __name__ == '__main__':
    with MPIPoolExecutor(2) as ex:
        for result in ex.map(square, range(7)):
            print(result)



--
Lisandro Dalcin
============
Research Scientist
Computer, Electrical and Mathematical Sciences & Engineering (CEMSE)
Extreme Computing Research Center (ECRC)
King Abdullah University of Science and Technology (KAUST)
http://ecrc.kaust.edu.sa/

4700 King Abdullah University of Science and Technology
al-Khawarizmi Bldg (Bldg 1), Office # 0109
Thuwal 23955-6900, Kingdom of Saudi Arabia
http://www.kaust.edu.sa

Office Phone: +966 12 808-0459

chengd...@gmail.com

Feb 15, 2018, 3:00:43 PM
to mpi4py
Hi Lisandro,

Thank you for your demo code. 

I used mpi4py.futures, and it has solved part of my problem. I used MPICommExecutor to keep compatibility with the Windows MPI implementation, which does not support dynamic process spawning (mostly, my problem runs on a fixed number of cores). I also solved the mpi4py logging problem by using a shared file.
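
For reference, this is roughly the pattern I am using now (a minimal sketch; init(), f(), and the script name are placeholders standing in for my real expensive setup and cost function):

# Run with: mpiexec -n 4 python script.py  (no dynamic spawning needed)
import time

from mpi4py import MPI
from mpi4py.futures import MPICommExecutor

def init():
    time.sleep(2)  # stands in for the ~10s expensive initialization

def f(x):
    time.sleep(1)  # stands in for the ~10s evaluation
    return x**2

initialized = False

def task(x):
    # Same lazy per-worker initialization trick as in your example.
    global initialized
    if not initialized:
        init()
        initialized = True
    return f(x)

if __name__ == '__main__':
    # Every rank runs the script; only the root rank gets an executor,
    # the other ranks serve as workers inside the with block.
    with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
        if executor is not None:
            for result in executor.map(task, range(8)):
                print(result)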

The last part of my problem is exception handling, because map and starmap return generators that are no longer usable after an exception, even when the error is not fatal.

Di

Lisandro Dalcin

Feb 16, 2018, 8:42:21 AM
to mpi4py
On 15 February 2018 at 23:00, <chengd...@gmail.com> wrote:
>
> The last part of my problem is exception handling, because map and starmap
> return generators that are no longer usable after an exception, even when
> the error is not fatal.
>

Well, do not blame me, that's how `concurrent.futures` was designed;
mpi4py just follows the implementation.

Please note that you can call `executor.submit()` in a loop, save all
the futures in a list, iterate over them using `as_completed()`, and
then ask each future for `future.result()`, handling errors in individual
tasks within a try/except block.
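
For example, here is a minimal sketch of that pattern (work() and the injected failure are placeholders; the futures returned by MPIPoolExecutor follow the concurrent.futures API, so as_completed() from concurrent.futures can be used on them):

from concurrent.futures import as_completed

from mpi4py.futures import MPIPoolExecutor

def work(x):
    # placeholder for the real f(x), which may fail for some parameters
    if x == 3:
        raise ValueError("bad parameter")
    return x**2

if __name__ == '__main__':
    with MPIPoolExecutor() as executor:
        futures = {executor.submit(work, x): x for x in range(8)}
        # as_completed(futures, timeout=...) would additionally enforce an
        # overall deadline on waiting for the results.
        for future in as_completed(futures):
            x = futures[future]
            try:
                print(x, future.result())
            except Exception as exc:
                # a failing task does not break the loop over the other futures
                print(x, "failed:", exc)

The same loop works with MPICommExecutor; only the executor construction changes.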