decorated function cannot be used in starmap of mpi4py, is there alternative?

83 views
Skip to first unread message

chengd...@gmail.com

unread,
Feb 15, 2018, 3:57:34 PM2/15/18
to mpi4py
I used mpi4py.File to create a logging handler. And using the handler I created a decorator. Using the decorator I created some functions which I want to trace its execution. The I used starmap of MPICommExecutor  to run it paralleled. However, it shows that the `AttributeError: Can't pickle local object 'args_timing_log.__call__.<locals>.wrapped_f'`.

In order to solve it I have to manually expand the decorator. It is not so pythonic.


- Jeff Daily 's attachment gives an idea to modify the pickling and unpickling behavior
- Thomas gives his solution: mpi4py_map
- Lisandro Dalcin gives a explanation of what happened behind the error.

Is there any alternative ways to move decorated function to the `root` space?

# -*- coding: utf-8 -*-
"""
Created on Wed Feb 14 16:17:38 2018

@author: cheng

reference: 
"""

#%% mpi4py logging handler
from mpi4py import MPI 
import logging
from os.path import abspath
import time

class MPIFileHandler(logging.FileHandler):                                      
    def __init__(self,
                 filename,
                 mode=MPI.MODE_WRONLY|MPI.MODE_CREATE|MPI.MODE_APPEND ,
                 encoding='utf-8',  
                 delay=False,
                 comm=MPI.COMM_WORLD ):                                                
        self.baseFilename = abspath(filename)                           
        self.mode = mode                                                        
        self.encoding = encoding                                            
        self.comm = comm                                                        
        if delay:                                                               
            #We don't open the stream, but we still need to call the            
            #Handler constructor to set level, formatter, lock etc.             
            logging.Handler.__init__(self)                                      
            self.stream = None                                                  
        else:                                                                   
           logging.StreamHandler.__init__(self, self._open())                   
                                                                                
    def _open(self):                                                            
        stream = MPI.File.Open( self.comm, self.baseFilename, self.mode )     
        stream.Set_atomicity(True)                                              
        return stream
                                                    
    def emit(self, record):
        """
        Emit a record.

        If a formatter is specified, it is used to format the record.
        The record is then written to the stream with a trailing newline.  If
        exception information is present, it is formatted using
        traceback.print_exception and appended to the stream.  If the stream
        has an 'encoding' attribute, it is used to determine how to do the
        output to the stream.
        
        Modification:
            stream is MPI.File, so it must use `Write_shared` method rather
            than `write` method. And `Write_shared` method only accept 
            bytestring, so `encode` is used. `Write_shared` should be invoked
            only once in each all of this emit function to keep atomicity.
        """
        try:
            msg = self.format(record)
            stream = self.stream
            stream.Write_shared((msg+self.terminator).encode(self.encoding))
            #self.flush()
        except Exception:
            self.handleError(record)
                                                         
    def close(self):                                                            
        if self.stream:                                                         
            self.stream.Sync()                                                  
            self.stream.Close()                                                 
            self.stream = None                                                  

#%%entry exit timing log decorator
# note: decorator with argument behave very differently.
class args_timing_log(object):
    """
    This decorator is used to trace arguments, return values and timing of 
    function call. 
    """
    def __init__(self, logger): # init decorator
        self.logger = logger

    def __call__(self,f): # return wrapped function
        def wrapped_f(*args,**kwargs):
            t0 = time.time()
            ret = f(*args,**kwargs)
            t1 = time.time()
            self.logger.debug(r"{}<={}(*args,**kwargs),cost {} s,args={},kwargs={}".\
                  format( ret, 
                  f.__name__,
                  t1-t0,
                  args,
                  kwargs)
                  )
            return ret
        
        return wrapped_f




