How to use the kafka-python module to decode Avro messages that were produced by the REST producer?


Anakin Yan

Jun 29, 2015, 1:42:52 PM6/29/15
to confluent...@googlegroups.com
Hi, I have a very simple schema:

{"type": "record",
 "name": "myrecord",
 "fields": [{"name": "f1", "type": "string"},
            {"name": "BEHAVIOR_TIME", "type": "string"}]}

When I send the message below through `kafka-avro-console-producer`, both `kafka-avro-console-consumer` and the REST consumer API decode it correctly.

{"f1":"v2","BEHAVIOR_TIME":"2015-05-30 00:00:27.183"}


Then I tried the plain console consumer:

$ bin/kafka-console-consumer --topic test --zookeeper localhost:2181 --from-beginning 
 
It returns raw bytes like this:

�v2.2015-05-30 00:00:27.183

Here's my question: can I decode this message with Python's kafka-python module? I tried but failed. Here is my code:


from kafka import KafkaClient, SimpleConsumer
import avro.schema, avro.io
import cStringIO

kafka_hosts = ['localhost:9092']
topic = 'test'

# the same schema the REST producer used
avsc_str = '''
{"type": "record",
 "name": "myrecord",
 "fields": [{"name": "f1", "type": "string"},
            {"name": "BEHAVIOR_TIME", "type": "string"}]}
'''

client = KafkaClient(hosts=kafka_hosts, client_id="test.avro.name")
consumer = SimpleConsumer(client, group="test.avro.name", topic=topic)

# fetch one raw message from the topic
messages = consumer.get_messages(count=1, timeout=0.5)
msg = messages[0].message.value

print repr(msg)

schema = avro.schema.parse(avsc_str)
datum_reader = avro.io.DatumReader(schema)

reader = cStringIO.StringIO(msg)
decoder = avro.io.BinaryDecoder(reader)
print datum_reader.read(decoder)

output:

 '\x00\x00\x00\x00\xa1\x04v3.2015-05-31 00:20:27.183'
{u'BEHAVIOR_TIME': u'', u'f1': u''}


The fields contain no real values.


Ewen Cheslack-Postava

Jun 29, 2015, 2:07:22 PM6/29/15
to confluent...@googlegroups.com
Anakin,

The Avro support in the REST proxy integrates with the schema registry. Avro needs the schema to decode the message, but we don't want to ship the whole schema with every message, so instead the header of the message includes the ID of the schema in the registry. That should be immediately followed by the Avro data. There are some additional details around the handling of primitive types, and you can see how the Java deserializer works here:

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L89

Ideally you would actually integrate with the schema registry when decoding the messages. Done correctly, you can fetch the schema that was used to serialize the message and then project to whatever your latest schema is (assuming you maintained compatibility). However, if you're just trying to get basic decoding working by hard-coding the schema in your Python code, you should be able to do that by skipping the framing header: a magic byte followed by the 4-byte schema ID, i.e. the first 5 bytes of the message.
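For the direct approach, the framing described above can be handled with a few lines of standard-library Python. This is only a sketch, not Confluent code; the schema ID (21) and the byte values below are hypothetical, standing in for a real message value fetched via kafka-python.

```python
import struct

# Confluent wire framing: one magic byte (0x00), a 4-byte big-endian
# schema-registry ID, then the Avro-encoded datum.
def split_confluent_message(raw):
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if len(raw) < 5:
        raise ValueError("message too short for the 5-byte header")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("unexpected magic byte: %r" % magic)
    return schema_id, raw[5:]

# Hand-built example mirroring the record from this thread
# (hypothetical schema id 21; the Avro datum encodes two strings):
raw = (b"\x00"                       # magic byte
       b"\x00\x00\x00\x15"           # schema id 21, big-endian
       b"\x04v2"                     # "v2": zigzag-varint length, then bytes
       b".2015-05-30 00:00:27.183")  # 0x2e = zigzag length 46 -> 23 chars

schema_id, payload = split_confluent_message(raw)
print(schema_id)  # 21
```

The returned `schema_id` is what you would look up in the schema registry; for a quick hard-coded-schema hack you can ignore it and feed `payload` to `avro.io.BinaryDecoder` exactly as in the original snippet.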

You can also get this data back via the REST proxy by using its Avro consumer interface. In that case, all the data will be decoded for you and converted to JSON, which is very easy to work with in Python with no additional libraries.

Finally, we include a kafka-avro-console-consumer tool which can properly decode those messages rather than writing the raw bytes like kafka-console-consumer does.

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/1376cf9c-790a-4ecc-b7f8-bb2b5a3dea58%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.




Anakin Yan

Jun 29, 2015, 10:06:05 PM6/29/15
to confluent...@googlegroups.com
Ewen, 

Thank you very much for the reply. I skipped the first 5 bytes and tried again, and now the message decodes properly.
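Concretely, the fix amounts to slicing off the 5-byte header before the bytes reach the Avro decoder. A sketch using the raw value printed in the first post:

```python
import struct

# Raw message value from the first post: 5 header bytes, then the Avro datum.
msg = b"\x00\x00\x00\x00\xa1\x04v3.2015-05-31 00:20:27.183"

magic, schema_id = struct.unpack(">bI", msg[:5])  # magic 0, schema id 0xa1 = 161
payload = msg[5:]                                 # what the DatumReader should see
```

Passing `payload` instead of the whole `msg` to `cStringIO.StringIO` is the only change the original snippet needs.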

I know the REST proxy and kafka-avro-console-consumer can do the job, but in some situations I just don't want to consume messages via the REST proxy.

Thanks again, it helps a lot.





luis arturo garcia

Jul 18, 2016, 2:00:58 PM7/18/16
to Confluent Platform
How do you remove the first 5 bytes?
How do I decode the schema ID?

Regards,
LAGG

Ryan Anguiano

Aug 5, 2016, 4:57:08 PM8/5/16
to Confluent Platform
I use this package by Verisign: https://github.com/verisign/python-confluent-schemaregistry

It doesn't seem to be on PyPI, so I uploaded it myself here: https://pypi.python.org/pypi/rpm-confluent-schemaregistry/0.1.1

You can see an example of using the MessageSerializer here: https://github.com/revpoint/jangl-utils/blob/master/jangl_utils/kafka/consumers.py#L92