Unit testing a KStream Avro topology


Diego Irismar da Costa

Feb 27, 2017, 5:32:06 AM
to Confluent Platform
I'm trying to unit test a very simple Avro Kafka Streams topology using ProcessorTopologyTestDriver.
The problem is that, even though I'm using MockSchemaRegistryClient, no schema is found during deserialization.

Note: Kotlin code, Kafka Streams version 0.10.1.1

My serde, created using MockSchemaRegistryClient:

    private fun createAvroSerde(): SpecificAvroSerde<Balance> {
        val schemaRegistry = MockSchemaRegistryClient()
        val specificDeserializerProps = HashMap<String, String>()

        specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url")
        specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")

        val serde = SpecificAvroSerde<Balance>(schemaRegistry, specificDeserializerProps)
        serde.configure(Collections.singletonMap("schema.registry.url", "http://fake-url"), false)

        return serde
    }

The Avro topology:

    private fun avroSimpleTopology(): TopologyBuilder {
        val builder = KStreamBuilder()
        val keySerde = StringSerde()
        val avroSerde = createAvroSerde()

        builder.stream<String, Balance>(keySerde, avroSerde, "topic-01")
               .to(keySerde, avroSerde, "topic-02")

        return builder
    }


Testing through ProcessorTopologyTestDriver:

    @Test
    fun testSimpleAvroTopology() {
        val props = Properties()
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mocked-${UUID.randomUUID()}")
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        val driver = ProcessorTopologyTestDriver(StreamsConfig(props), avroSimpleTopology())

        val value = createAvroSerde().serializer().serialize("topic-01",Balance(1977L))
        val key = StringSerializer().serialize("topic-01", "key-123")

        driver.process("topic-01", key, value)
    }


The stack trace:

org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-01, partition=1, offset=1

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
    at avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:42)
    at avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:12)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:43)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:94)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:143)
    at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:209)
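The "for id 1" in the SerializationException refers to the schema ID that the Avro serializer embedded in the message. With Confluent's Avro serializers, a serialized value starts with a magic byte (0), followed by the schema ID as a 4-byte big-endian integer, and then the Avro binary payload; the deserializer reads that ID and asks its SchemaRegistryClient for the matching schema. The sketch below reads that header; `schemaIdOf` and `MAGIC_BYTE` are hypothetical names written for illustration, not part of the Confluent API.

```kotlin
import java.nio.ByteBuffer

// Assumed Confluent wire format: byte 0 is the magic byte (0x0), bytes 1..4 hold the
// schema ID as a big-endian Int, and the remaining bytes are the Avro binary payload.
val MAGIC_BYTE: Byte = 0x0

// Hypothetical helper: returns the schema registry ID embedded in a framed Avro message.
fun schemaIdOf(message: ByteArray): Int {
    require(message.size >= 5) { "Message too short: ${message.size} bytes" }
    val buffer = ByteBuffer.wrap(message) // ByteBuffer reads big-endian by default
    val magic = buffer.get()
    require(magic == MAGIC_BYTE) { "Unknown magic byte: $magic" }
    return buffer.getInt() // the 4 bytes right after the magic byte
}
```

For example, `schemaIdOf(byteArrayOf(0, 0, 0, 0, 1, 42))` returns 1, the same ID shown in the error above. Whether that ID can be resolved depends entirely on which registry instance the deserializer was given.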




Michael Noll

Feb 27, 2017, 9:00:08 AM
to confluent...@googlegroups.com
Diego,

the error message says "Cannot get schema from schema registry".

1. Is a schema registry process running? Most probably, the answer is No.
2. Once #1 is solved, is the schema registry process accessible from your test code? Most probably, the answer is No. For example, you configure the serde with schema.registry.url=http://fake-url, so the serde can't access the registry (unless it was up and running and actually listening at http://fake-url).
3. Once #2 is solved, can your serde properly register the Avro schema with the registry, so that it can be retrieved later during deserialization?

-Michael




--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/52b2eabe-b7f6-4a81-a008-ac00d52d8753%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


Michael Noll

Feb 27, 2017, 9:09:28 AM
to confluent...@googlegroups.com
Ah, I missed that you are using MockSchemaRegistryClient.

First guess would be:

    val driver = ProcessorTopologyTestDriver(StreamsConfig(props), avroSimpleTopology())

Here, avroSimpleTopology() internally calls createAvroSerde(), which returns an instance of the Avro serde that uses MockSchemaRegistryClient.

Later on, you call createAvroSerde() directly, which AFAICT returns a new, second instance of the Avro serde:

    val value = createAvroSerde().serializer().serialize("topic-01",Balance(1977L))

I suppose you call `createAvroSerde().serializer().serialize(...)` in order to register the schema, but you're registering the schema with a different MockSchemaRegistryClient than the one used inside `avroSimpleTopology()`.
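This diagnosis can be reproduced without Kafka. The sketch below is a deliberately simplified model, assuming (as the outcome of this thread suggests) that each MockSchemaRegistryClient instance keeps its schemas in its own in-memory state. `ToyRegistry`, `roundTrip` and `BALANCE_SCHEMA` are hypothetical names for illustration, not Confluent classes, and the schema string is a placeholder.

```kotlin
// Hypothetical placeholder for the Balance schema (its real definition is not shown in the thread).
val BALANCE_SCHEMA = "balance-schema-placeholder"

// Toy stand-in for an in-memory mock registry: schemas and IDs live in instance fields,
// so an ID assigned by one instance is unknown to every other instance.
class ToyRegistry {
    private val schemasById = mutableMapOf<Int, String>()
    private var nextId = 1

    // Registers a schema (or finds it if already registered) and returns its ID.
    fun register(schema: String): Int {
        schemasById.entries.firstOrNull { it.value == schema }?.let { return it.key }
        val id = nextId++
        schemasById[id] = schema
        return id
    }

    // Looks up a schema by ID; the failure message is modeled on the stack trace above.
    fun getById(id: Int): String =
        schemasById[id] ?: error("Cannot get schema from schema registry! (id $id)")
}

// One produce/consume cycle: the serializer side registers the schema and sends only the ID;
// the deserializer side must resolve that ID in the registry it was configured with.
fun roundTrip(serializerRegistry: ToyRegistry, deserializerRegistry: ToyRegistry): String {
    val schemaId = serializerRegistry.register(BALANCE_SCHEMA)
    return deserializerRegistry.getById(schemaId)
}
```

Here `roundTrip(ToyRegistry(), ToyRegistry())` fails, mirroring the original test, where the topology's serde and the test's serializer each got their own registry; passing the same instance for both arguments succeeds. In the real test, the corresponding change is to create a single MockSchemaRegistryClient and hand it to both serdes, for example (one possible approach) by giving createAvroSerde() a registry parameter.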




Diego Irismar da Costa

Feb 27, 2017, 9:23:51 AM
to Confluent Platform
Exactly, I'm using MockSchemaRegistryClient

Your guess was right: my test was creating two schema registry instances. When I fixed that, everything worked.

Thanks a lot

- Diego