My console output with stack trace at end of email. Similar code not shown here which replaces GenericRecord with JsonNode and the GenericAvroSerde with a Serde<JsonNode> works beautifully as expected. The question is whether I am doing something wrong in usage of GenericRecord, KTable, GenericAvroSerde and KTable#to below or whether there is something that needs to change in some of these classes.
[2016-07-06 08:24:39,355] WARN [main] The configuration __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-07-06 08:24:39,367] WARN [main] The configuration schema.registry.url =
http://localhost:8081 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:47)
at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:77)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:85)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:78)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:80)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:77)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Disconnected from the target VM, address: '
127.0.0.1:52552', transport: 'socket'