NK 0.2.x outer-join feature


mathieu...@diginext.fr

Sep 25, 2020, 9:23:50 AM
to Nussknacker
Hi all,

I'm working with NK 0.2.2.

I'm having some issues with the new outer-join node.

Maybe I'm missing something about its behavior: I would expect it to give me data from both input streams.

Here is an example. Suppose I work with the following two streams:

Stream Foo:
provides objects with the following structure in its context:
{
   id: String
   barId: String
   ...
} as #inputFoo

Stream Bar:
provides objects with the following structure in its context:
{
   id: String
   ...
} as #inputBar

I tried the following configuration for my outer-join node:
output: joinResult
branch type: Stream Foo: MAIN, Stream Bar: JOINED
key: Stream Foo: #inputFoo.barId, Stream Bar: #inputBar.id
aggregator: List
windowLength: 2min
aggregateBy: #inputBar.id


Unfortunately, after the outer-join step:
1) No matching seems to happen on the supplied keys, and plenty of events are produced as output of my outer-join node.
2) My #joinResult list (or set, if I use the corresponding aggregator) is always empty.


Could you please explain how to use this node (maybe with an example)?

Thanks a lot.

Best regards,
--

Arek Burdach

Sep 25, 2020, 3:45:05 PM
to mathieu...@diginext.fr, Nussknacker
Hi

Your usage of outer-join is correct. Maybe there is some time shift between the two streams? Can you try adding a delay node in the MAIN branch and increasing windowLength?

Unfortunately, our implementation is just a simple use of Flink's connect:
        val statefulStream = keyedMainBranchStream
          .connect(keyedJoinedStream)
          .keyBy(v => v.value, v => v.value.key)
          .process(aggregatorFunction)
We don't have any buffering under the hood.
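The consequence of having no buffering can be illustrated with a deliberately simplified, single-threaded Scala model of a connect-style outer-join. This is a sketch under stated assumptions, not Nussknacker's implementation: events are handled strictly in the order given, windowLength and event time are ignored entirely, and the names (`MainEvent`, `JoinedEvent`, `Output`, `run`) are hypothetical.

```scala
// Simplified model of an unbuffered, connect-style outer-join.
// Hypothetical names; ignores windowLength and event time on purpose.
object OuterJoinModel {

  sealed trait Event { def key: String }
  // A record from the MAIN branch (e.g. #inputFoo, keyed by barId).
  final case class MainEvent(key: String, id: String) extends Event
  // A record from the JOINED branch (e.g. #inputBar, keyed by id).
  final case class JoinedEvent(key: String, value: String) extends Event

  // One output per MAIN event: the MAIN record plus the JOINED values
  // aggregated for its key at the moment it was processed.
  final case class Output(main: MainEvent, joinResult: List[String])

  def run(arrivalOrder: Seq[Event]): List[Output] = {
    // Per-key state, standing in for Flink keyed state.
    val state = scala.collection.mutable.Map.empty[String, List[String]]
    val out = List.newBuilder[Output]
    arrivalOrder.foreach {
      case j: JoinedEvent =>
        // JOINED events only update the state; they emit nothing.
        state.update(j.key, state.getOrElse(j.key, Nil) :+ j.value)
      case m: MainEvent =>
        // MAIN events read only what has already reached the state.
        out += Output(m, state.getOrElse(m.key, Nil))
    }
    out.result()
  }
}
```

With a MAIN event `foo` and a JOINED event `bar` sharing the same key, `run(Seq(foo, bar))` gives `foo` an empty join result, while `run(Seq(bar, foo))` gives it `bar`'s value; either way there is exactly one output per MAIN event. In a real Flink job the two branches are consumed concurrently, so that order is not guaranteed, which is why delaying the MAIN branch can help.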

For this reason, our test (OuterJoinTransformerSpec) uses eventually:
    input1.add(OneRecord(key, 0, -1))
    // We can't be sure that main records will be consumed after matching joined records so we need to wait for them.
    eventually {
      OuterJoinTransformerSpec.elementsAddedToState should have size input2.size
    }
    input1.add(OneRecord(key, 2, -1))
    input1.finish()
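The `eventually` block in that spec is most likely ScalaTest's `Eventually` support: it re-runs the enclosed assertion until it passes or a timeout expires. A minimal, dependency-free sketch of the same polling idea follows; `Polling.eventuallyTrue` and its default timeout and interval are hypothetical and are not ScalaTest's API.

```scala
import scala.concurrent.duration._

object Polling {
  // Re-evaluates `condition` until it holds or `timeout` elapses,
  // sleeping `interval` between attempts. Returns whether it ever held.
  def eventuallyTrue(
      timeout: FiniteDuration = 5.seconds,
      interval: FiniteDuration = 50.millis
  )(condition: => Boolean): Boolean = {
    val deadline = timeout.fromNow
    var ok = condition
    while (!ok && deadline.hasTimeLeft()) {
      Thread.sleep(interval.toMillis)
      ok = condition
    }
    ok
  }
}
```

Used as `Polling.eventuallyTrue() { stateSize() == expected }` (with a hypothetical `stateSize`), it would block until the joined records have reached the state, mirroring the wait before the second MAIN record is added in the spec.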

Regarding this:

> Plenty of events are produced as output of my outer-join node

That's strange: it should produce exactly as many events as there are incoming events from the MAIN stream. Do you see any errors in Grafana or in the logs?

Arek
--

mathieu...@diginext.fr

Sep 28, 2020, 11:26:44 AM
to Nussknacker
Hi Arek

First of all, again, thank you for your help :)

Today I checked the number of events, and you're right: the number of produced events is exactly the number of events incoming from the MAIN stream. I didn't realize that outer-join behaves like AggregateSliding (which is clearly stated in the docs). Apologies :)

About the outer-join behavior, I also ran some tests. I added the delay node to the MAIN branch of my process, so that events from the main stream are processed after the ones from the joined stream. I still get the same behavior: the produced aggregates are always empty, even though events that should match the criteria were processed within the configured windowLength (5 minutes). Maybe the problem is related to the window length? I don't see it mentioned in the code fragment you sent me (writing this mail makes me want to run other tests with a much longer windowLength).
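Whether a JOINED record is still visible to a later MAIN record also depends on the event-time timestamps assigned by the sources, not only on processing order. The Scala sketch below models that under one stated assumption: a JOINED value counts for a MAIN event at time `t` only if its timestamp lies in `(t - windowLength, t]`. The exact window boundaries in Nussknacker may differ, and `Joined` and `visible` are hypothetical names.

```scala
import java.time.{Duration, Instant}

object WindowModel {
  // Hypothetical JOINED record carrying the event time assigned by its source.
  final case class Joined(key: String, value: String, timestamp: Instant)

  // JOINED values a MAIN event stamped `mainTs` would see, assuming only
  // records with timestamps in (mainTs - windowLength, mainTs] are kept.
  def visible(joined: Seq[Joined], key: String, mainTs: Instant, windowLength: Duration): List[String] = {
    val windowStart = mainTs.minus(windowLength)
    joined.iterator
      .filter(j => j.key == key && j.timestamp.isAfter(windowStart) && !j.timestamp.isAfter(mainTs))
      .map(_.value)
      .toList
  }
}
```

For example, with a MAIN event stamped 12:00 UTC and a matching JOINED record stamped 10:01 UTC (a two-hour skew between sources), `visible` returns an empty list for a 5-minute window but the JOINED value for a 3-hour window. That is why trying a much longer window helps reveal a timestamp skew.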

By the way, I've found a GUI bug when building my processes. Would you like me to create an issue directly on GitHub, or would you rather I describe it in another thread of the Google group?

Best regards.

Arkadiusz Burdach

Sep 28, 2020, 12:05:26 PM
to mathieu...@diginext.fr, Nussknacker


On Mon, 28 Sep 2020, 17:26, 'mathieu...@diginext.fr' via Nussknacker <nussk...@googlegroups.com> wrote:
> [...] Maybe the problem is related to the window length? I don't see it mentioned in the code fragment you sent me [...]
It might be that. Please try with a longer window and let me know if it helps.

Also, what timestamp assignment strategy do you use on your sources?


> By the way, I've found a GUI bug when building my processes. Would you like me to create an issue directly on GitHub, or would you rather I describe it in another thread of the Google group?
Please submit an issue on GH. Thanks in advance!

mathieu...@diginext.fr

Oct 8, 2020, 5:39:52 AM
to Nussknacker
Hi Arek,

First of all, please allow me to apologize for taking so long to give you an update on this topic. I had some days off and also had other subjects to deal with.

Your question about the timestamp assignment strategy put me on the right track. In fact, my implementation of AssignerWithPeriodicWatermarks depended on the timezone of my TaskManager, and the timezone of my event producer differed from that of my TaskManager...
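The thread does not show the actual assigner, but one common way to end up with a timezone-dependent timestamp is to parse a date-time string that carries no offset using the JVM's default zone. The Scala sketch below assumes events carry such a string in a hypothetical `yyyy-MM-dd HH:mm:ss` format; `EventTimestamps` and its methods are illustrative names, and only standard `java.time` calls are used.

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object EventTimestamps {
  // Hypothetical wire format: a local date-time without any zone or offset.
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Fragile: reads the string in the JVM's default zone, so the result
  // depends on where the TaskManager happens to run.
  def epochMillisDefaultZone(raw: String): Long =
    toEpochMillis(raw, ZoneId.systemDefault())

  // Safer: reads the string in the zone the producer actually used.
  def toEpochMillis(raw: String, producerZone: ZoneId): Long =
    LocalDateTime.parse(raw, format).atZone(producerZone).toInstant().toEpochMilli()
}
```

In October 2020 `Europe/Paris` was at UTC+2, so `toEpochMillis("2020-10-08 11:00:00", ZoneId.of("Europe/Paris"))` is two hours (7,200,000 ms) earlier than the same string read as UTC, far outside a 5-minute window. Inside an `AssignerWithPeriodicWatermarks`, the explicit-zone variant would be called from `extractTimestamp`; having the producer send epoch milliseconds or an ISO-8601 string with an offset avoids the ambiguity altogether.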

Since I fixed this, I no longer have any problems with the outer-join.

About the GUI bug, I'm going to submit the GitHub issue.

Thanks a lot for your help, and best regards.
--