org.apache.kafka.common.errors.RecordTooLargeException


Vinu Barro

Apr 27, 2017, 9:01:02 PM
to Confluent Platform
Hoping someone can shed some light on this problem.

I'm doing a Streams aggregation and writing the aggregated records to a topic, and I'm getting the errors shown below. I'm using a custom JSON serde for the aggregation helper class. The fix suggested on a few blogs is to increase max.request.size. However, even after raising max.request.size from the default to 401391899, the serialized aggregate keeps growing on each subsequent write to the topic. Below is a subset of the messages written to the topic. After the Streams application has been running for about 10 minutes, the error below shows up. I'm not sure whether the problem is in my serde or whether I need to change a config setting other than max.request.size. Any suggestions appreciated.

Messages written to the topic:

{A=5, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=6, B=1, C=0, D=87, E=1, F=0.4482758620689655 }
{A=7, B=1, C=2, D=87, E=1, F=0.4482758620689655 }




Error:

org.apache.kafka.common.errors.RecordTooLargeException: The message is 238415842 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
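For reference, here is a minimal sketch of how the limit named in this error can be set for a Streams application. It uses only java.util.Properties; the class name, application id, and bootstrap address are hypothetical placeholders, not my actual setup. The "producer." prefix is what Kafka Streams uses to scope a setting to its embedded producer. Raising this limit alone may not be enough, since the broker (message.max.bytes) and the topic (max.message.bytes) enforce their own limits, and a larger limit does nothing to stop the record itself from growing.

```java
import java.util.Properties;

public class SizeLimitConfigSketch {

    /** Builds Streams properties that raise the embedded producer's request size limit. */
    static Properties streamsProps(int maxRequestBytes) {
        Properties props = new Properties();
        // Hypothetical placeholders; substitute the real application id and brokers.
        props.put("application.id", "sensor-aggregation-sketch");
        props.put("bootstrap.servers", "localhost:9092");
        // Producer-side limit named in the RecordTooLargeException message.
        // The "producer." prefix scopes it to the producer embedded in Streams.
        props.put("producer.max.request.size", Integer.toString(maxRequestBytes));
        // Not set here: the broker setting message.max.bytes and the topic
        // setting max.message.bytes also cap the size of a single record.
        return props;
    }

    public static void main(String[] args) {
        // Same value as tried in the post above.
        Properties props = streamsProps(401391899);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```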

Exception in thread "StreamThread-1" java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at com.google.gson.stream.JsonReader.nextString(JsonReader.java:1043)
at com.google.gson.stream.JsonReader.nextValue(JsonReader.java:784)
at com.google.gson.stream.JsonReader.nextInArray(JsonReader.java:693)
at com.google.gson.stream.JsonReader.peek(JsonReader.java:376)
at com.google.gson.stream.JsonReader.hasNext(JsonReader.java:349)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:80)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:93)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:172)
at com.google.gson.Gson.fromJson(Gson.java:795)
at com.google.gson.Gson.fromJson(Gson.java:761)
at com.google.gson.Gson.fromJson(Gson.java:710)
at com.google.gson.Gson.fromJson(Gson.java:682)
at com.data.agg.sensorstreams.JsonDeserializer.deserialize(JsonDeserializer.java:34)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:156)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:103)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$200(CachingWindowStore.java:34)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:86)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:131)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:222)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:205)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:120)
at org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:149)
at org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:112)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
[Thread-2] INFO org.apache.kafka.streams.KafkaStreams - Stopped Kafka Stream process
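
One thing the trace hints at: the CollectionTypeAdapterFactory frames suggest the aggregation helper class holds a collection, and if that collection gains an element on every update, the serialized value would keep growing no matter how high max.request.size is set. The sketch below is only an illustration of that effect under this assumption. Both classes are hypothetical stand-ins for the helper class, and the hand-written toJson methods stand in for the Gson-based serde.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregateGrowthSketch {

    /** Hypothetical aggregate that keeps every reading; its serialized form grows per update. */
    static final class UnboundedAggregate {
        final List<Double> readings = new ArrayList<>();

        UnboundedAggregate add(double value) {
            readings.add(value);
            return this;
        }

        String toJson() {
            // List.toString() yields "[0.0, 1.0, ...]", which is a valid JSON array here.
            return "{\"readings\":" + readings + "}";
        }
    }

    /** Hypothetical aggregate that keeps only running totals; its serialized form stays small. */
    static final class BoundedAggregate {
        long count;
        double sum;

        BoundedAggregate add(double value) {
            count++;
            sum += value;
            return this;
        }

        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }

        String toJson() {
            return "{\"count\":" + count + ",\"sum\":" + sum + "}";
        }
    }

    public static void main(String[] args) {
        UnboundedAggregate unbounded = new UnboundedAggregate();
        BoundedAggregate bounded = new BoundedAggregate();
        // Feed the same readings to both aggregates.
        for (int i = 0; i < 100_000; i++) {
            double reading = i % 100;
            unbounded.add(reading);
            bounded.add(reading);
        }
        System.out.println("unbounded serialized chars: " + unbounded.toJson().length());
        System.out.println("bounded serialized chars:   " + bounded.toJson().length());
        System.out.println("mean from bounded aggregate: " + bounded.mean());
    }
}
```

If the real helper class behaves like the first variant, keeping only running totals (or capping the collection) inside the aggregator would be one way to keep the record size flat.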