#%% example code
if __name__ == "__main__":
    comm = MPI.COMM_WORLD                                                           
    logger = logging.getLogger("rank[%i]"%comm.rank)                                
    logger.setLevel(logging.DEBUG)                                                                           
    mh = MPIFileHandler("utils_di.log")
    formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s')
    mh.setFormatter(formatter) 
    logger.addHandler(mh)                                                           
    # 'application' code                                                            
    logger.debug('debug message')                                                   
    logger.info('info message')                                                     
    logger.warning('warn message')                                                     
    logger.error('error message')                                                   
    logger.critical('critical message')  
    
    if comm.rank == 0 :
        @args_timing_log(logger)
        def foo(x,y,z=120):
            print("inside func1(x,y,z=120)")
            return 1,2,[1,2,4]
        
        @args_timing_log(logger)
        def bar():
            print("inside func2()")
    
        a = foo(120,210,z=250)
        print("a={}".format(a))
        foo(120,210)
        bar()

    # test starmap
    from mpi4py.futures import TimeoutError, MPICommExecutor
    with MPICommExecutor(MPI.COMM_WORLD, root=0)  as pool:
        results = pool.starmap(foo,
                          zip(range(100),range(100)),
                          chunksize = 500, # chucksize = 500 because ignition delay costs less time
                          unordered=True,# unordered
                          Timeout = 0.1) 
        try:
            for r in results:
                print(r)
        except TimeoutError: 
            print("Error Occurs")
    
        

Di


chengd...@gmail.com

unread,
Feb 15, 2018, 4:02:58 PM2/15/18
to mpi4py
I just comes up with an idea:

    @args_timing_log(logger) # decorator generate a `local` function.
    def foo(x,y,z=120):
        print("inside func1(x,y,z=120)")
        return 1,2,[1,2,4] #whatever it returns

    def name_call(fn:str,*args,**kwargs):
        return globals()[fn](*args,**kwargs) # it works with globals() rather than locals()


    with MPICommExecutor(MPI.COMM_WORLD, root=0)  as pool:
        if pool is not None:
            results = pool.starmap(name_call,
                              zip(['foo']*5,range(5),range(5)),
                              chunksize = 500, # chucksize = 500 because ignition delay costs less time
                              unordered=True,# unordered
                              Timeout = 0.1) 
            try:
                for r in results:
                    print(r)
            except TimeoutError: 
                print("Error Occurs")
    

It is a little uglier than I expected.

But I do not understand why it works with `globals()` rather than `locals()`. The error indicates the "Can't pickle local object ". I am confused with the concept of local.

chengd...@gmail.com

unread,
Feb 15, 2018, 4:53:13 PM2/15/18
to mpi4py
It can be further simplified:

    def foo(x,y,z=120):
        print("inside func1(x,y,z=120)")
        return 1,2,[1,2,4]
    
    def foo_byname(x,y,z=120):
        return globals()['fls_sim'](x,y,z)

    from mpi4py.futures import MPICommExecutor
    with MPICommExecutor(MPI.COMM_WORLD,root=0) as pool:
        if pool is not None:
            results = pool.starmap(foo_byname,zip(range(10),range(10)))
            for i,r in enumerate(results):
                print(f"{i}:{r}")


Lisandro Dalcin

unread,
Feb 16, 2018, 8:51:31 AM2/16/18
to mpi4py
On 16 February 2018 at 00:53, <chengd...@gmail.com> wrote:
> It can be further simplified:
>
> def foo(x,y,z=120):
> print("inside func1(x,y,z=120)")
> return 1,2,[1,2,4]
>
> def foo_byname(x,y,z=120):
> return globals()['fls_sim'](x,y,z)
>

I would just do:

@decorator
def foo(...):
...
return something

def foo_fn(*args,**kwargs):
return foo(*args,**kwargs)

with MPICommExecutor(MPI.COMM_WORLD,root=0) as pool:
if pool is not None:
results = pool.starmap(foo_fn, ....)

--
Lisandro Dalcin
============
Research Scientist
Computer, Electrical and Mathematical Sciences & Engineering (CEMSE)
Extreme Computing Research Center (ECRC)
King Abdullah University of Science and Technology (KAUST)
http://ecrc.kaust.edu.sa/

4700 King Abdullah University of Science and Technology
al-Khawarizmi Bldg (Bldg 1), Office # 0109
Thuwal 23955-6900, Kingdom of Saudi Arabia
http://www.kaust.edu.sa

Office Phone: +966 12 808-0459

chengd...@gmail.com

unread,
Feb 16, 2018, 11:49:44 PM2/16/18
to mpi4py
Hi Lisandro,

I just found there is a more elegant solution using `functools.wraps` in this blog: http://gael-varoquaux.info/programming/decoration-in-python-done-right-decorating-and-pickling.html

Reply all
Reply to author
Forward
0 new messages