Forking list of outputs to separate channels


Emma Munch

Jul 7, 2021, 5:44:47 PM
to Nextflow
Hi, I have a feeling that this is a problem that will have a simple fix, but I am confused on the ways to pass various inputs to processes.

I have the following example workflow where test_bed takes no inputs and creates 3 different files. The next process, test_probe, takes the output of test_bed as input. However, I want test_probe to run in parallel on each file created, so there would be 3 test_probe processes. I have tried a variety of ways to do this using various operators from the docs, but I'm getting some confusing errors.

The processes and scripts called are defined as follows:
***************************************   pipelinenf   ***********************************
process test_bed {
output:
path("${params.environment}.bed1")
path("${params.environment}.bed2")
path("${params.environment}.bed3")

"""
python $projectDir/test.py
"""
}

process test_probe {
input:
val(bed_file)

output:
path("${params.environment}.*.probe")

"""
python $projectDir/test_probe.py $bed_file.val
"""
}
***************************************   test.py   ***********************************
for i in range(3):
    with open(f"test.{i}.bed", "w+") as f:
        f.write(str(i))  # write() needs a string, not an int
************************************   test_probe.py   *********************************
import sys
# extract the index from a filename like "test.0.bed"
i = sys.argv[1].split('/')[-1][5:-4]
with open(sys.argv[1], "r") as f:
    content = f.read()
with open(f"test.{i}.probe", "w+") as f:
    f.write(content + "\n")


*********************************************************************
WORKFLOW APPROACH 1
*********************************************************************
If I define the workflow as follows:
workflow {
   test_bed()
   test_probe(test_bed.out.flatten())
}
Then I get the following error:
Missing process or function with name 'flatten'
groovy.lang.MissingMethodException: No signature of method: nextflow.extension.OperatorEx.flatten() is applicable for argument types: (groovyx.gpars.dataflow.DataflowVariable, groovyx.gpars.dataflow.DataflowVariable...) values: [DataflowVariable(value=null), DataflowVariable(value=null), ...]
Possible solutions: flatten(groovyx.gpars.dataflow.DataflowReadChannel)

I am confused because I thought this was the exact purpose of the flatten() operator: to take a list and emit each item as a separate element on the channel.
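For reference, my mental model from the docs was something like this toy sketch (not my actual pipeline):

Channel.of( [1, 2, 3] )
    .flatten()
    .view()      // emits 1, 2 and 3 as separate items

so I expected test_bed.out.flatten() to behave the same way.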

*********************************************************************
WORKFLOW APPROACH 2
*********************************************************************
If I define the workflow as follows:
workflow {
    test_bed()
    bed_chan = Channel.fromList(test_bed.out)
    test_probe(bed_chan)
}
Then the workflow runs as expected, with the three files processed in parallel.
I know this is not the best solution because test_bed.out is already a channel, so in essence I'm wrapping a channel in a channel. Also, in the process definition of test_probe, I have to use $bed_file.val, which does not seem like good practice. Lastly, I get a warning:
WARN: Cannot serialize context map. Cause: java.util.ConcurrentModificationException -- Resume will not work on this process
I assume the warning is because the nested channels cannot be serialized.

I know I am probably just missing something obvious because this is a pretty trivial task, thank you in advance for your help!

Emma Munch

Jul 7, 2021, 5:45:45 PM
to Nextflow
Also I should note I'm using dsl2

Sam

Jul 8, 2021, 10:05:27 AM
to Nextflow
You forgot () after out

workflow{
    test_bed()
    test_probe(test_bed.out().flatten())
}

Though I might prefer doing
workflow{
    test_bed() \
        | flatten \
        | test_probe
}

also, as your input is a file, you might want to use path instead of val, e.g.

process test_probe {
input:
path(bed_file)
}
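With a path input, the file is staged into the task's work directory, so the script can reference it directly and no .val is needed. A sketch of what I mean (the output glob here is just illustrative):

process test_probe {
    input:
    path(bed_file)

    output:
    path("*.probe")

    """
    python $projectDir/test_probe.py $bed_file
    """
}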

Emma Munch

Jul 8, 2021, 12:39:12 PM
to Nextflow
Hmm, I don't think that's the problem, because nowhere in the docs does it specify to use '()' when getting the outputs. See this example from the docs:
workflow my_pipeline {
    foo()
    bar( foo.out.collect() )
}
I would assume that flatten() should work the same way.
I did try this however and the error I got seems to support the information in the docs:
Missing process or function with name 'out'

Secondly, the solution you provided with pipes yields the same "Missing process or function with name 'flatten'" error for me.

Sam

Jul 8, 2021, 12:50:57 PM
to Nextflow
Yup, I was wrong, there shouldn't be () after out

What is your nextflow version btw?

Emma Munch

Jul 8, 2021, 12:54:41 PM
to Nextflow
21.04.1

Sam

Jul 8, 2021, 2:20:56 PM
to Nextflow

ok, figured it out.

The problem is that you are outputting to three separate unnamed channels. To illustrate the problem, let's use emit:



process test_bed{
    output:
        path "a", emit: a
        path "b", emit: b
        path "c", emit: c
    script:
    """
    touch a b c
    """
}

process test_probe{
    input:
        path(in)
    script:
        """
        echo ${in}
        """
}
workflow{
    test_bed() \
        | view
}


This will return a similar error saying 'view' was not found.

If we change the workflow to

workflow{
    test_bed()
    test_bed.out.a \
        | view
}

This will work alright

What you want in this situation is `mix`

workflow {
    test_bed() \
        | mix \
        | view
}

which should give you the expected result
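(mix just merges the emissions of several channels into a single channel; as a toy illustration:

Channel.of('a')
    .mix( Channel.of('b'), Channel.of('c') )
    .view()      // emits a, b and c; order is not guaranteed

which is why it collapses the three unnamed outputs into one channel.)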

If you want to use flatten, you need to change your process to


process test_bed {
    output:
        tuple path("a"), path("b"), path("c")
    script:
    """
    touch a b c
    """
}
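That way test_bed.out is a single channel emitting one tuple, and something like

workflow {
    test_bed() \
        | flatten \
        | test_probe
}

should run test_probe once per file, as you originally intended.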

Emma Munch

Jul 8, 2021, 5:33:54 PM
to Nextflow
Great! Thank you so much for responding so quickly and all your help!!