Kafka Stream output is showing in binary in console consumer

sohi mankotia

Apr 27, 2017, 7:29:13 AM
to Confluent Platform
Hi, 

I am running the Kafka Streams word count example. The data in my output topic shows up as binary in the console consumer:

1. Maven details :

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.0</version>
</dependency>


2. Code 

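import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class Main {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount2");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes: both keys and values are read and written as Strings
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());//NOSONAR

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source = builder.stream("connect-test-2");

        KTable<String, Long> counts = source
                // split each line into lower-case words
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                // re-key each record by the word itself so it can be grouped
                .map((key, value) -> {
                    System.out.println(value);
                    return new KeyValue<>(value, value);
                })
                .groupByKey()
                .count("Counts");

        // need to override value serde to Long type
        counts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        //streams.close();
    }
}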



When I view the data for WordsWithCountsTopic in the console consumer, it shows up as binary.

Michael Noll

Apr 27, 2017, 10:52:08 AM
to confluent...@googlegroups.com
Hard to tell with the information you provided, but I suppose you forgot to tell the console consumer to use a Long deserializer for the record values in the output topic "WordsWithCountsTopic".

$ kafka-console-consumer --topic WordsWithCountsTopic --from-beginning \
                            --new-consumer --bootstrap-server localhost:9092 \
                            --property print.key=true \
                            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
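
To illustrate why the counts look like binary without that deserializer, here is a minimal sketch in plain Java. It assumes only that the Long serde writes each count as an 8-byte big-endian value; the class name LongBytesDemo and its helper methods are hypothetical and mimic that encoding with java.nio.ByteBuffer rather than calling Kafka itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class, not part of Kafka: it mimics an 8-byte
// big-endian long encoding to show what a String-style reader sees.
class LongBytesDemo {

    // Encode a long as 8 big-endian bytes (ByteBuffer is big-endian by default).
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a long.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(3L);

        // The raw bytes of the count 3
        System.out.println(Arrays.toString(raw)); // prints [0, 0, 0, 0, 0, 0, 0, 3]

        // Interpreting the same bytes as text yields control characters,
        // which is what shows up as "binary" in a terminal.
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println("as text has length " + asText.length());

        // Interpreting them as a long recovers the count.
        System.out.println(decode(raw)); // prints 3
    }
}
```

Passing value.deserializer=org.apache.kafka.common.serialization.LongDeserializer to the console consumer asks it to do the equivalent of decode() above before printing each value.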

Hope this helps,
Michael

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/156f42f1-0ff6-42d5-9cd4-4b7617c215c3%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Michael G. Noll
Product Manager | Confluent

Shannon Ma

Apr 27, 2017, 11:48:35 AM
to Confluent Platform
I have a similar question: does the console consumer work with other deserializers? I am trying with


 ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic sanitation  --from-beginning --property value.deserializer=io.confluent.examples.streams.utils.SpecificAvroDeserializer --property schema.registry.url=http://localhost:8081


org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.lang.NullPointerException
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)


It seems that it does not pick up the schema registry URL.

Thanks
Shannon

Matthias J. Sax

May 2, 2017, 2:16:00 PM
to confluent...@googlegroups.com
For Avro, you should use kafka-avro-console-producer and kafka-avro-console-consumer.

For example, see the CP quickstart
http://docs.confluent.io/current/quickstart.html

Also some more info here
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html


-Matthias

sohi mankotia

May 4, 2017, 2:38:55 AM
to Confluent Platform
Thanks, Michael. That worked. However, I observed some latency before the results appear in the console consumer.
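
One likely cause of such latency, offered as an assumption rather than a diagnosis of this particular setup: Kafka Streams buffers KTable updates in a record cache and forwards them downstream when the cache fills or on commit, so counts can reach the output topic in delayed batches. The sketch below builds the relevant settings using the plain config key strings "cache.max.bytes.buffering" and "commit.interval.ms". The class name LowLatencyStreamsConfig and the chosen values are hypothetical, and smaller values trade throughput for latency.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original program: it only
// assembles configuration values that could be merged into the
// Properties passed to KafkaStreams.
class LowLatencyStreamsConfig {

    static Properties build() {
        Properties props = new Properties();
        // Disable the record cache so every KTable update is forwarded immediately.
        props.setProperty("cache.max.bytes.buffering", "0");
        // Commit (and flush) more often; the value is in milliseconds.
        props.setProperty("commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

In the word count program above, the same two keys could be added to the existing props object with props.put before constructing KafkaStreams.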