Hi Ewen,
Thanks for the reply (it was great to come into the office this morning and see your suggestion).
1) Yes! Following your suggestion, I can now see a formatted representation of our "BudgetEvent" SpecificRecord message in my local Kafka topic by running the kafka-avro-console-consumer script provided with Confluent 1.0.1:
~/installs/confluent/confluent-1.0.1
$ ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic <our-budget-topic>
...
{"timeStamp":{"long":1445264922460},"campaignId":{"long":2},"cost":{"string":"-3"}}
...
2) For the second part of what I was trying to do: the kafka-avro-console-consumer script above seems to just call the ConsoleConsumer class, so I thought I could slightly hack the example consumer project currently in
to mimic what the script does and print a representation of our "BudgetEvent" to the console, using the AvroMessageFormatter class.
In the io.confluent.examples.consumer.ConsumerLogic class (which is called by the io.confluent.examples.consumer.ConsumerGroupExample class), the meat is in the run() method, where I made the hack shown below.
The local console output I get seems to show that everything runs until the statement
avroMessageFormatter.writeTo(record.key(), record.message(), System.out);
at which point everything just hangs; even the debugger frame in IntelliJ goes blank :(
Any pointers?
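
In case it helps narrow this down, here is a rough sketch of how I could probe the hang from my side: run the suspect call on a separate daemon thread with a timeout, and print that thread's stack if it has not returned in time. The class and method names (HangProbe, finishesWithin) are made up for illustration, it uses only java.util.concurrent, and it needs Java 8 for the lambdas. It would also surface an exception from the call, in case what looks like a hang is really a failure I am not seeing.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Kafka or the Confluent examples.
final class HangProbe {

    // Returns true if the call returned (or threw) within timeoutMs,
    // false if it was still running; in that case the worker stack is printed.
    static boolean finishesWithin(Runnable call, long timeoutMs) {
        AtomicReference<Thread> worker = new AtomicReference<>();
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "hang-probe");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            worker.set(t);
            return t;
        });
        Future<?> result = pool.submit(call);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            System.err.println("Call still running after " + timeoutMs + " ms; worker stack:");
            for (StackTraceElement frame : worker.get().getStackTrace()) {
                System.err.println("    at " + frame);
            }
            return false;
        } catch (ExecutionException e) {
            // The call finished, but by throwing; print the cause so it is not lost.
            System.err.println("Call failed: " + e.getCause());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean quick = finishesWithin(() -> { }, 1000);
        boolean slow = finishesWithin(() -> {
            try { Thread.sleep(5000); } catch (InterruptedException ignored) { }
        }, 100);
        System.out.println("quick call finished: " + quick + ", slow call finished: " + slow);
    }
}
```

In the run() method I would wrap the writeTo(...) call in a lambda and pass it to finishesWithin with a timeout of a few seconds; the printed stack should show where the call is stuck.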
Of course, displaying a representation of the SpecificRecord in our consumer code does not do much for us in itself, beyond giving us some confidence that we can eventually figure out end-to-end serialization/deserialization using the Schema Registry. An even better outcome would be a snippet of code we could use to replace what is in the run() method below and get back our "BudgetEvent" SpecificRecord POJO, though I understand that might be too much code to show at this time.
In the next release/version, will the ConsumerGroupExample class be updated to show how to produce and consume a SpecificRecord message to/from Kafka? Thanks, and sorry about the long reply.
package io.confluent.examples.consumer;

import java.util.Properties;

import io.confluent.kafka.formatter.AvroMessageFormatter;
import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

public class ConsumerLogic implements Runnable {
  ...
  public void run() {
    // hack to try to read and display SpecificRecord
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    System.out.println("Thread " + threadNumber);
    while (it.hasNext()) {
      MessageAndMetadata<byte[], byte[]> record = it.next();
      String topic = record.topic();
      long offset = record.offset();
      System.out.println(" received: " + "Topic " + topic +
                         " Offset " + offset + " -- ");
      // A new formatter is created and initialized with empty Properties for each message
      AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
      Properties props = new Properties();
      avroMessageFormatter.init(props);
      // This is the statement where everything hangs
      avroMessageFormatter.writeTo(record.key(), record.message(), System.out);
    }

    // Original example code, commented out for the hack above:
    // ConsumerIterator<Object, Object> it = stream.iterator();
    //
    // while (it.hasNext()) {
    //   MessageAndMetadata<Object, Object> record = it.next();
    //
    //   String topic = record.topic();
    //   int partition = record.partition();
    //   long offset = record.offset();
    //   Object key = record.key();
    //   GenericRecord message = (GenericRecord) record.message();
    //   System.out.println("Thread " + threadNumber +
    //                      " received: " + "Topic " + topic +
    //                      " Partition " + partition +
    //                      " Offset " + offset +
    //                      " Key " + key +
    //                      " Message " + message.toString());
    // }
  }
}
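
P.S. To frame what I am after with the raw byte[] messages: my understanding (please correct me if this is wrong) is that a message written by the Confluent Avro serializer starts with a 5-byte header, a magic byte of 0 followed by a 4-byte big-endian schema ID, and the Avro binary payload comes after that. So any formatter or deserializer has to look that ID up in the Schema Registry before it can decode the rest. Below is a minimal sketch of splitting such a message using only the JDK; the class and method names (WireHeader, schemaId, avroPayload) are mine and hypothetical, and the sample bytes are arbitrary, not a real BudgetEvent.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration; not a Confluent or Kafka class.
final class WireHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Reads the schema ID from the header, validating length and magic byte.
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Message shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Returns a copy of the bytes after the header (the Avro binary payload).
    static byte[] avroPayload(byte[] message) {
        schemaId(message); // reuse the header validation
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        // Arbitrary sample: magic byte 0, schema ID 42, then two payload bytes.
        byte[] sample = {0, 0, 0, 0, 42, 4, 2};
        System.out.println("schema id = " + schemaId(sample));
        System.out.println("payload length = " + avroPayload(sample).length);
    }
}
```

If that picture is right, then whatever I put in run() needs a working connection to the Schema Registry to resolve the ID, which may be relevant to the hang above.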