private fun createAvroSerde(): SpecificAvroSerde<Balance> {
val schemaRegistry = MockSchemaRegistryClient()
val specificDeserializerProps = HashMap<String, String>()
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://fake-url")
specificDeserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
val serde = SpecificAvroSerde<Balance>(schemaRegistry, specificDeserializerProps)
serde.configure(Collections.singletonMap("schema.registry.url", "http://fake-url"), false)
return serde
}
private fun avroSimpleTopology(): TopologyBuilder {
val builder = KStreamBuilder()
val keySerde = StringSerde()
val avroSerde = createAvroSerde()
builder.stream<String, Balance>(keySerde, avroSerde, "topic-01")
.to(keySerde, avroSerde, "topic-02")
return builder
}
@Test
fun testSimpleAvroTopology() {
val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mocked-${UUID.randomUUID()}")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val driver = ProcessorTopologyTestDriver(StreamsConfig(props), avroSimpleTopology())
val value = createAvroSerde().serializer().serialize("topic-01",Balance(1977L))
val key = StringSerializer().serialize("topic-01", "key-123")
driver.process("topic-01", key, value)
}org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-01, partition=1, offset=1
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.io.IOException: Cannot get schema from schema registry!
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:106)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getBySubjectAndID(MockSchemaRegistryClient.java:149)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
at avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:42)
at avro.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:12)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:43)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:94)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:143)
at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:209)--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/52b2eabe-b7f6-4a81-a008-ac00d52d8753%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsubscribe@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.