DSL2: using "join" to replicate deprecated "merge"?

NihilScit

Aug 17, 2020, 6:12:40 AM
to Nextflow
Hi,

It used to be easy to combine channels into a channel of tuples using the "merge" operator.

For example, this:

one = channel.of(1, 2, 3)
two = channel.of(4, 5, 6)
one.merge(two).view()

would output this:

[1, 4]
[2, 5]
[3, 6]

In DSL2, however, the "merge" operator is deprecated, and the migration notes say to use "join" instead. But "join" takes channels of tuples as inputs, so I don't see how to use it to replicate the example above. Am I missing something?

Thanks,

Pete

Soumitra Pal

Aug 17, 2020, 7:01:03 AM
to next...@googlegroups.com
Hi Pete,
I am also new and trying to learn Nextflow. I was wondering if you could try the following. Note that I created tuples of size two, where the first entry is a kind of index and the second entry is the value from your example. The join happens on the index.

one = Channel.from([1, 1], [2, 2], [3, 3])
two = Channel.from([1, 4], [2, 5], [3, 6])
one.join(two).view()

NihilScit

Aug 17, 2020, 9:51:41 AM
to Nextflow
Hi Soumitra, 

Thanks for the suggestion. Could you fill in the missing piece - how to convert my simple input channel of integers (1, 2, 3) into your channel of tuples ([1, 1], [2, 2], [3, 3])? 

Pete

Soumitra Pal

Aug 17, 2020, 11:10:21 AM
to next...@googlegroups.com
Using what I could find on the net:

[1,2,3].withIndex().collect {it,i -> [i,it]}

gives:

[[0, 1], [1, 2], [2, 3]]

Thus,

one = Channel.from([1,2,3].withIndex().collect {it,i -> [i,it]})
two = Channel.from([4,5,6].withIndex().collect {it,i -> [i,it]})
one.join(two).view()

should work.
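For reference, here is a minimal plain-Groovy sketch (lists only, no channels) of the three steps involved: index, pair by index, and drop the index again. The variable names are only illustrative, and the map lookup merely stands in for what "join" does on the first tuple element.

```groovy
// Plain Groovy, no Nextflow required.
def one = [1, 2, 3]
def two = [4, 5, 6]

// withIndex() yields [value, index] pairs; swap them so the index comes first.
def oneIndexed = one.withIndex().collect { v, i -> [i, v] }
def twoIndexed = two.withIndex().collect { v, i -> [i, v] }

// Build an index -> value lookup for the second list (stand-in for the join key).
def twoByIndex = twoIndexed.collectEntries { pair -> [(pair[0]): pair[1]] }

// Pair values that share an index, leaving the index out of the result.
def merged = oneIndexed.collect { i, v -> [v, twoByIndex[i]] }

println merged   // prints [[1, 4], [2, 5], [3, 6]]
```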

Soumitra


Message has been deleted

Soumitra Pal

Aug 17, 2020, 11:35:13 AM
to next...@googlegroups.com
If instead of [1,2,3] you have a list L with an unknown number of elements, just use 
one = Channel.from(L.withIndex().collect {it,i -> [i,it]})
and so on.
Soumitra


On Mon, Aug 17, 2020 at 11:24 AM NihilScit <peter.joh...@gmail.com> wrote:
And if I don't know how many elements are in the channel? 

NihilScit

Aug 17, 2020, 2:26:58 PM
to Nextflow
Hi Soumitra, 

Apologies, I deleted the message you responded to by mistake. You are right, your solution works for any length of list. However, withIndex() doesn't work on channels, so to get the solution to work, it looks like we'd have to add in conversion from list to channel and back again. And of course we need to remove the index from the output, to meet the requirements of the original question.

However, at this stage, I wonder if we could pause and take stock. My point in raising the question was really to highlight that something which was very simple in DSL1 seems to have become rather complex in DSL2. I'm a big fan of DSL2, and I'd be surprised if this were so - hence my thought that I must have missed something. Here I mean absolutely no disrespect to your approach - you've shown it's possible, and I appreciate your time spent on it - but if this is the way to do it, I certainly don't want to type it in very often. I'd want to make it re-usable, perhaps as a workflow: 

workflow myMerge{
    take:
        one
        two

    emit:
       result

    main:
        oneIndexed = Channel.from(one.withIndex().collect {it,i -> [i,it]})
        twoIndexed = Channel.from(two.withIndex().collect {it,i -> [i,it]})
        result = oneIndexed.join(twoIndexed).map{ [it[1], it[2]]}

}

Like I say, this won't work quite yet, but before fixing it, I'd ask: do I really have to do all this to re-invoke the old merge()? This doesn't look like what the docs had in mind when suggesting "join" in place of "merge". Isn't there a simple solution that I can just drop into my workflow?
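For what it's worth, one way the workflow might be made to run is to do the indexing inside each channel with "map" and a counter, since withIndex() is a list method rather than a channel operator. This is only a sketch under assumptions: the counters i and j are names introduced here, and it assumes each channel emits its items in a stable order. That holds for values created with Channel.of, but process outputs generally arrive in completion order, so positional pairing may not be reproducible there.

```groovy
nextflow.enable.dsl = 2

workflow myMerge {
    take:
        one
        two

    main:
        // Hypothetical counters, captured by the closures below;
        // each item receives the next index on its own channel.
        def i = 0
        def j = 0
        oneIndexed = one.map { [i++, it] }
        twoIndexed = two.map { [j++, it] }

        // join matches on the first tuple element (the index);
        // the final map drops the index again.
        result = oneIndexed.join(twoIndexed).map { idx, a, b -> [a, b] }

    emit:
        result
}

workflow {
    myMerge(Channel.of(1, 2, 3), Channel.of(4, 5, 6))
    myMerge.out.result.view()
}
```

Because join matches on the key, the pairs need not be printed in index order, but each pair should hold the values from the same position in the two inputs.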

Pete

Soumitra Pal

Aug 17, 2020, 3:02:05 PM
to next...@googlegroups.com
Hi Pete,
I am a newbie in Nextflow. I just made suggestions based on what I have learned so far.
Let us see what others in the group have to say.
Regards, 
Soumitra


Soumitra Pal

Aug 18, 2020, 12:04:03 PM
to next...@googlegroups.com
Hi Pete,

While we await responses from advanced users and developers of Nextflow, I was wondering if you could provide a few more details about the workflow where you need this join operation on the channels?

Before experimenting with Nextflow, I was building my software pipelines using a Python-based Workflow Management System (WMS) named doit. I also briefly tried Snakemake. In both Snakemake and doit, you specify the input and output files for a task and the commands that generate the outputs from the inputs, and the WMS ensures efficient execution of the tasks in the pipeline. This is similar to workflows in Nextflow.

However, I ran into one problem in both systems: if I needed to build the pipeline under various parameter settings (for example, different underlying algorithms or cutoff values), writing the pipeline became tedious because it required a lot of repeated code. To solve this, I came up with an extension of doit in which each file and task has an associated parameter table (implemented as a pandas DataFrame) whose columns represent parameter names and whose rows represent different settings of those parameters. Once you properly associate the parameter tables with the definitions of the files and tasks, the extended WMS (which I call JUDI - Just Do It) makes sure that the pipeline is executed optimally for each setting of the parameters. I have given a reference at the end of this message.

I mention this because I guess your need for join comes from the fact that you need to match the common setting of some 'parameters' in the two channels. It would be great to know whether that is the case.

However, I came to Nextflow because the underlying WMS I was using (doit) does not support execution in different environments such as HPC clusters, the cloud, Docker, etc. The developer of doit does not seem to have many resources to support these, and implementing them myself would be a burden. Besides, why should I reinvent the wheel when systems like Nextflow already have excellent support for them?

In fact, I believe that parameter handling could be implemented nicely in Nextflow if some kind of 'indexed' channel were supported. Unlike the current channels in Nextflow, these indexed channels would be similar to a pandas Series, where each value in the channel is associated with a setting of the index variables. For your example, the indexed version [[0,1], [1,2], [2,3]] of your channel with values [1,2,3] could easily be implemented using such channels. Here there is one unnamed index (the first element of each tuple). The proposed channels should allow any number of index variables, each with a name.

If a process has more than one input channel and those channels share one or more index variables, the Nextflow executor should create a process instance for each setting in the join of the indexes on the common variables.

If you like this idea, could you please express your support for such an extension request?

References:
JUDI presentation: https://t.ly/2skV

Thank you,
Soumitra