How to fork and then recombine channels


Thomas A. Christensen II

Sep 1, 2021, 8:52:33 AM
to next...@googlegroups.com

Hello Nextflow team,

Consider the following process

-------------     ------------     ------------------     -------------
| Raw Reads | --> | Trimming | --> | Classification | --> | Filtering | --> ...
-------------     ------------     ------------------     -------------
                       |                                         ^
                       |                                         |
                       -------------------------------------------

I have run into a lot of issues lately with one sample's output from Classification getting put together with another sample's output from Trimming.

Simplified workflow:

#!/usr/bin/env nextflow
nextflow.enable.dsl = 2

workflow {
    RawReads = Channel
        .fromFilePairs("${params.readsfolder}/*{R1,R2,_1,_2}*.{fastq,fq}.gz")

    Trimming(RawReads)
    Classification(Trimming.out)
    Filtering(Trimming.out, Classification.out)
}

and then the .command.sh file for the Filtering process will look like

#!/bin/bash
extract_kraken_reads.py -k sample_51.kraken -s sample_64_trimmed.fastq -t 28875 -o sample_64_filtered.fastq

This process nearly always fails, and even when it does somehow manage to produce output, the output is nonsense.

This issue doesn't happen 100% of the time, which is really confusing me, and I'm not sure whether it's a bug or my error. I've already tried rewriting the workflow in DSL 1 syntax and using pipe notation. Am I forking/recombining the channels wrong? Is there a more stable way to do this?

Thanks,
Thomas Christensen
tchris...@vet.k-state.edu

Paolo Di Tommaso

Sep 1, 2021, 9:36:54 AM
to nextflow
Looking at your command line, I guess the problem is that the samples get out of sync, right?

```
extract_kraken_reads.py -k sample_51.kraken -s sample_64_trimmed.fastq -t 28875 -o sample_64_filtered.fastq
```




Laurence Bernstein

Sep 1, 2021, 3:11:54 PM
to Nextflow
Oh yes... This is the biggest "rookie error" that I have made, even as a non-rookie.
You MUST add a sample_name or other unique identifier to the RawReads channel and then pass it through to your output channels.
Then, before you call Filtering(), use join to sync the two channels and pass the new channel to Filtering(), like this:

Trimming.out.join( Classification.out ).set { joined_channel }
Filtering( joined_channel )

The items in joined_channel will automatically be matched on the first field of each tuple, and the duplicate key field from the Classification channel will be dropped.
So if it looked like:

Trimming.out       = [sample_name1, a1, b1], [sample_name2, a2, b2] ...
Classification.out = [sample_name1, c1, d1], [sample_name2, c2, d2] ...

The joined channel will look like:

joined_channel = [sample_name1, a1, b1, c1, d1],  [sample_name2, a2, b2, c2, d2] ...
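
In process terms, that means declaring the sample identifier as part of every output tuple so it travels with the files. A minimal sketch (the script bodies and file names here are placeholders, not your actual commands):

```
process Trimming {
    input:
    tuple val(sample_name), path(reads)

    output:
    tuple val(sample_name), path("${sample_name}_trimmed.fastq")

    script:
    """
    # placeholder for the real trimming command
    cp ${reads[0]} ${sample_name}_trimmed.fastq
    """
}

process Classification {
    input:
    tuple val(sample_name), path(trimmed)

    output:
    tuple val(sample_name), path("${sample_name}.kraken")

    script:
    """
    # placeholder for the real classification command
    touch ${sample_name}.kraken
    """
}
```

Note that Channel.fromFilePairs already emits [sample_name, [reads]] tuples, so the identifier is available from the start; the key is to re-emit it in every output: block so join can match on it downstream.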

Hope that explains it.

LB

Thomas A. Christensen II

Sep 1, 2021, 5:20:34 PM
to next...@googlegroups.com

That makes sense. Thanks, Laurence. After a quick look at the join operator documentation and some refactoring, the processes are working as expected. I would have expected a trick like this to show up on the Nextflow Patterns page; maybe I'll write a guide and submit a PR there.
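
In case it helps anyone else, the workflow body now looks roughly like this (process definitions omitted; each process emits tuples keyed by sample name, as Laurence described):

```
#!/usr/bin/env nextflow
nextflow.enable.dsl = 2

workflow {
    RawReads = Channel
        .fromFilePairs("${params.readsfolder}/*{R1,R2,_1,_2}*.{fastq,fq}.gz")

    Trimming(RawReads)
    Classification(Trimming.out)

    // join matches Trimming and Classification outputs on the sample name,
    // so Filtering always receives files from the same sample
    Filtering(Trimming.out.join(Classification.out))
}
```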

Thomas
