Hi,
I'm working on a demo app here:
where I've defined key and value schemas and posted them to the Schema Registry.
I am able to get the schemas out of the registry:
Key:
{"subject":"clicks-avro-key","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"clicks\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}],\"version\":1}"}
Value:
{"subject":"clicks-avro-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"click\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"impression_id\",\"type\":\"string\"},{\"name\":\"creative_id\",\"type\":\"string\"},{\"name\":\"placement_id\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"user_agent\",\"type\":[\"string\",\"null\"]},{\"name\":\"ip\",\"type\":[\"string\",\"null\"]},{\"name\":\"referrer\",\"type\":[\"string\",\"null\"]},{\"name\":\"costs\",\"type\":\"float\"}]}"}
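(Side note: the `schema` field comes back as an escaped JSON string, so to inspect it I unwrap it with two `json.loads` calls. For example, on the key response above:)

```python
import json

# The registry's response for the key subject, verbatim from above
resp = r'{"subject":"clicks-avro-key","version":1,"id":21,"schema":"{\"type\":\"record\",\"name\":\"clicks\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}],\"version\":1}"}'

envelope = json.loads(resp)              # outer registry envelope
schema = json.loads(envelope["schema"])  # the embedded Avro schema

print(schema["name"])                         # -> clicks
print([f["name"] for f in schema["fields"]])  # -> ['id']
```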
In my producer I'm reading the schemas from .avsc files with these contents:
Key:
{
  "type": "record",
  "namespace": "com.example",
  "name": "clicks",
  "version": 1,
  "fields": [
    { "name": "id", "type": "string" }
  ]
}

Value:
{
  "type": "record",
  "namespace": "com.example",
  "name": "clicks",
  "version": 1,
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "impression_id", "type": "string" },
    { "name": "creative_id", "type": "string" },
    { "name": "placement_id", "type": "string" },
    { "name": "timestamp", "type":
      { "type": "long", "logicalType": "timestamp-millis" }
    },
    { "name": "user_agent", "type": ["string", "null"] },
    { "name": "ip", "type": ["string", "null"] },
    { "name": "referrer", "type": ["string", "null"] },
    { "name": "cost", "type": "float" }
  ]
}
I'm attempting to produce to my Kafka topic using this code:
When I run my producer it complains about the value schema, but I can't tell what is wrong:
Traceback (most recent call last):
  File "kafka-demo/bin/kafka_avro_producer.py", line 92, in <module>
    avro_producer.produce(topic=topic, key=click_key, value=click_value, callback=delivery_callback)
  File "/usr/local/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/usr/local/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 98, in encode_record_with_schema
    schema_id = self.registry_client.register(subject, schema)
  File "/usr/local/lib/python2.7/site-packages/confluent_kafka/avro/cached_schema_registry_client.py", line 143, in register
    raise ClientError("Incompatible Avro schema:" + str(code))
confluent_kafka.avro.error.ClientError: Incompatible Avro schema:409
Any clues as to why it thinks my schema is incompatible?
Also, instead of reading the schema from a file, is there a way to use the confluent_kafka API to load it from the registry, to avoid this kind of mismatch?
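I was imagining something along these lines — an untested sketch that pulls the latest registered schema over the registry's REST API (the registry URL is an assumption, and I'm guessing the unwrapped schema string could then be handed to `confluent_kafka.avro.loads`):

```python
import json
try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen         # Python 2.7, as in my traceback

REGISTRY = "http://localhost:8081"  # assumed registry address

def latest_schema_url(subject):
    # The registry serves the newest version at /subjects/<subject>/versions/latest
    return "%s/subjects/%s/versions/latest" % (REGISTRY, subject)

def unwrap_schema(response_text):
    # The registry wraps the Avro schema itself as an escaped JSON string
    return json.loads(response_text)["schema"]

# At producer startup (untested), something like:
#   schema_str = unwrap_schema(urlopen(latest_schema_url("clicks-avro-value")).read())
#   value_schema = confluent_kafka.avro.loads(schema_str)
```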
Thanks in advance,
Asher