Kafka Streams: ClassCastException: java.lang.String cannot be cast to kafka.streams.KeyValue

1,178 views
Skip to first unread message

Anuj Choudhury

unread,
Jul 3, 2018, 3:20:17 PM7/3/18
to Confluent Platform
I cannot resolve this error:
I have a Kafka Streams app, where the KTable is of the form <String,String>.
I want to group the Ktable by the values, which can only be of the form "s","p" or "f".
Suppose the KTable is aq.

I do something of this sort:-
        KTable aggregatedStream = aq.groupBy((key, value)->value).count();
                aggregatedStream.toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

What is the problem with this approach, and why is it not working? The error it is giving is that :-
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.KeyValue

I cant really understand it.

John Roesler

unread,
Jul 3, 2018, 3:51:49 PM7/3/18
to confluent...@googlegroups.com
Hello Anuj,

At first glance, I think it's because groupBy expects a KeyValueMapper that returns a KeyValue. So maybe what you needed was:

KTable aggregatedStream = aq.groupBy((key, value)-> new KeyValue(value, 1)).count();
(since you're just counting, I put in a dummy "1" as the value.)

Does that work?
-John


--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/b65d80cb-373b-4e9a-bea1-fff7000bd047%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Anuj Choudhury

unread,
Jul 4, 2018, 8:05:25 AM7/4/18
to Confluent Platform
 Hi John!
 It worked.
 I also had to use a SerDes for changing the type from <String,String> to <String,Long> but that was done:
 
 KTable aggregatedstream=aq.groupBy((key, value)-> new KeyValue(value, 1L), Serialized.with(Serdes.String(),Serdes.Long())).count();

Thanks a lot for the prompt response!
-Anuj

On Wednesday, July 4, 2018 at 1:21:49 AM UTC+5:30, John Roesler wrote:
Hello Anuj,

At first glance, I think it's because groupBy expects a KeyValueMapper that returns a KeyValue. So maybe what you needed was:

KTable aggregatedStream = aq.groupBy((key, value)-> new KeyValue(value, 1)).count();
(since you're just counting, I put in a dummy "1" as the value.)

Does that work?
-John


On Tue, Jul 3, 2018 at 2:20 PM Anuj Choudhury <anuj.cho...@gmail.com> wrote:
I cannot resolve this error:
I have a Kafka Streams app, where the KTable is of the form <String,String>.
I want to group the Ktable by the values, which can only be of the form "s","p" or "f".
Suppose the KTable is aq.

I do something of this sort:-
        KTable aggregatedStream = aq.groupBy((key, value)->value).count();
                aggregatedStream.toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

What is the problem with this approach, and why is it not working? The error it is giving is that :-
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.streams.KeyValue

I cant really understand it.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.

Guozhang Wang

unread,
Jul 4, 2018, 4:58:49 PM7/4/18
to Confluent Platform
John, Anuj:

What surprises me is that you do not get a compilation error that prevents you from even starting the Java application: the API is supposed to provide strong typing so that (key, value) -> value would not be allowed even. I tried the same code with J8 on Kafka version 2.0 and I got the compilation error.

I'm wondering which Kafka version were you using?

Guozhang

Anuj Choudhury

unread,
Jul 5, 2018, 5:51:59 AM7/5/18
to Confluent Platform
 Hi Guozhang
 I am using Kafka version 1.1.0, released on March 28 according to this. I did not know later versions existed!
 I was trying to emulate the wordcount example given here . It had a groupBy((key, value) -> value), and hence I thought that it would work in my code as well.
 Any thoughts on why it wouldnt run here ?
 Thanks
 Anuj

Guozhang Wang

unread,
Jul 6, 2018, 1:19:50 PM7/6/18
to Confluent Platform
Hi Anuj,

I saw the difference here: in the example the `groupBy` was called on a KStream whereas in your example code it is called on a `KTable`. Note that the signature are different:

KStream:

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector);



KTable:

<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);


The return type of the latter expects a KeyValue<KR, VR>.

Guozhang
Reply all
Reply to author
Forward
0 new messages