NPE when using GenericAvroSerde as the value Serde for KTable#to


Philippe Derome

unread,
Jul 6, 2016, 8:38:22 AM7/6/16
to confluent...@googlegroups.com
This thread started on the Kafka mailing list, but I think it's more appropriate here.

A comment to Michael Noll on the earlier thread: yes, I meant serialization, but I accomplish it with a Serde object passed as the second parameter to the KTable#to method.

My console output, with the stack trace, is at the end of this email. Similar code (not shown here) that replaces GenericRecord with JsonNode and GenericAvroSerde with a Serde&lt;JsonNode&gt; works beautifully, as expected. The question is whether I am doing something wrong in my usage of GenericRecord, KTable, GenericAvroSerde, and KTable#to below, or whether something needs to change in one of these classes.
// Definitions of the KTables for AAs and BBs are skipped, as is the
// definition of outputSchema, referenced below.
final KTable<String, GenericRecord> joinedTable = AAs
    .join(BBs, (a, b) -> {
        final GenericRecordBuilder joined = new GenericRecordBuilder(outputSchema);
        // Some statements building fields on the builder `joined` are skipped here.
        return joined.build();
    });

final Serde<GenericRecord> genericRecordSerde = new GenericAvroSerde();
final Serde<String> stringSerde = Serdes.String();

joinedTable.to(stringSerde, genericRecordSerde, "output-topic");

final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();


[2016-07-06 08:24:39,355] WARN [main] The configuration __stream.thread.instance__ = Thread[StreamThread-1,5,main] was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
[2016-07-06 08:24:39,367] WARN [main] The configuration schema.registry.url = http://localhost:8081 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:47)
at io.confluent.examples.streams.utils.GenericAvroSerializer.serialize(GenericAvroSerializer.java:25)
at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:77)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:85)
at org.apache.kafka.streams.kstream.internals.KTableKTableJoin$KTableKTableJoinProcessor.process(KTableKTableJoin.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:78)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:80)
at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:73)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.kstream.internals.KTableSource$MaterializedKTableSourceProcessor.process(KTableSource.java:77)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Disconnected from the target VM, address: '127.0.0.1:52552', transport: 'socket'

Damian Guy

unread,
Jul 6, 2016, 9:27:26 AM7/6/16
to confluent...@googlegroups.com
Hi Philippe,

It is because the schema registry client inside the serde is null. When constructing a GenericAvroSerde yourself, you need to do something like:

final GenericAvroSerde serde = new GenericAvroSerde();
serde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"),
    false); // false: this serde is for record values, not keys
Thanks,
Damian

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/CAPbAcg6R2bx0S9kdaPTavoth6tOTc3OMSaXGv-VwHWpo11Korw%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Philippe Derome

unread,
Jul 6, 2016, 1:57:21 PM7/6/16
to Confluent Platform
Excellent, that solved my issue! Many thanks. What follows is just to make sure I understand correctly, if you don't mind.

In my stream processor main method, I also had this line prior to your suggested fix, which apparently did not help:
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

The line above was taken from other examples, either from Kafka, Confluent, or someone connected to the community, so I assumed the config would meaningfully affect the KafkaStreams object via new KafkaStreams(builder, streamsConfiguration). Yet I see no effect.

In a few examples, I see it stated that if we need a different serializer or deserializer for the key or the value, we need to indicate it in KTable#to or KStream#to in addition to the topic parameter, because the topology's default could be wrong when publishing (say we publish a Long value for a count instead of a String, so we specify a longSerde). This would imply that the KafkaStreams configuration is somehow pushed down properly when calling KTable#to without serde parameters. But that holds only when the call to KTable#to has no Serde parameters specified, right?

In other words, my error was that my config got bypassed by specifying an explicit Serde as a parameter to KTable#to, and that Serde needs context about the registry, since the data to be published carries a reference to its schema that must be resolved against the Schema Registry process in order to use the correct encoding to the Kafka cluster.

Do I understand correctly? 
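To make my mental model concrete, here is a sketch of the two cases as I picture them (the topic name and registry URL are placeholders from my local setup):

```java
// Case 1: no Serde parameters -> KTable#to falls back on the topology's
// defaults, which Kafka Streams configures from streamsConfiguration,
// including schema.registry.url.
// joinedTable.to("output-topic");

// Case 2: an explicit Serde parameter -> streamsConfiguration is NOT
// pushed into it; configure() must be called by hand, or the serde's
// schema registry client stays null (hence the NPE above).
final Serde<GenericRecord> genericRecordSerde = new GenericAvroSerde();
genericRecordSerde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"),
    false); // false: value serde, not key serde
joinedTable.to(Serdes.String(), genericRecordSerde, "output-topic");
```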

Damian Guy

unread,
Jul 6, 2016, 3:28:17 PM7/6/16
to Confluent Platform
That is correct.

Thanks

Michael Noll

unread,
Jul 7, 2016, 9:46:35 AM7/7/16
to confluent...@googlegroups.com
Phil,

FYI: I just created a new PR for confluentinc/examples that updates SpecificAvroIntegrationTest and GenericAvroIntegrationTest to demonstrate how you can create/use/configure explicit Avro serdes.


This should help other users who might run into the same question.

Best,
Michael








--
Best regards,
Michael Noll


Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download

Philippe Derome

unread,
Jul 7, 2016, 10:11:03 AM7/7/16
to confluent...@googlegroups.com
ok, thanks.


Philippe Derome

unread,
Jul 7, 2016, 11:50:23 AM7/7/16
to confluent...@googlegroups.com
Are we implying that a Streams processor sitting between a producer and a consumer should use SpecificAvroSerde in order to send, as the value on a topic, data that is a specific Avro record? I tried a JSONPojoSerde at first (following the Kafka Streams example) with no luck, and a few other things, also with no luck.

What I am really trying to do is have a Streams processor use the generated Java POJO class API to set the data and then call producer.send (as advertised in Kafka: The Definitive Guide). That superficially works, since I see the data on the topic with a console consumer, but I don't truly understand the format of the data on the wire even though I can see it. This leads to a cast exception on the consumer side, because my consumer does not want to consume as GenericRecord (if it does consume as GenericRecord, it gets the data).

To troubleshoot, I use a modification of Damian's driver, split into a consumer and a producer, and I try to get the producer to send type-safe Avro data and the consumer to consume it directly (meaning I want my apps to manipulate the Java classes generated from the .avsc and not use the GenericRecord interface at all). At run time, the consumer, looping through the records following poll, does not get the expected specific Avro Java POJO; instead it gets GenericData$Record. That's the key point: I don't know why I am getting that or what to do with it. I was expecting a POJO. With type erasure in Java, the cast to the POJO is hidden at compile time, so the compiler does not help much, but sure enough it gives me a ClassCastException at run time.
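If it helps to show what I mean, the GenericData$Record behavior looks like the Confluent KafkaAvroDeserializer's default: unless told otherwise, it deserializes into generic records. My understanding (please correct me if wrong) is that the consumer side would need something like the following to get the generated classes back; Customer is a hypothetical POJO generated from an .avsc, and the URLs and topic are placeholders:

```java
// Consumer configuration sketch asking the Confluent Avro deserializer
// for specific (generated) records instead of GenericRecord.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "avro-pojo-consumer");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
// Without this, KafkaAvroDeserializer hands back GenericData$Record:
props.put("specific.avro.reader", "true");

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("customerContacts"));
```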

BTW: the producer example for this in Chapter 4 of Kafka: The Definitive Guide is incomplete and has errors (it specifies, for instance, a serializer property where a deserializer belongs), so I cannot base my code on it. I tried the KafkaAvroDeserializer in my consumer, but no luck. The book's snippet:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"

Philippe Derome

unread,
Jul 7, 2016, 12:02:04 PM7/7/16
to confluent...@googlegroups.com
Looks like SpecificAvroSerde is exactly the tool I was looking for here. The whole casting issue disappears. It seems this particular Serde could get a little more visibility in examples and documentation? ;-) I didn't see it in Kafka: The Definitive Guide...
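For anyone landing here later, here is roughly how it wires up, as a sketch only; Customer is a hypothetical POJO generated from an .avsc, and the topic name and registry URL are placeholders:

```java
// Using SpecificAvroSerde so the topology works with the generated POJO
// rather than GenericRecord: no cast needed on the consuming side.
final SpecificAvroSerde<Customer> customerSerde = new SpecificAvroSerde<>();
customerSerde.configure(
    Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081"),
    false); // value serde

// Downstream, values are Customer, not GenericData$Record.
stream.to(Serdes.String(), customerSerde, "customer-topic");
```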

Phil
