Stream deduping

Shiva Ramagopal

Jun 23, 2017, 12:42:21 PM
to akka...@googlegroups.com
Hi,

I'm looking for the latest and greatest techniques and thoughts in stream deduplication and would love to know if anyone here has done this at scale. Specifically, I'm looking for deduping that also handles late-arriving messages.

In the past few days of my search, I've mostly come across ideas and implementations like

- Batching the stream based on time windows (non-overlapping) and deduping within the batch
- Possible improvements on the above technique using overlapping time windows
- HDFS-specific cases where a stream is consumed (pretty batchy), written to HDFS and deduped there
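
For reference, the first approach above (non-overlapping windows, dedupe within each) can be sketched roughly like this — a language-agnostic illustration in Python, not tied to any particular streaming framework:

```python
from collections import defaultdict

def dedupe_tumbling_windows(messages, window_secs):
    """Group messages into non-overlapping (tumbling) time windows and
    drop duplicate keys within each window.

    `messages` is an iterable of (timestamp_secs, key, payload) tuples.
    Duplicates that land in different windows are NOT caught -- that is
    the weakness the overlapping-window variant tries to address.
    """
    seen = defaultdict(set)  # window index -> keys already emitted
    out = []
    for ts, key, payload in messages:
        window = int(ts // window_secs)
        if key not in seen[window]:
            seen[window].add(key)
            out.append((ts, key, payload))
    return out
```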

My source is Kafka, if that helps.

Thanks
Shiv

Anil Gursel

Jun 23, 2017, 1:23:45 PM
to akka...@googlegroups.com
It's not the latest and greatest, but here is an Akka Streams GraphStage implementation for deduplication: https://squbs.readthedocs.io/en/latest/deduplicate/. It all happens in memory, so you need to watch for memory growth and potentially pass a custom registry that cleans itself up after a while. The source code is at https://github.com/paypal/squbs/blob/master/squbs-ext/src/main/scala/org/squbs/streams/Deduplicate.scala.
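The "custom registry that self cleans" idea can be sketched as follows — a rough Python illustration of a TTL-based registry (this is not the squbs API; names like `ExpiringRegistry` are made up for the sketch):

```python
import time

class ExpiringRegistry:
    """Keeps seen keys with an insertion time and evicts entries older
    than `ttl_secs` on each access, so memory does not grow unbounded."""
    def __init__(self, ttl_secs, clock=time.monotonic):
        self.ttl = ttl_secs
        self.clock = clock
        self._entries = {}  # key -> insertion time

    def seen_before(self, key):
        now = self.clock()
        # Self-clean: drop every entry past its TTL.
        self._entries = {k: t for k, t in self._entries.items()
                         if now - t < self.ttl}
        if key in self._entries:
            return True
        self._entries[key] = now
        return False

def deduplicate(stream, registry, key_fn=lambda m: m):
    """Filter out elements whose key was seen within the TTL."""
    return [m for m in stream if not registry.seen_before(key_fn(m))]
```

The trade-off is the usual one: a shorter TTL bounds memory more tightly but lets duplicates through once the original entry has been evicted.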

Thanks,
Anil

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Michal Borowiecki

Jun 24, 2017, 11:34:11 AM
to akka...@googlegroups.com, Shiva Ramagopal

I drafted an implementation outline in kafka-streams to address the problem of sliding-window reordering (to cater for late messages within the time window); it also handles de-duplication:

https://stackoverflow.com/questions/43939534/apache-kafka-order-windowed-messages-based-on-their-value/44345374#44345374

You can implement something similar in akka-streams I believe.

The first thing that comes to mind is to sink messages into a sorted map (keyed by the event-time timestamp and message key pair), have a periodic source pick them up, and connect the two with a Flow.fromSinkAndSource. You'll need to take care to commit offsets only after the windowing/de-duplication stage, i.e. on restart you don't want to lose the messages buffered in the map.
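The sorted-map buffer with a periodic flush can be sketched like this — a Python illustration of the buffering logic only (the actual Flow.fromSinkAndSource wiring and offset commits are Akka/Kafka specifics not shown here; `WindowBuffer` and its method names are invented for the sketch):

```python
class WindowBuffer:
    """Buffer keyed by (event_time, msg_key). A periodic flush emits, in
    event-time order, everything older than the watermark (max event
    time seen minus the lateness allowance), deduping on the key pair."""
    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self._buf = {}  # (event_time, key) -> payload
        self._max_seen = float("-inf")

    def offer(self, event_time, key, payload):
        self._max_seen = max(self._max_seen, event_time)
        # setdefault keeps the first payload seen: duplicates are dropped.
        self._buf.setdefault((event_time, key), payload)

    def flush(self):
        watermark = self._max_seen - self.allowed_lateness
        ready = sorted(k for k in self._buf if k[0] <= watermark)
        return [(t, k, self._buf.pop((t, k))) for t, k in ready]
```

Messages later than the allowance would arrive behind the watermark and need separate handling (drop, or emit out of order) — the same late-data decision every windowing scheme has to make.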

Looking forward to ideas how to do this better.

Cheers,

Michał


--
Michal Borowiecki
Senior Software Engineer L4
OpenBet Ltd, Chiswick Park Building 9, 566 Chiswick High Rd, London W4 5XT, UK
E: michal.b...@openbet.com | W: www.openbet.com | T: +44 208 742 1600


Shiva Ramagopal

Jul 10, 2017, 11:01:12 AM
to Michal Borowiecki, akka...@googlegroups.com
Hi,

Thanks for the answers!

Michal,

Your approach seems most appropriate for my case, as it dedups *and* handles late records. Your point about losing the messages buffered in the map upon restart after a failure is very valid. One way of handling this is to checkpoint at window level.

Roughly speaking, if my dedup window is some X seconds to allow for late-arriving messages that are at most X seconds late, the map will hold two windows - the current or Nth window and the previous or (N-1)th window - and possibly also an (N-2)th window, so as not to miss messages that arrive exactly X seconds late. A periodic source could read the (N-2)th window's contents, write them to the sink, and checkpoint the corresponding Kafka offsets and timestamps. If the process then fails, the only lost messages are those in the Nth and (N-1)th windows.

When restarting after a failure, the process starts reading from the offset corresponding to the (N-1)th window. I'll need to test this carefully, but I think it could work.
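The three-window checkpoint scheme above could be sketched roughly like this in Python (class and method names are invented for illustration; real offset commits would go through the Kafka consumer API):

```python
class CheckpointedWindows:
    """Keep up to three X-second windows in memory. When a message opens
    a newer window, emit the contents of any window older than N-2 and
    record the smallest Kafka offset still buffered as the restart
    point. On failure, re-reading from that checkpoint replays at most
    the (N-1)th and Nth windows."""
    def __init__(self, window_secs):
        self.window_secs = window_secs
        self.windows = {}  # window index -> {key: (offset, payload)}
        self.emitted = []
        self.checkpoint_offset = None

    def on_message(self, offset, event_time, key, payload):
        n = int(event_time // self.window_secs)
        # setdefault dedupes on key within the window.
        self.windows.setdefault(n, {}).setdefault(key, (offset, payload))
        # Close and emit any window that is now older than N-2.
        for old in sorted(w for w in self.windows if w <= n - 2):
            contents = self.windows.pop(old)
            self.emitted.extend(contents.values())
            remaining = [o for w in self.windows.values()
                         for (o, _) in w.values()]
            if remaining:
                self.checkpoint_offset = min(remaining)
```

One caveat: replaying from the checkpoint re-delivers messages that were already buffered once, so the dedup-on-key behaviour is also what makes the restart idempotent.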

Thanks again!
-Shiv 
