Kafka Streams Sessionization Window (List Aggregation)

Steve Crawford

Nov 13, 2017, 1:54:53 PM
to Confluent Platform
I'm trying to sessionize my records in Kafka using the built-in windowing functionality. I'd like to retain the original records but apply a session id to each record. My current implementation works, but fails when there are too many records in a single session.

 Input Stream -> 
 SessionWindowedKStream ->
 Aggregation (Combines each record into an ArrayList) ->
 FlatMap (Pulls records back out from the ArrayList)
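
In code, it looks roughly like this (a trimmed-down sketch; MyRecord, the serdes, and the 5-minute gap stand in for our real types and settings):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, MyRecord> input =
        builder.stream("input-topic", Consumed.with(Serdes.String(), myRecordSerde));

    input
        .groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofMinutes(5)))
        // Aggregation: collect every record of the session into one ArrayList
        .aggregate(
            () -> new ArrayList<MyRecord>(),
            (key, record, list) -> { list.add(record); return list; },
            (key, left, right) -> { left.addAll(right); return left; },
            Materialized.with(Serdes.String(), recordListSerde)) // recordListSerde: your serde for ArrayList<MyRecord>
        .toStream()
        // FlatMap: pull each record back out, tagged with a session id
        .flatMap((windowedKey, list) -> list.stream()
            .map(r -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().start(), r))
            .collect(Collectors.toList()));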

When there are many records in the session, the aggregated ArrayList grows too large, and the write back to Kafka fails with an exception (org.apache.kafka.common.errors.RecordTooLargeException).

I feel like I'm misusing the built-in windowing functionality and should approach this differently. Can anyone offer any advice?


Matthias J. Sax

Nov 13, 2017, 2:00:40 PM
to confluent...@googlegroups.com
If you build a list of records, this list will be written to Kafka as a
single record/message.

If you know an upper size limit, you can change the broker and producer
config to increase the maximum size of a record. This might be the
easiest way to fix your problem.
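
For example, something like this (the 10 MB value is purely illustrative):

    // Broker side (server.properties; per-topic override: max.message.bytes):
    //   message.max.bytes=10485760
    //   replica.fetch.max.bytes=10485760
    //
    // Streams application side -- raise the embedded producer's limit:
    Properties props = new Properties();
    props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
              10485760);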

If the list can grow unbounded, you would need to implement a custom
operator --- the DSL would most likely not work anymore.

Hope this helps.


-Matthias


Steve Crawford

Nov 13, 2017, 2:05:46 PM
to Confluent Platform
Thanks Matthias,

This is an edge case, but it could potentially grow far past the standard size limit. Is there any way to skip these messages instead of killing the entire application? Or any way to get to the original rows without the aggregation methods I'm currently using?

Matthias J. Sax

Nov 13, 2017, 2:15:07 PM
to confluent...@googlegroups.com
You cannot skip those messages... However, you could implement a session
size limit in your own code when growing the list (just add a check for
a maximum list size and drop individual records on the floor). Not sure
if this would be OK for your application.
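
Something along these lines inside the aggregator (MAX_SESSION_SIZE is whatever keeps the serialized list under your record-size limit):

    // Sketch: cap the list size while aggregating; extra records are dropped.
    static final int MAX_SESSION_SIZE = 1_000; // placeholder value

    Aggregator<String, MyRecord, ArrayList<MyRecord>> cappedAggregator =
        (key, record, list) -> {
            if (list.size() < MAX_SESSION_SIZE) {
                list.add(record);
            }
            // else: drop the record on the floor
            return list;
        };

    // Note: the session merger needs the same cap, since merging two
    // sessions can also push the combined list over the limit.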

As an alternative, you can implement a custom `.transform()` that
attaches state to build your own operator that builds the sessions --
but this is of course quite some effort.
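
Roughly, that would look like this (store name, SessionState, and the helper methods are placeholders you would have to fill in):

    // Sketch: custom sessionizer via transform() plus a state store.
    // Each record is emitted immediately with its session id attached,
    // so no per-session list is ever materialized.
    builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("session-state"),
        Serdes.String(), sessionStateSerde)); // sessionStateSerde: yours

    input.transform(() -> new Transformer<String, MyRecord, KeyValue<String, MyRecord>>() {
        private KeyValueStore<String, SessionState> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, SessionState>) context.getStateStore("session-state");
        }

        @Override
        public KeyValue<String, MyRecord> transform(String key, MyRecord record) {
            SessionState session = store.get(key);
            // start a new session if none exists or the inactivity gap elapsed
            if (session == null || gapElapsed(session, record)) {  // gapElapsed: yours
                session = new SessionState(newSessionId(key));     // newSessionId: yours
            }
            session.touch(record); // update the session's last-seen timestamp
            store.put(key, session);
            return KeyValue.pair(session.id(), record);
        }

        @Override
        public void close() {}
    }, "session-state");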

Don't have any better idea atm :( Maybe somebody else?


-Matthias

Steve Crawford

Nov 16, 2017, 9:49:31 AM
to Confluent Platform
Thanks for this! That's a simple but good solution. I'll do some testing to see if it'll work in our scenario. I think it will!