use of groupkey and groupTuple

Edoardo Giacopuzzi

May 6, 2021, 2:09:52 PM
to Nextflow
Hi, 
In my current configuration I have an input file containing 3 columns: sampleID, reads_F, reads_R. Each input file can contain multiple samples and multiple lines per sample like this example:

sample1,sample1_A_F.fastq.gz,sample1_A_R.fastq.gz
sample1,sample1_B_F.fastq.gz,sample1_B_R.fastq.gz
sample2,sample2_A_F.fastq.gz,sample2_A_R.fastq.gz
sample2,sample2_B_F.fastq.gz,sample2_B_R.fastq.gz

Then I create an input channel from the input file using splitCsv

fastqfiles = Channel
.from(inputfile)
.splitCsv(header: false, sep: '\t')
.map{row -> return tuple(row[0], file(row[1]), file(row[2]))}

In my current implementation I first align all the fastq couples for each sample using a process that calls bwa aligner and then I group the resulting bam files per sample using groupTuple and merge them like this:

BWA(fastqfiles, genome_data)
MERGE_BAMS(BWA.out.bam_file.groupTuple())

However, this results in MERGE_BAMS waiting until all parts from BWA have been emitted for every sample. Is there a way to have MERGE_BAMS proceed as soon as all parts for a given sample have been emitted?

I have a feeling I may achieve this using groupKey, but I cannot find a proper solution by myself.

Thanks for any suggestion!

Luc Dehaspe

May 7, 2021, 3:04:18 AM
to next...@googlegroups.com
Hi Edoardo,

I recently had to solve exactly the same problem. The trick is to replace the first value 'sample_id' in your fastqfiles channel with the value 'groupKey(sample_id, number_of_parts)'.

This can be done in two steps:
(1)  create a second channel 'fastqfiles_count'  with values 'sample_id, number_of_parts':

fastqfiles_count = Channel
.from(inputfile)
.splitCsv(header: false, sep: '\t')
.map{row -> return tuple(row[0], file(row[1]))}
.unique()
.groupTuple()
.map{sample, list_F -> [sample, groupKey(sample, list_F.size())]}

(in your case the 'unique' step can probably be dropped)

(2) combine the 'fastqfiles_count'  and 'fastqfiles' channels by the 'sample_id' key and drop that key to put the groupKey in front position:

fastqfiles_new = fastqfiles_count
.combine(fastqfiles, by:0) 
.map{it -> [it.get(1), it.get(2), it.get(3)] } 

Then continue as before, replacing fastqfiles by fastqfiles_new, but leaving the rest of the code as it is:

BWA(fastqfiles_new, genome_data)
MERGE_BAMS(BWA.out.bam_file.groupTuple())

I have not tested the exact code above, but groupTuple should now cause MERGE_BAMS to start as soon as possible for every individual sample rather than wait for completion of the BWA step for all samples.
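
For anyone who wants to try the idea in isolation, here is a minimal sketch of the same two steps on in-memory values, with no files or processes involved. The sample ids and part names are invented for illustration, and like the code above it has not been run against a real pipeline, so treat it as a starting point rather than a verified recipe.

```groovy
// Toy illustration of the groupKey trick (hypothetical data, no processes).
nextflow.enable.dsl = 2   // DSL2 had to be enabled explicitly on 2021-era releases

workflow {
    // Stand-in for the per-part results, e.g. BWA.out.bam_file
    parts = Channel.of(
        ['sample1', 'A'],
        ['sample1', 'B'],
        ['sample2', 'A'],
        ['sample2', 'B'],
        ['sample2', 'C']
    )

    // Step 1: count the parts per sample and wrap each sample id in a groupKey
    keys = parts
        .groupTuple()
        .map { sample, names -> [sample, groupKey(sample, names.size())] }

    // Step 2: replace the plain sample id with the groupKey, then group again.
    // Because each key carries its expected size, groupTuple can emit a group
    // once that many items have arrived for it.
    keys
        .combine(parts, by: 0)
        .map { sample, key, name -> [key, name] }
        .groupTuple()
        .view()
}
```

Note that the counting step itself still waits for its input channel to finish, which is why it is built from the sample sheet (cheap to read) rather than from the BWA output. If every sample is known to have the same number of parts, passing that number directly with groupTuple(size: n) may be a shorter route.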

I would be happy to hear about a more elegant solution. This could also serve as a gentle reminder to the Nextflow team that a groupKey example in the documentation would indeed be very welcome.

kind regards,
--luc
