Kafka Streams API "reduce" throwing ClassCastException.

Mangesh Sawant

Feb 10, 2017, 7:10:46 AM
to Confluent Platform
Hi,

I am new to Kafka.
The following piece of code throws an exception at runtime on the last line (the reduce):

    final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
    final KStream<String, String> textLines1 = textLines.selectKey((key, value) -> value.split(";")[1]);
    final KStream<String, Long> textLines2 = textLines1.mapValues(value -> Long.valueOf(value.split(";")[3]));
    textLines2.groupByKey().reduce((v1, v2) -> v1 + v2, "sum"); 

The following is the exception:

    Exception in thread "StreamThread-1" java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64)

A record in TextLinesTopic looks like:
2016-08-02 10:00:08.754;23230372950;-2;205;1200;672;

Thanks,
Mangesh Sawant.

Damian Guy

Feb 10, 2017, 7:18:59 AM
to Confluent Platform
Hi,

You need to specify the Serdes used in the groupByKey, otherwise it will use the configured defaults (String in your case); that is why the StringSerializer is being handed a Long. So you want to do:

textLines2.groupByKey(Serdes.String(), Serdes.Long()).reduce(...);
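
In full, a minimal sketch of the corrected pipeline (assuming the same 0.10.x Streams API, Serde setup, and topic name as your snippet; the groupByKey arguments are the only change) would look like:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    final Serde<String> stringSerde = Serdes.String();
    final KStreamBuilder builder = new KStreamBuilder();

    final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
    textLines
        .selectKey((key, value) -> value.split(";")[1])           // key = second ;-separated field
        .mapValues(value -> Long.valueOf(value.split(";")[3]))    // values are now Longs
        // groupByKey() with no arguments falls back to the configured default
        // Serdes (String), so the Long values would hit the StringSerializer;
        // passing the Serdes explicitly avoids the ClassCastException.
        .groupByKey(Serdes.String(), Serdes.Long())
        .reduce((v1, v2) -> v1 + v2, "sum");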

Thanks,
Damian




Mangesh Sawant

Feb 13, 2017, 1:27:40 AM
to Confluent Platform
Thanks Damian, that got me past the ClassCastException, but now it throws a new exception on the same line.
Code:
    final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
    final KStream<String, String> textLines1 = textLines.selectKey((key, value) -> value.split(";")[1]);
    final KStream<String, Long> textLines2 = textLines1.mapValues(value -> Long.valueOf(value.split(";")[3]));
    textLines2.groupByKey(Serdes.String(), Serdes.Long()).reduce((v1, v2) -> v1 + v2, "sum").print(); 

The exception now is:

    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000007, topic=wordcount-lambda-example-sum-repartition, partition=0, offset=0
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:199)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
    Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

Thanks,
Mangesh Sawant.

Mangesh Sawant

Feb 13, 2017, 7:14:35 AM
to Confluent Platform
For the time being I have declared all KStreams with keys and values of type String, parsing to Long wherever the arithmetic requires it.
It worked.
Code as follows:
    final KStream<String, String> textLines1 = textLines.selectKey((key, value) -> value.split(";")[1]);
    final KStream<String, String> textLines21 = textLines1.mapValues(value -> {
        Long lObj = Long.parseLong(value.split(";")[4]) + Long.parseLong(value.split(";")[5]);
        return lObj.toString();
    });
    textLines21.groupByKey(stringSerde, stringSerde).reduce((v1, v2) -> {
        Long lObj = Long.parseLong(v2) + Long.parseLong(v1);
        return lObj.toString();
    }, "DataPerUser").print();
 

Thanks,
Mangesh Sawant.

Damian Guy

Feb 13, 2017, 8:27:14 AM
to Confluent Platform
Hi Mangesh,

I suspect you have some data in the topic wordcount-lambda-example-sum-repartition that is not a serialized long, most likely records left over from your earlier run, which wrote values with the default String Serde. That is why you are getting: "SerializationException: Size of data received by LongDeserializer is not 8".
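
To make the failure concrete, here is a hypothetical snippet (not from the thread, just the plain kafka-clients serialization API) showing what happens when the Long deserializer reads a value that was written as text:

    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // "205" written as UTF-8 text is 3 bytes, but LongDeserializer accepts
    // only exactly 8 bytes (a big-endian long), so it throws the
    // SerializationException quoted above.
    byte[] oldBytes = new StringSerializer().serialize("any-topic", "205");
    new LongDeserializer().deserialize("any-topic", oldBytes);   // throws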

You should be able to just do something like this:

    builder.stream(stringSerde, stringSerde, "TextLinesTopic")
           .map((key, value) -> KeyValue.pair(value.split(";")[1], Long.valueOf(value.split(";")[3])))
           .groupByKey(Serdes.String(), Serdes.Long())
           .reduce((v1, v2) -> v1 + v2, "sum")
           .print();

You will need to reset the topics first. You can use the reset tool: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
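
An invocation along these lines should work (a sketch; the application id wordcount-lambda-example is inferred from the repartition topic name in your stack trace):

    bin/kafka-streams-application-reset.sh --application-id wordcount-lambda-example \
        --input-topics TextLinesTopic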

Thanks,
Damian


