How to deal with a channel that emits one value and a variable number of file objects


Owen S.

Apr 12, 2017, 7:36:01 PM
to Nextflow
I have three processes which each output a value and a file.  My intention is to use the value (subject_id) to group the results in the final output.  Each process may be run multiple times per subject_id, so I don't know in advance how many output files there will be.

For the sake of brevity, here's pseudo code for the three processes:


process process1 {
    output:
    set val(subject_id), file("process1.json") into p1channel
}

process process2 {
    output:
    set val(subject_id), file("process2.json") into p2channel
}

process process3 {
    output:
    set val(subject_id), file("process3.json") into p3channel
}


I've discovered that I can nicely combine these set outputs by using the new combine operator.  This is very nice!  Nextflow just keeps getting better.


synthesis_channel = p1channel.combine(p2channel, by:0).combine(p3channel, by:0)


If I print the channel, it looks good!

synthesis_channel.println()

[TS_12345678, /path/to/work/25/2d20389c4334d8a4250ec5aaf84f38/process1.json, /path/to/work/e5/964d7d5a2a6f4850de67e3705f4f66/process2.json, /path/to/work/99/e5bddbbcb251e680c5527d4d63e8b2/process3.json]
[TS_44444444, /path/to/work/27/2a7f44f508b4b7a8647d247b4b199a/process1.json, /path/to/work/16/ec3e05d9149f4aeeac4809a4114c39/process2.json, /path/to/work/a4/465dd587ea7abfe86fcc47cd1d652e/process3.json]


But when I try to access the list of files in the following way, it only grabs the first file in the list: 

process synthesis {
    input:
    set subject_id, file(results:'results_*.json') from synthesis_channel
}


I think I want my synthesis_channel to emit [idval, [file1, file2, file3, ..., fileN]] instead of [idval, file1, file2, file3, ..., fileN]

How can I do this?
Thanks!

Owen S.

Apr 13, 2017, 12:26:26 AM
to Nextflow
I am happy to be able to answer my own question:

The solution I found was to chain the map operator after the combine, and then in the closure, define the list structure like I want it to be.

synthesis_channel = p1channel.combine(p2channel, by: 0).combine(p3channel, by: 0).map { [it[0], it[1..-1]] }

Then, in the process, I consume that channel as shown in the example.

I'm not sure if my map closure is the best way to accomplish my goal, but it seems to work.
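For anyone following along, here is what that closure does to a single tuple, shown in plain Groovy with made-up values (the file names are illustrative, not from the actual workflow):

```groovy
// One row as emitted by the combine chain: the key followed by N files.
def row = ['TS_12345678', 'p1.json', 'p2.json', 'p3.json']

// it[0] is the key; it[1..-1] is everything after it, as a sub-list.
def repacked = [row[0], row[1..-1]]

assert repacked == ['TS_12345678', ['p1.json', 'p2.json', 'p3.json']]
```

The `1..-1` range is Groovy's "from index 1 to the last element", so the repacking works for any number of trailing files.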

Paolo Di Tommaso

Apr 13, 2017, 5:46:20 AM
to nextflow
Hi Owen,

Yes, you can do that. Alternatively you can define all three files one by one, e.g.:

process synthesis {
    input:
    set val(subject_id), file(json1), file(json2), file(json3) from synthesis_channel
    ...
}


I would go for the latter. Also note that this works only as long as the `subject_id` is not emitted more than once by the same channel; otherwise you will get a cartesian product of the entries sharing the same id.
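A small sketch of that pitfall, with made-up channel contents (not from the actual workflow):

```groovy
// Two entries per key in each channel...
a = Channel.from(['TS_1', 'a1'], ['TS_1', 'a2'])
b = Channel.from(['TS_1', 'b1'], ['TS_1', 'b2'])

// ...and combine(by: 0) pairs every 'a' entry with every 'b' entry
// for that key, emitting 2 x 2 = 4 tuples (in some order):
a.combine(b, by: 0).println()
// [TS_1, a1, b1], [TS_1, a1, b2], [TS_1, a2, b1], [TS_1, a2, b2]
```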


Cheers,
Paolo
 

--
You received this message because you are subscribed to the Google Groups "Nextflow" group.
To unsubscribe from this group and stop receiving emails from it, send an email to nextflow+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/nextflow.
For more options, visit https://groups.google.com/d/optout.

Owen S.

Apr 13, 2017, 11:56:37 AM
to Nextflow
Hi Paolo,

I was overlooking the cartesian-product part of the combine documentation. The number of samples per subject_id in my workflow is variable and may be more than one, so combine isn't the solution. What is the non-cartesian way to group these output files by a common key? It seems like groupTuple might be what I want, but it doesn't seem to work when the second object in the tuple is a file object. I see this error when I try to replace combine() with groupTuple():

ERROR ~ No signature of method: groovyx.gpars.dataflow.DataflowQueue.groupTuple() is applicable for argument types

I appreciate your suggestion (to enumerate the files explicitly), but as you can see, this won't work for me because each of the three processes may process one to many samples.

I am still stuck  --  any suggestion for non-cartesian grouping of (key, file) tuples?

Thanks again
Owen


Paolo Di Tommaso

Apr 13, 2017, 12:29:13 PM
to nextflow
I think you should be able to `mix` them and apply a `groupTuple` on the result. That should do the trick.
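For reference, a minimal sketch of that approach using the channels from the original example (a sketch only, not tested against the real pipeline):

```groovy
// Merge the three (subject_id, file) channels into one stream, then
// collect every file sharing the same subject_id into a single list.
synthesis_channel = p1channel
    .mix(p2channel, p3channel)
    .groupTuple()

// Each item is now [subject_id, [file1, file2, ..., fileN]],
// so a variable number of files per subject is handled naturally.
process synthesis {
    input:
    set val(subject_id), file('results_*.json') from synthesis_channel
    ...
}
```

Unlike `combine`, this does not require each key to appear exactly once per channel, which matches the one-to-many samples per subject described above.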


p


Owen S.

Apr 13, 2017, 12:56:01 PM
to Nextflow
YES! Thank you, that was exactly what I was looking for. It makes sense now that I see what it is doing.

Thank you very much indeed.

Owen