# -*- coding: utf-8 -*-"""Created on Wed Feb 14 16:17:38 2018
@author: cheng
reference: """
#%% mpi4py logging handlerfrom mpi4py import MPI import loggingfrom os.path import abspathimport time
class MPIFileHandler(logging.FileHandler): def __init__(self, filename, mode=MPI.MODE_WRONLY|MPI.MODE_CREATE|MPI.MODE_APPEND , encoding='utf-8', delay=False, comm=MPI.COMM_WORLD ): self.baseFilename = abspath(filename) self.mode = mode self.encoding = encoding self.comm = comm if delay: #We don't open the stream, but we still need to call the #Handler constructor to set level, formatter, lock etc. logging.Handler.__init__(self) self.stream = None else: logging.StreamHandler.__init__(self, self._open()) def _open(self): stream = MPI.File.Open( self.comm, self.baseFilename, self.mode ) stream.Set_atomicity(True) return stream def emit(self, record): """ Emit a record.
If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an 'encoding' attribute, it is used to determine how to do the output to the stream. Modification: stream is MPI.File, so it must use `Write_shared` method rather than `write` method. And `Write_shared` method only accept bytestring, so `encode` is used. `Write_shared` should be invoked only once in each all of this emit function to keep atomicity. """ try: msg = self.format(record) stream = self.stream stream.Write_shared((msg+self.terminator).encode(self.encoding)) #self.flush() except Exception: self.handleError(record) def close(self): if self.stream: self.stream.Sync() self.stream.Close() self.stream = None
#%%entry exit timing log decorator# note: decorator with argument behave very differently.class args_timing_log(object): """ This decorator is used to trace arguments, return values and timing of function call. """ def __init__(self, logger): # init decorator self.logger = logger
def __call__(self,f): # return wrapped function def wrapped_f(*args,**kwargs): t0 = time.time() ret = f(*args,**kwargs) t1 = time.time() self.logger.debug(r"{}<={}(*args,**kwargs),cost {} s,args={},kwargs={}".\ format( ret, f.__name__, t1-t0, args, kwargs) ) return ret return wrapped_f
#%% example codeif __name__ == "__main__": comm = MPI.COMM_WORLD logger = logging.getLogger("rank[%i]"%comm.rank) logger.setLevel(logging.DEBUG) mh = MPIFileHandler("utils_di.log") formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s') mh.setFormatter(formatter) logger.addHandler(mh) # 'application' code logger.debug('debug message') logger.info('info message') logger.warning('warn message') logger.error('error message') logger.critical('critical message') if comm.rank == 0 : @args_timing_log(logger) def foo(x,y,z=120): print("inside func1(x,y,z=120)") return 1,2,[1,2,4] @args_timing_log(logger) def bar(): print("inside func2()") a = foo(120,210,z=250) print("a={}".format(a)) foo(120,210) bar()
# test starmap from mpi4py.futures import TimeoutError, MPICommExecutor with MPICommExecutor(MPI.COMM_WORLD, root=0) as pool: results = pool.starmap(foo, zip(range(100),range(100)), chunksize = 500, # chucksize = 500 because ignition delay costs less time unordered=True,# unordered Timeout = 0.1) try: for r in results: print(r) except TimeoutError: print("Error Occurs") @args_timing_log(logger) # decorator generate a `local` function. def foo(x,y,z=120): print("inside func1(x,y,z=120)") return 1,2,[1,2,4] #whatever it returns
def name_call(fn:str,*args,**kwargs): return globals()[fn](*args,**kwargs) # it works with globals() rather than locals()
with MPICommExecutor(MPI.COMM_WORLD, root=0) as pool: if pool is not None: results = pool.starmap(name_call, zip(['foo']*5,range(5),range(5)), chunksize = 500, # chucksize = 500 because ignition delay costs less time unordered=True,# unordered Timeout = 0.1) try: for r in results: print(r) except TimeoutError: print("Error Occurs") def foo(x,y,z=120): print("inside func1(x,y,z=120)") return 1,2,[1,2,4] def foo_byname(x,y,z=120): return globals()['fls_sim'](x,y,z)
from mpi4py.futures import MPICommExecutor with MPICommExecutor(MPI.COMM_WORLD,root=0) as pool: if pool is not None: results = pool.starmap(foo_byname,zip(range(10),range(10))) for i,r in enumerate(results): print(f"{i}:{r}")