Hello,
I'm trying to build a pipeline with the following three processes:
1) submit: it chunks big input files (alignments) and passes each chunk to processes executed in parallel
2) analyze: it performs computation on a single chunk and sends the generated files into an output channel X
3) combine: it reads from channel X all the files generated by the analyze processes and writes some files.
What I'm not able to do is get a single execution of the combine process. Even if I don't flatten channel X, I still get N executions of combine, each of them receiving the set of files produced by one execution of analyze. I understand why this happens, but I don't know how to avoid this behavior.
In other words, I would like to have a single execution of submit (and I get it), N parallel executions of analyze (and I get them), and then a single execution of combine that processes all the files sent into channel X.
I know I can subscribe to channel X and collect all the files, but then I'm not able to have them processed inside a process. I'm only able to work on them on the submission machine (i.e. outside a process).
Here is a simplified version of the pipeline:
process submit {
    echo true
    output:
    file 'index_*' into indexes mode flatten
    """
    java -jar /Users/mas2182/Lab/TestWorkflows/goby/goby.jar -m suggest-position-slices -n 200 -o slicingPlan.tsv '${params.alignment1}' '${params.alignment2}' > /dev/null
    split -l 1 slicingPlan.tsv index_
    """
}
def align_counter = 0
process analyze {
    echo true
    input:
    set val(range), val(index) from indexes.flatMap().map { file -> tuple(extractValue(file.text), ++align_counter) }
    output:
    file { "${index}.aln" } into results
    script:
    """
    echo file received > ${index}.aln
    """
}
/*
 * Wait until the files are available and then combine them
 */
/*alignFiles = []
results.subscribe onNext: { alignFiles << it }, onComplete: { println "Align results are available." }
*/
process combine {
    echo true
    input:
    file '*.aln' from results
    output:
    file combined_results into done
    "echo *.aln > combined_results"
}
done.subscribe {
    println "Result: $it"
    println "\n${it.text}"
}
Any help would be appreciated.
Thanks,
manuele