Hi, I have a feeling this is a problem with a simple fix, but I am confused about the different ways to pass inputs to processes.
I have the following example workflow where test_bed takes no inputs and creates 3 different files. The next process, test_probe, takes the output of test_bed as input. However, I want test_probe to run in parallel on each file created, so there would be 3 test_probe processes. I have tried a variety of ways to do this using various operators from the docs, but I'm getting some confusing errors.
The processes and scripts called are defined as follows:
*************************************** pipeline.nf ***********************************
process test_bed {
    output:
    path("${params.environment}.0.bed")
    path("${params.environment}.1.bed")
    path("${params.environment}.2.bed")

    script:
    """
    python $projectDir/test.py
    """
}
process test_probe {
    input:
    val(bed_file)

    output:
    path("${params.environment}.*.probe")

    script:
    """
    python $projectDir/test_probe.py $bed_file.val
    """
}
*************************************** test.py ***********************************
# write three small bed files: test.0.bed, test.1.bed, test.2.bed
for i in range(3):
    with open(f"test.{i}.bed", "w+") as f:
        f.write(str(i))
************************************ test_probe.py *********************************
import sys

bed = sys.argv[1]
# extract the index from a filename like "test.0.bed"
i = bed.split('/')[-1][5:-4]
with open(bed, "r") as f:
    content = f.read()
with open(f"test.{i}.probe", "w+") as f:
    f.write(content + "\n" + bed)
*********************************************************************
WORKFLOW APPROACH 1
*********************************************************************
If I define the workflow as follows:
workflow {
    test_bed()
    test_probe(test_bed.out.flatten())
}
Then I get the following error:
Missing process or function with name 'flatten'
groovy.lang.MissingMethodException: No signature of method: nextflow.extension.OperatorEx.flatten() is applicable for argument types: (groovyx.gpars.dataflow.DataflowVariable, groovyx.gpars.dataflow.DataflowVariable...) values: [DataflowVariable(value=null), DataflowVariable(value=null), ...]
Possible solutions: flatten(groovyx.gpars.dataflow.DataflowReadChannel)
I am confused because I thought this was the exact purpose of the flatten() operator: to take a list and emit each element as a separate item.
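For reference, this is how I understood flatten() to work on a single channel (a toy example with made-up file names, not my actual pipeline):

workflow {
    Channel.of( ['a.bed', 'b.bed', 'c.bed'] )
        .flatten()
        .view()   // prints a.bed, b.bed and c.bed as three separate items
}

so I expected test_bed.out.flatten() to do the same with the three output files.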
*********************************************************************
WORKFLOW APPROACH 2
*********************************************************************
If I define the workflow as follows:
workflow {
    test_bed()
    bed_chan = Channel.fromList(test_bed.out)
    test_probe(bed_chan)
}
Then the workflow runs as expected, with the three files processed in parallel.
I know this is not the best solution: test_bed.out is already a channel, so in essence I'm wrapping a channel in a channel. Also, in the process definition of test_probe I have to use $bed_file.val, which does not seem like good practice. Lastly, I get a warning:
WARN: Cannot serialize context map. Cause: java.util.ConcurrentModificationException -- Resume will not work on this process
I assume the warning is because the nested channels cannot be serialized.
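For comparison, my understanding is that Channel.fromList is meant for plain lists of values (again a toy example with made-up names):

workflow {
    Channel.fromList( ['x.bed', 'y.bed', 'z.bed'] )
        .view()   // prints x.bed, y.bed and z.bed as three separate items
}

so feeding it the output channels of test_bed feels like a workaround rather than the intended usage.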
I know I am probably just missing something obvious because this is a pretty trivial task. Thank you in advance for your help!