Kafka Streams - not able to see the text in console consumer

277 views
Skip to first unread message

karan alang

unread,
Sep 17, 2017, 11:06:07 PM9/17/17
to Confluent Platform
Hello All 
- i've a basic word count Kafka streams code, which reads data from the input topic,
splits the data (per the separator) and outputs the data into the output topic.

I've a console producer, which is putting data into the input topic (kstreams3),
and a console consumer which is reading the (split) data from output topic(kstreams4)

The code seems to be working fine, 
but on the console consumer, i'm Not able to see the text.
It seems to be printing blanks, instead of the actual text ..
(though, console consumer seems to be consuming the correct number of lines)

what seems to be the issue here ?

Code -> 

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamswordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("kstreams3"); 

KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word)-> word)
.count("Counts");

System.out.println(" Kstreams - wordCount " + wordCounts);

wordCounts.to(Serdes.String(),Serdes.Long(),"kstreams4");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

Matthias J. Sax

unread,
Sep 18, 2017, 1:31:39 PM9/18/17
to confluent...@googlegroups.com
Cross posting an answer from Kafka user list:

> Hi, Karan,
>
> It looks like you need to add a property 'value.deserializer' to
> kafka-console-consumer.sh.
>
> For example:
> $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic
> kstreams4 --from-beginning \
> --property print.key=true \
> --property print.value=true \
> --property
> key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property
> value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
>
> Vito
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/92abad51-7a95-42f8-bac8-397f1d246e5e%40googlegroups.com
> <https://groups.google.com/d/msgid/confluent-platform/92abad51-7a95-42f8-bac8-397f1d246e5e%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.

signature.asc
Reply all
Reply to author
Forward
0 new messages