[2018-01-05 14:15:00,272] ERROR stream-thread [StreamThread-1] Streams application error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[2018-01-05 14:15:00,332] WARN Unexpected state transition from RUNNING to NOT_RUNNING (org.apache.kafka.streams.processor.internals.StreamThread)
Exception in thread "StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[2018-01-05 14:15:00,334] WARN Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
in My source code :
public class KafkaEventStream
{
public static void main(final String[] args) throws Exception
{
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
KTable<String, Long> wordCounts = textLines.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))).groupBy((key,
word) -> word).count("Counts");
wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}