java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change cannot be cast to ..


Davor Poldrugo

Nov 2, 2016, 10:55:38 AM
to Confluent Platform
Hi!

I'm using Kafka 0.10.1.0 and Kafka Streams 0.10.1.0.

I'm getting this ClassCastException, which seems illogical to me and looks like a bug: http://pastebin.com/raw/M9ib6wUP

Here is my topology: http://pastebin.com/SLW81mgv

Here is the configuration when the Kafka Streams app is started: http://pastebin.com/raw/h5fGNYf4

This topology basically does the following (a rough sketch is shown below the list):
  • reads from a topic "message-events" which has no key in the data - messages in this topic are written twice, and I want to deduplicate them based on a key
  • produces the same messages to a new topic "message-events-keyed" - but now with a key
  • left joins the "message-events-keyed" stream against the topic "message-events-deduplicated" - if the left-join value is not null, it sets a flag "existsDuplicate=true"
  • filters out the messages flagged with "existsDuplicate=true"
  • produces the remaining messages back to the topic the left join was made against: "message-events-deduplicated"
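
For reference, a minimal sketch of what this looks like in the 0.10.1.0 DSL - MessageEvent, messageEventSerde, getId() and the existsDuplicate accessors are placeholders for my actual model, so treat it as an illustration rather than the exact code:

KStreamBuilder builder = new KStreamBuilder();

// 1. Read the unkeyed, duplicated messages and re-key them.
builder.stream(Serdes.String(), messageEventSerde, "message-events")
       .selectKey((key, value) -> value.getId())                       // getId() is a placeholder
       .to(Serdes.String(), messageEventSerde, "message-events-keyed");

// 2. Table over the already-deduplicated topic, used as the right side of the join.
KTable<String, MessageEvent> deduplicated =
        builder.table(Serdes.String(), messageEventSerde,
                      "message-events-deduplicated", "message-events-deduplicated-store");

// 3. Left join the keyed stream against the table: a non-null right side means the
//    message was seen before, so it gets flagged and filtered out; the rest are
//    written back to "message-events-deduplicated".
builder.stream(Serdes.String(), messageEventSerde, "message-events-keyed")
       .leftJoin(deduplicated, (event, existing) -> event.withExistsDuplicate(existing != null))
       .filter((key, event) -> !event.isExistsDuplicate())
       .to(Serdes.String(), messageEventSerde, "message-events-deduplicated");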

This exception does not occur every time, just from time to time. Do you have any idea why CachingKeyValueStore tries to forward an object of type org.apache.kafka.streams.kstream.internals.Change instead of my model type?

Thanks,
Davor Poldrugo
https://about.me/davor.poldrugo

Damian Guy

Nov 2, 2016, 12:58:53 PM
to Confluent Platform
Hi Davor,

Thanks for all the info - I'll look into it and get back to you.

Regards,
Damian


Damian Guy

Nov 2, 2016, 1:11:40 PM
to Confluent Platform
Hi Davor,

Setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 should fix the issue.
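
A minimal sketch of that change - the application id and bootstrap servers are just example values, and "builder" stands for your existing KStreamBuilder:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-events-dedup");   // example id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // example brokers
// Disable the record cache so every update is forwarded downstream immediately
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();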

If you are feeling adventurous, you could try running against: https://github.com/dguy/kafka/tree/kafka-4311 and see if this resolves the problem.

Thanks,
Damian


Davor Poldrugo

Nov 2, 2016, 1:22:31 PM
to Confluent Platform
Ok, I'll try. Thanks.

I have also experienced the following:

http://pastebin.com/raw/Aaw5y4w9

Davor Poldrugo

Nov 3, 2016, 12:38:59 PM
to Confluent Platform
It works with the config you suggested. Thank you very much. Do I lose any performance with CACHE_MAX_BYTES_BUFFERING_CONFIG set to 0?



Damian Guy

Nov 3, 2016, 12:46:32 PM
to Confluent Platform
Hi Davor,

Latency will be better, but throughput will drop. The main difference is that you won't get any deduplication of records. If you have the time, it would be great if you could try https://github.com/dguy/kafka/tree/kafka-4311 - hopefully it also fixes the problem without having to turn caching off.

Thanks,
Damian


Davor Poldrugo

Nov 3, 2016, 1:27:38 PM
to Confluent Platform
Hi Damian!
What do you mean by "The main difference is that you won't get any deduplication of records"? Are you saying that after setting CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 my left join will always have a null left-joined value?

Does it have anything to do with local store syncing?

Because I have noticed that the local store is not synced in real-time while messages are produced to a leftJoin topic.
In another use case, I added a custom processor which has access to the local store of the left-join stream - it puts the new value into the local store just before the value is produced to the leftJoin topic.

Here is the processor code: http://pastebin.com/13RRqi7p

and I use it like this:

fileLookupRecordStreamForProducing.process(new ImmediatelyAddToLocalStoreProcessorSupplier(leftJoinStateStoreName), leftJoinStateStoreName);
fileLookupRecordStreamForProducing.to(Serdes.Long(), fileLookupRecordSerde, streamsAppSettings.getFilesLookupTopicName());
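
In case the pastebin link goes away: roughly, the processor just writes each record straight into the state store as it passes through. A simplified sketch (the real code may differ in details; FileLookupRecord is my model class):

public class ImmediatelyAddToLocalStoreProcessorSupplier
        implements ProcessorSupplier<Long, FileLookupRecord> {

    private final String storeName;

    public ImmediatelyAddToLocalStoreProcessorSupplier(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Processor<Long, FileLookupRecord> get() {
        return new AbstractProcessor<Long, FileLookupRecord>() {
            private KeyValueStore<Long, FileLookupRecord> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                super.init(context);
                store = (KeyValueStore<Long, FileLookupRecord>) context.getStateStore(storeName);
            }

            @Override
            public void process(final Long key, final FileLookupRecord value) {
                // Put the record into the left-join store right away; the same stream is
                // written to the leftJoin topic separately with to(), so nothing is forwarded here.
                store.put(key, value);
            }
        };
    }
}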

Do you maybe have a built version with KAFKA-4311? For example in the apache snapshots repo...

Thanks,
Davor

Damian Guy

Nov 3, 2016, 2:26:29 PM
to Confluent Platform
Hi Davor

On 3 November 2016 at 17:27, Davor Poldrugo <dpol...@gmail.com> wrote:
Hi Damian!
What do you mean by "The main difference is that you won't get any deduplication of records"? Are you saying that after setting CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 my left join will always have a null left-joined value?


It means that each record will be forwarded to the next processor node immediately. So you'll get an output record for every input record, which will result in some outputs from the leftJoin joining with null.


Does it have anything to do with local store syncing?


No - I think you are expecting the KTable to be fully populated at startup? That doesn't happen at the moment, but there is a JIRA for it: https://issues.apache.org/jira/browse/KAFKA-4113 - is that what you are after?

Do you maybe have a built version with KAFKA-4311? For example in the apache snapshots repo...


No, I'm sorry, I don't.
