I'm trying to sessionize my records in Kafka using the built in windowing functionality. I'd like to retain the original records, but apply a session id to each record. My current implementation works, but fails when there are too many records in a single session.
Input Stream ->
SessionWindowedKStream ->
Aggregation (Combines each record into an ArrayList) ->
FlatMap (Pulls records back out from the ArrayList)
When there are many records in the session, the ArrayList grows too large and breaks Kafka throwing an exception (org.apache.kafka.common.errors.RecordTooLargeException)
I feel like I'm misusing the build in windowing functionality and should approach this differently. Can anyone offer any advice?