DSL2: Generate Channel after Process termination


Barbara M.

Jan 3, 2022, 5:44:58 AM
to Nextflow
Hello dear nextflow community.

Is there a way to delay the creation of a channel until a process has terminated?
I tried to put together a minimal example below. It is still a bit complicated, so let me explain what I want to achieve. Make sure that the directory $scriptpath/test exists when trying out the code.

Process1 should be the first step executed by the pipeline. In this example it just calls the bash script testscript.sh, which touches the files a, b, and c in test/.

After Process1, the workflow should continue with the creation of the Channel out_ch. This takes names from a list (in my real workflow it reads them from a csv) and checks whether a file test/$name exists. Since the files are created in Process1, it is critical that the Channel is created after Process1 has terminated. However, the channel seems to be created earlier, since none of the files are found (if the workflow is executed a second time, the files a, b, and c are correctly found, but they were obviously created in the previous execution).

Is there any way to hold back the channel creation until the process has terminated? Or are you aware of any way to work around this problem?

Best wishes,
Barbara


nextflow script
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

Basepath = System.getProperty("user.dir")

// touches files $Basepath/test/a,  $Basepath/test/b, $Basepath/test/c
process Process1 {

    output:
        val start

    script:
        start = System.currentTimeMillis()
        print("starting process 1")
        """
        bash $Basepath/testscript.sh -p $Basepath
        """
}

// prints if channel passed checks and was created with Process1
process Process2 {

    input:
        val fromProcess1
        val fromChannel

    exec:
        print("processed $fromChannel")
}


workflow {

    Process1()

    Channel
        .from(['a', 'b'], ['c', 'd'])
        .branch{
            // Check whether all samples exist with Function1
            // and emit samples with passed tests as out_ch.passed

            failed_first: (Function1(it[0]) == 1);
                return 1

            failed_second: (Function1(it[1]) == 1);
                return 1

            passed: true
                return it
    }
    .set{ out_ch}

    Process2(Process1.out, out_ch.passed)
}

// tests whether a file test/$value exists for the given value
def Function1(value){
    fileslist = new FileNameByRegexFinder().getFileNames("test/", value)
    if (fileslist.size() == 0){
        print("failed $value")
        return 1
    }
    return 0
}

testscript.sh
while getopts p: flag
do
    case "${flag}" in
        p) Basepath=${OPTARG};;
    esac
done

mkdir -p $Basepath/test
sleep 3
touch $Basepath/test/a
touch $Basepath/test/b
touch $Basepath/test/c

Paolo Di Tommaso

Jan 3, 2022, 12:22:55 PM
to nextflow
The problem is ill-posed. `Process1` writes outside the task work directory, which is the number one thing not to do with NF.

Modify it by removing the use of `$Basepath` and declaring the file(s) generated by testscript.sh as a proper process `output`; the corresponding channel will then handle the synchronization for you.
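
A minimal sketch of what this suggestion could look like, not a definitive implementation. It assumes the check can be done on the names of the files emitted by `Process1` instead of on the file system; the channel name `produced` and the wrapping `map` steps are illustrative choices that do not appear in the thread, and `Function1` is replaced here by a plain `containsAll` test.

```nextflow
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

// Creates a, b and c inside the task work directory (no absolute paths).
process Process1 {

    output:
        path 'test/*'

    script:
        """
        mkdir -p test
        touch test/a test/b test/c
        """
}

process Process2 {

    input:
        val pair

    exec:
        println "processed $pair"
}

workflow {

    Process1()

    // Names of the files Process1 actually produced. This value is only
    // emitted once Process1 has finished, which provides the ordering.
    produced = Process1.out
        .flatten()
        .map { f -> f.name }
        .collect()
        .map { names -> [names] }   // wrap so that combine keeps the list intact

    Channel
        .of(['a', 'b'], ['c', 'd'])
        .map { pair -> [pair] }     // wrap for the same reason
        .combine(produced)
        .branch { pair, names ->
            passed: names.containsAll(pair)
                return pair
            failed: true
                return pair
        }
        .set { out_ch }

    out_ch.failed.view { pair -> "failed $pair" }
    Process2(out_ch.passed)
}
```

Because the list of names is itself derived from `Process1.out`, the `branch` step cannot evaluate any pair before the process has completed, so no explicit waiting is needed.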



Barbara M.

Jan 4, 2022, 2:40:49 AM
to Nextflow
Dear Mr. Di Tommaso,

Thank you very much for your valuable comment and the fast reply.

I edited the script as suggested and show the new code below. (Please correct me if I did not understand your suggestion correctly.) However, the outcome is still the same: the Channel is generated before Process1 has finished. I think that Nextflow doesn't know that it should wait, since the Channel creation contains no reference to Process1.
Next I tried to wrap the Channel creation in a separate process and return it as a val channel (see the code below, "ProcessForChannel" and the adapted workflow). In this version the order is correct; however, the channel does not seem to return the value but [DataflowGetPropertyExpression(value=null)], and the pipeline does not terminate. Is there any way to get the actual value back?

Thank you very much for your time
Barbara

Version 1
Process1 (edited)
process Process1 {

    publishDir Basepath

    output:
        file 'test/*'

    script:
        print("starting process 1")
        """
        mkdir -p test
        sleep 3
        touch test/a
        touch test/b
        touch test/c
        """
}



Version 2
ProcessForChannel
process ProcessForChannel {

    input:
        file fromProcess1

    output:
        val out_ch

    exec:
        Channel
            .from(['a', 'b'], ['c', 'd'])
            .branch{
                // Check whether all samples exist with Function1
                // and emit samples with passed tests as out_ch.passed

                failed_first: (Function1(it[0]) == 1);
                    return 1

                failed_second: (Function1(it[1]) == 1);
                    return 1

                passed: true
                    return it
        }
        .set{ out_ch}
}
Workflow
workflow {

    Process1()

    ProcessForChannel(Process1.out)

    Process2(Process1.out, ProcessForChannel.out.passed)
}

Paolo Di Tommaso

Jan 6, 2022, 11:53:33 AM
to nextflow
Hi, this still looks like some random coding. The snippet below should *not* be wrapped into a process. Also, the process `ProcessForChannel` declares an input that is used nowhere.

```
        Channel
            .from(['a', 'b'], ['c', 'd'])
            .branch{
                // Check whether all samples exist with Function1
                // and emit samples with passed tests as out_ch.passed

                failed_first: (Function1(it[0]) == 1);
                    return 1

                failed_second: (Function1(it[1]) == 1);
                    return 1

                passed: true
                    return it
        }
        .set{ out_ch}
```

I'd suggest following this tutorial before trying to write your own code.



p

Barbara M.

Jan 7, 2022, 9:37:10 AM
to Nextflow
Dear Mr. Di Tommaso, 

thanks again for your comment. I had already gone through the tutorial you mentioned, and some others too, but they did not cover this specific use case.

In the meantime, I managed to implement the feature with an external python script, wrapped in a process: It writes a csv file, which is then streamed into a channel.

In case you wonder what I wanted to achieve:
I'm doing a bcl to fastq conversion as first step. Afterwards variant calling should be conducted in the same pipeline. However, sometimes the demultiplexing might fail for individual samples. I would like the pipeline not to abort in this case (no Fastqs are found) but just log the failed samples. However, the pipeline should still fail for any other error.


Thank you again for the fast response and the valuable comments
Barbara

drhp...@gmail.com

Jan 10, 2022, 10:04:22 AM
to Nextflow

Hi Barbara,

It's a little tricky trying to figure out exactly what you are trying to do, but if I understand it conceptually, you could use a couple of different methods depending on how you are detecting whether the demultiplexing has failed for a given sample. I will point you to a code example I used for ONT data; hopefully you will be able to see what I am doing in terms of the NF implementation. There is also a little code there that allows me to log the "failed" samples in a MultiQC custom content file that can later be added to the main MultiQC report.


I suspect a combination of "branch" and "filter" are what you are looking for and you may have to play with the channels a little to get things exactly as you want them.

Hope that helps!

Cheers,

Harshil

Barbara M.

Jan 11, 2022, 3:52:54 AM
to Nextflow
Hi Harshil,

thank you very much for your answer and your time.

I did indeed plan to use the "branch" option to get my failed samples. However, I wanted to generate the initial channel by reading in the sample sheet and checking for file existence. The problem was that I did not find a way to delay the channel creation until the bcl process had terminated (this was my initial question here). The main reason for this approach was that I want to offer both a full pipeline and a variant-caller-only pipeline, and I hoped that I could generate the channel in the same way in both cases. Thanks to your answer and code snippet I realized that I need to generate the channel differently: for the full pipeline I should stream the files generated by the bcl process, and for the caller-only pipeline I can use my approach with the sample sheet.
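
A rough sketch of the two entry points described above, under stated assumptions. The parameter names (`caller_only`, `samplesheet`, `fastq_dir`), the `sample` column, the file naming scheme and the stand-in `Demultiplex` process are all hypothetical and only serve to illustrate the idea.

```nextflow
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

// Hypothetical parameters and file layout, for illustration only.
params.caller_only = false
params.samplesheet = 'samples.csv'   // assumed to have a 'sample' column
params.fastq_dir   = 'fastq'

// Stand-in for the real bcl to fastq step.
process Demultiplex {

    output:
        path '*.fastq.gz'

    script:
        """
        touch sampleA.fastq.gz sampleB.fastq.gz
        """
}

workflow {

    if( params.caller_only ) {
        // Caller-only mode: the FASTQ files already exist before the run,
        // so checking the file system while building the channel is safe.
        reads = Channel
            .fromPath(params.samplesheet)
            .splitCsv(header: true)
            .map { row -> file("${params.fastq_dir}/${row.sample}.fastq.gz") }
            .filter { fq -> fq.exists() }
    }
    else {
        // Full mode: stream the files emitted by the process, so the
        // channel is only populated after demultiplexing has finished.
        Demultiplex()
        reads = Demultiplex.out.flatten()
    }

    reads.view { fq -> "ready for variant calling: ${fq.name}" }
}
```

Both branches end in a channel of FASTQ paths, so the downstream variant-calling steps could stay the same regardless of which mode is selected.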

Thanks again for your time and guidance
Barbara

drhp...@gmail.com

Jan 11, 2022, 4:10:53 AM
to Nextflow

Hi Barbara,

No worries! Good luck!

Cheers,

Harshil
