downstream input channel from multiple processes


Manuele Simi

May 30, 2015, 2:30:11 PM5/30/15
to next...@googlegroups.com
Hello,

I'm trying to build a pipeline with the following three processes:
1) submit: it chunks big input files (alignments) and passes each chunk to processes executed in parallel
2) analyze: it performs computation on the single chunks and sends the generated files into an output channel X
3) combine: it reads from channel X all the files generated by the analyze processes and writes some files.

What I'm not able to do is to have a single execution of the combine process. Even if I don't flatten the channel X, I still get N executions of combine, each of them receiving the set of files produced by one execution of analyze. I understand why this happens, but I don't know how to avoid this behavior.

In other words, I would like to have a single execution of submit (and I get it), N parallel executions of analyze (and I get them), and again a single execution of combine that processes all the files sent in the channel X.

I know I can subscribe to channel X and collect all the files, but then, I'm not able to have them processed into a Process. I'm only able to work on them on the submission machine (i.e. outside a process).

Here is a simplified version of the pipeline:

process submit {
    echo true

    output:
    file 'index_*' into indexes mode flatten

    """
    java -jar /Users/mas2182/Lab/TestWorkflows/goby/goby.jar -m suggest-position-slices -n 200 -o slicingPlan.tsv '${params.alignment1}' '${params.alignment2}' > /dev/null
    split -l 1 slicingPlan.tsv index_
    """
}


def align_counter = 0

process analyze {
    echo true

    input:
    set val(range), val(index) from indexes.flatMap().map { file -> tuple(extractValue(file.text), ++align_counter) }

    output:
    file { "${index}.aln" } into results

    script:
    """
    echo file received > ${index}.aln
    """
}
/*
 * Wait until the files are available and then combine them
 */
/*
alignFiles = []
results.subscribe onNext: { alignFiles << it }, onComplete: { println "Align results are available." }
*/

process combine {
    echo true

    input:
    file '*.aln' from results

    output:
    file combined_results into done

    "echo *.aln > combined_results"
}

done.subscribe {
    println "Result: $it"
    println "\n${it.text}"
}


Any help would be appreciated.

Thanks,
manuele

Paolo Di Tommaso

May 31, 2015, 6:23:24 AM5/31/15
to nextflow
This happens because the "analyze" process is executed n times, thus the "results" output triggers the execution of combine n times.

What may be confusing at the beginning is to understand how process executions are triggered. They do not depend on the number of files produced but on the number of "items" emitted by a channel.

In your first example you had the "submit" process producing many files output as a single "item", so it triggered the following process just one time. The "flatMap" operator converts the many-files item into many emissions of a single file each. Thus you get the "analyze" process executed as many times as there are index files.

Now you need to do the opposite: collect all the "results" items into a single item emission, so that "combine" is executed once. This can be done with the "toList" operator.
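(A minimal sketch, not part of the original reply, illustrating the item-vs-emission distinction with hypothetical values:)

```nextflow
// Three separate emissions: a downstream process bound to this
// channel would run three times, once per item.
Channel.from( 'a.aln', 'b.aln', 'c.aln' )

// toList gathers all emissions into one list item: a downstream
// process receives the whole list at once and runs exactly once.
Channel.from( 'a.aln', 'b.aln', 'c.aln' ).toList()
```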


Your combine process should look like this:


process combine {
    echo true

    input:
    file '*.aln' from results.toList()

    output:
    file combined_results into done

    "echo *.aln > combined_results"
}


However, if you are processing a large number of items and the "combine" process is only required to merge your results into a single file, I would suggest considering replacing it with the "collectFile" operator, which is specialised for this job.
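(For illustration, a sketch of what the collectFile alternative could look like, assuming the merge really is a plain concatenation; this replaces the combine process entirely and is not from the original reply:)

```nextflow
// Gather every file emitted by the `results` channel and append their
// contents into a single file named `combined_results`.
results
    .collectFile( name: 'combined_results', newLine: true )
    .subscribe { merged -> println "Merged into: $merged" }
```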


Let me know if the above solution works for you.

Cheers,
Paolo




--
You received this message because you are subscribed to the Google Groups "Nextflow" group.
To unsubscribe from this group and stop receiving emails from it, send an email to nextflow+u...@googlegroups.com.
Visit this group at http://groups.google.com/group/nextflow.
For more options, visit https://groups.google.com/d/optout.

Manuele Simi

Jun 1, 2015, 10:01:39 AM6/1/15
to next...@googlegroups.com
Hi Paolo,

my understanding of the situation was exactly what you explained (N executions of analyze trigger N executions of combine). Thanks for confirming it.

Since I need to do some processing on the input files in combine, I will go for the toList option.

Thanks again,
manuele

Paolo Di Tommaso

Jun 1, 2015, 10:04:03 AM6/1/15
to nextflow
Happy to hear that. 

You are welcome. 


Cheers,
Paolo