I am new to ksql and having trouble getting the Avro integration to work. We have an existing Avro schema I am trying to use with ksql using the VALUE_AVRO_SCHEMA_FULL_NAME property in the WITH clause. The schema has several substructures one of which has a property of type map. The same schema is used for several different kafka topics.
Something is not working as I expected. I created a stream from a topic and the map has the following schema when I retrieve if from the schema registry:
{"name":"entityMap","type":{"type":"map","values":"string"}
Which is what I expected. When I print the output from that steam I also get what I expect:
"entityMap": {"CoreID": "4"},
I then created another stream from that stream on a different topic but using the same VALUE_AVRO_SCHEMA_FULL_NAME property value in the WITH clause. This time the schema is different:
{"name":"ENTITYMAP","type":["null",{"type":"array","items":{"type":"record","name":"Metric_METRICSOURCE_ENTITYMAP","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","string"],"default":null}],"connect.internal.type":"MapEntry"}}],"default":null},
When I print the output from the new stream it is not what I expected:
"ENTITYMAP": [{"key": "CoreID", "value": "4"}],
The consumer of this topic throws the following exception (a different existing service):
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition OI-EVENTS.0-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 6
Caused by: org.apache.avro.AvroTypeException: Found array, expecting map
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
I have been able to take care of the casing issues and other naming problems using the aliases property in the avro schema file. The schema snippet is:
{
"name": "entityMap",
"aliases": ["ENTITYMAP", "Metric_METRICSOURCE_ENTITYMAP"],
"order": "ignore",
"type": {"type": "map", "values": "string"}
},
I am not sure what I am doing wrong but its been a tough slog getting the naming to line up and this is the last issue I am having.