multiprocessing error

47 views
Skip to first unread message

Daniel

unread,
Mar 13, 2014, 4:34:16 PM3/13/14
to ruffus_...@googlegroups.com
Hi folks,

I am not sure if this question is related to the previous topic of multi-threading not working properly on the command line but here goes:


When running the following section of code

#    @jobs_limit(8)
    @files(makeRuffusParams(sampleToPairsDict))
    def runConcurrent(readPair, rdp, sampleName):
        QTAtomic_cmd =  "python2.7 %s -c %s -1 %s -2 %s -s %s"\
                    %(cfgDict[str.lower('qtAtomic')], options.config, readPair[0], readPair[1], sampleName)
        logFH.write(QTAtomic_cmd + '\n')
        executeCommand(QTAtomic_cmd, cwd = os.getcwd(), verbose = False)

    pipeline_printout(logFH, [])
    pipeline_run([], verbose = 8, multiprocess = 5, logger = ruffusLoggerProxy)



I am getting the following error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/bioinformatics/sgi/asmopt/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/bioinformatics/sgi/asmopt/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/bioinformatics/sgi/asmopt/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed


But when I run the following, my code runs sequentially:
    @jobs_limit(8)
    @files(makeRuffusParams(sampleToPairsDict))
    def runConcurrent(readPair, rdp, sampleName):
        QTAtomic_cmd =  "python2.7 %s -c %s -1 %s -2 %s -s %s"\
                    %(cfgDict[str.lower('qtAtomic')], options.config, readPair[0], readPair[1], sampleName)
        logFH.write(QTAtomic_cmd + '\n')
        executeCommand(QTAtomic_cmd, cwd = os.getcwd(), verbose = False)

    pipeline_printout(logFH, [])
    pipeline_run([], verbose = 8,  logger = ruffusLoggerProxy)

Any ideas please?

Leo Goodstadt

unread,
Mar 18, 2014, 2:49:22 PM3/18/14
to ruffus_...@googlegroups.com
What happens when you use the multiprocessing python library is that bits of python code get packaged up using "pickle" and sent across process boundaries to run on another processor. 

This only happens for pipeline_run(multiprocess > 1,...). I.e. if you use multiprocess= 1 or multithread=NNN, then no pickling or marshalling is needed.

Some of the variables or code you are using in your function are not picklable.

I have updated the Ruffus FAQ to discuss precisely this:


but I would recommend you read the whole section on passing data between processes:


Leo




--
You received this message because you are subscribed to the Google Groups "ruffus_discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ruffus_discus...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Daniel

unread,
Mar 19, 2014, 3:05:45 PM3/19/14
to ruffus_...@googlegroups.com, llewgo...@gmail.com
Fair enough but the latest version of Ruffus seems to change the behavior regarding multithreading.
Here's a simple example:

    myData = [ [ ['/bioscratch/data/ngs_pipelines/AG0075A08_R1.fastq.gz', '/bioscratch/data/ngs_pipelines/AG0075A08_R2.fastq.gz'], 'testA.txt', 'sample1'], \
                [ ['/bioscratch/data/ngs_pipelines/XYZ_R1.fastq.gz', '/bioscratch/data/ngs_pipelines/XYZ_R2.fastq.gz'], 'testB.txt', 'sample2'] ]
   
    @files(myData)
    @jobs_limit(3, "threesome")
    def runConcurrent(readPair, rdp, sampleName):
        print readPair[0] + "\t" + readPair[1]
        gunzipTestCmd = "gunzip -c %s %s > %s"%( readPair[0], readPair[1], rdp )
        executeCommand(gunzipTestCmd, cwd = os.getcwd(), verbose = False)

    pipeline_printout(logFH, [])
    pipeline_run([], verbose = 8)

This only runs serially. adding a 'multiprocess' param to the pipeline_run crashes it with the pickling error.
I also understand that @files is deprecated but it seems its the only decorator that can handle precomputed output file names

How should I restructure this simple example to make it run in parallel?

Daniel

unread,
Mar 19, 2014, 7:14:06 PM3/19/14
to ruffus_...@googlegroups.com, llewgo...@gmail.com
Here's another example that doesn't work in parallel and executes serially:

    myFwdReads = ['/bioscratch/data/ngs_pipelines/AG0075A08_R1.fastq.gz', '/bioscratch/data/ngs_pipelines/XYZ_R1.fastq.gz']
  
    @collate(myFwdReads, regex(".+/(.+)_R1.fastq.gz$"), r"\1.rdp")
    @jobs_limit(3, "threesome")
    def runConcurrent(readPair, rdp):

        gunzipTestCmd = "gunzip -c %s %s > %s"%( readPair[0], readPair[1], rdp )
        runCommand(gunzipTestCmd)


    pipeline_run([], verbose = 8)

I'm stumped..

Daniel

unread,
Mar 19, 2014, 9:18:06 PM3/19/14
to ruffus_...@googlegroups.com, llewgo...@gmail.com
Got it!!!
My examples are nested within a main function. If I move everything out of the main function and in the body of the script then it works just as advertised.
I should not have tried to make my coding more compliant with convention...

Mark Schultz

unread,
Feb 20, 2017, 3:53:28 AM2/20/17
to ruffus_discuss, llewgo...@gmail.com
This worked for me too!  I had a feeling that having everything inside a main() function was causing some trouble.  After reading this post, and moving everything out of the main function, I no longer receive a pickling error.  Brilliant.  
Reply all
Reply to author
Forward
0 new messages