Kafka proxy is not registering schemas


Ignacio Alvarez

Jul 11, 2015, 9:14:52 AM
to confluent...@googlegroups.com
Hello supporters,

I am having issues with the proxy not recognizing the schemas. This is the kind of message I would like to send:

{
"records":[
    {"ID":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0"
    },
    {"timestamp":1436397017594
    },
    {"data":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}
],
"value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"
}

ID and data are Base64 encoded strings.
If I try to send this data, I get 500 errors.

Eventually I got the system to accept data sending the following message: 

{
"records":[
    {"value":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0"
    },
    {"value":1436397017594
    },
    {"value":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}
],
"value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"
}

But I had to use "application/json" for content type instead of "application/vnd.kafka.avro.v1+json"  or the binary header version when sending binary messages. This gets accepted by the API with this message:

{"offsets":[{"partition":0,"offset":50,"error_code":null,"error":null},{"partition":0,"offset":51,"error_code":null,"error":null},{"partition":0,"offset":52,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

Notice that both key_schema_id and value_schema_id are NULL.
If I try specifying the key_schema_id registered for the topic, I get missing value_schema_id responses, and when specifying only the key_schema_id (without a value) I get a 422 "" error back.

Anyway, since I was getting data in, I tried to keep working on my use case. I am using Spark Streaming to consume the messages and process the data.
Now the behavior of my Spark program has thrown me off. When I get the messages into RDDs, the Base64 data is automatically decoded (no idea where this happens; I was not expecting it) and the long value of the timestamp is Base64 encoded!
And since all the fields are "value" and there is no schema associated with them, I get malformed JSON when receiving the data and cannot map it properly inside the streaming mini-batches.

Can anyone help me figure out why the schema registration is not working and how to fix it or do it correctly?

Thanks,

Ignacio

 

Ewen Cheslack-Postava

Jul 11, 2015, 6:25:15 PM
to confluent...@googlegroups.com
Hi Ignacio,

Responses inline.

On Sat, Jul 11, 2015 at 6:14 AM, Ignacio Alvarez <ignaci...@gmail.com> wrote:
Hello supporters,

I am having issues with the proxy not recognizing the schemas. This is the kind of message I would like to send:

{
"records":[
    {"ID":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0"
    },
    {"timestamp":1436397017594
    },
    {"data":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}
],
"value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"
}

ID and data are Base64 encoded strings.
If I try to send this data, I get 500 errors.

I actually get a 422 with this payload, which indicates the format isn't valid. I see two issues:
1. Each record should be wrapped in an object that contains "key", "value", and "partition" fields (where key and partition are optional and may be omitted). It would be nice to be able to just specify value-only records directly, but because of the JSON encoding of Avro that we embed, allowing this shorthand would lead to ambiguities.
2. The format for your Avro record isn't correct. Since you just have a record with a couple of primitive type fields, the record itself should look like this:
    {"ID":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0","timestamp":1436397017594,"data":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}

Combining those two, the following curl command works:

curl -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"records":[{"value":{"ID":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0","timestamp":1436397017594,"data":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}}],"value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"}' "http://localhost:8082/topics/avrotest"

Since that's a bit hard to parse in that format, here's the payload pretty-printed:

{
   "records":[
      {
         "value":{
            "ID":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0",
            "timestamp":1436397017594,
            "data":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"
         }
      }
   ],
   "value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"
}
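
For anyone generating this payload from code rather than by hand, here is a minimal Python sketch of the two fixes above: each complete record is wrapped in a "value" object, and the schema is embedded as a JSON string. The helper name `build_avro_payload` is hypothetical and not part of any Confluent client. It only assembles the request body and does not send it.

```python
import json

# Schema from the thread, kept as a dict so it can be serialized in one place.
CAPTURE_FRAME_SCHEMA = {
    "type": "record",
    "name": "capture_frame",
    "fields": [
        {"name": "ID", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "data", "type": "string"},
    ],
}


def build_avro_payload(records, schema):
    """Wrap each complete record in {"value": ...} and embed the schema as a string.

    Hypothetical helper, for illustration only.
    """
    return {
        "records": [{"value": record} for record in records],
        # value_schema must be a JSON *string*, not a nested object.
        "value_schema": json.dumps(schema),
    }


# One record holds all three fields together, instead of one object per field.
frame = {
    "ID": "YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0",
    "timestamp": 1436397017594,
    "data": "/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/",
}

payload = build_avro_payload([frame], CAPTURE_FRAME_SCHEMA)
body = json.dumps(payload)  # request body for application/vnd.kafka.avro.v1+json
```

The resulting `body` would be posted with the "application/vnd.kafka.avro.v1+json" content type, as in the curl command above.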

I'm not certain why you're getting 500 errors, but I suspect it's probably due to invalid JSON (perhaps there was some variant close to the above JSON but which had a small JSON error?) -- we have an issue filed to address this: https://github.com/confluentinc/rest-utils/issues/9
 

Eventually I got the system to accept data sending the following message: 

{
"records":[
    {"value":"YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0"
    },
    {"value":1436397017594
    },
    {"value":"/9j/4AAQSkZJRgABAgAAAQABAADXtsNO5/"}
],
"value_schema":"{\"type\": \"record\", \"name\": \"capture_frame\", \"fields\": [{\"name\": \"ID\", \"type\": \"string\"},{\"name\": \"timestamp\", \"type\": \"long\"},{\"name\": \"data\", \"type\": \"string\"}]}"
}

But I had to use "application/json" for content type instead of "application/vnd.kafka.avro.v1+json"  or the binary header version when sending binary messages. This gets accepted by the API with this message:

{"offsets":[{"partition":0,"offset":50,"error_code":null,"error":null},{"partition":0,"offset":51,"error_code":null,"error":null},{"partition":0,"offset":52,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

Notice that both key_schema_id and value_schema_id are NULL.
If I try specifying the key_schema_id registered for the topic, I get missing value_schema_id responses, and when specifying only the key_schema_id (without a value) I get a 422 "" error back.

application/json uses the binary mode, which doesn't handle schemas at all. So this is only working because it's skipping any schema registration.
 

Anyway, since I was getting data in, I tried to keep working on my use case. I am using Spark Streaming to consume the messages and process the data.
Now the behavior of my Spark program has thrown me off. When I get the messages into RDDs, the Base64 data is automatically decoded (no idea where this happens; I was not expecting it) and the long value of the timestamp is Base64 encoded!

This is happening because you're producing with binary mode. The base64 decoding is being performed by the REST proxy because that's how its binary mode works -- it accepts base64-encoded data, decodes it, and writes the raw binary data to Kafka.

You're probably also seeing that each *field* shows up as its own message. This is because you have split a single record (the combination of ID, timestamp, and data) across multiple records (each object in the list of records).

The timestamp is an interesting find -- that shouldn't even be accepted by the REST proxy since it isn't a string. I'll have to file a bug for that and track down why that's making it through validation.
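
To make the binary-mode behavior concrete, here is a rough Python simulation of what happens to each "value" as described above. It is a sketch of that description, not the REST proxy's actual code: `binary_mode_value` is a made-up name, Python's `base64` module stands in for the proxy's decoder, and rejecting non-string values reflects the validation expected above rather than what the proxy did at the time.

```python
import base64


def binary_mode_value(value):
    """Mimic binary mode as described above: base64 text in, raw bytes out.

    Hypothetical stand-in, not the proxy's implementation.
    """
    if not isinstance(value, str):
        # A bare number such as the timestamp is not valid binary-mode input.
        raise TypeError("binary mode expects a base64-encoded string")
    return base64.b64decode(value, validate=True)


encoded_id = "YW1hbGdhbXdheS5yZWFsc2Vuc2UuRjIwMC5LYWZrYTE0MzYzOTcwMTc1OTQuQ2FwdHVyZWZyYW1lLjI0"
raw = binary_mode_value(encoded_id)

# The consumer receives these decoded bytes, which is why the Base64 layer
# appeared to vanish on the Spark side.
print(raw)
```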
 
And since all the fields are "value" and there is no schema associated with them, I get malformed JSON when receiving the data and cannot map it properly inside the streaming mini-batches.

Can anyone help me figure out why the schema registration is not working and how to fix it or do it correctly?

Hopefully the explanation of the issues with the format of the JSON in your produce payload should get you started. Let us know if you're still running into issues after making those changes.

-Ewen
 

Thanks,

Ignacio

 

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/b5814765-f363-4fcf-8252-d682b3e5086c%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Ignacio Alvarez

Jul 13, 2015, 12:35:22 AM
to confluent...@googlegroups.com
Thanks a lot Ewen,

That made things so much clearer!

I am trying your curl suggestion and I am getting back
{
  "error_code": 40801,
  "message": "Schema registration or lookup failed"
}

The schema is registered to that topic with id=81 in my case, so it must be the lookup failing.
I also tried sending the message with the value_schema_id. Same response.
It looks like this is a newly added rest exception, I only found references in the repository, not in the docs. Could this be a service in the schema registry that is failing or some kind of configuration?

Regards,

Ignacio  




--
Ignacio Alvarez, PhD

Research Scientist, Intel Corporation, Hillsboro, OR

Ewen Cheslack-Postava

Jul 13, 2015, 1:28:52 PM
to confluent...@googlegroups.com
Ignacio,

It looks like we just missed this error code in our docs, and I've filed https://github.com/confluentinc/kafka-rest/issues/100 for that. All this indicates is that the schema registry generated an error. We should probably be including more information to indicate the nature of the error. If you're running the schema registry locally, you can just take a look at its log to see what's going on -- it should at least be logging the HTTP status code, but is probably also logging the original exception.

-Ewen






Ignacio Alvarez

Jul 13, 2015, 3:45:23 PM
to confluent...@googlegroups.com
Hi Ewen,

I finally figured out what the issue was. When I installed kafka-rest on my server, the default ports conflicted with another application, so I moved the default port 8081 to 8082 and the schema registry to 8083. I was able to access the schema registry API externally, but I missed changing the internal default config. So the proxy was returning 408 because its internal schema lookups were aimed at the wrong port.
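
For others hitting the same 40801 error, a quick sanity check is to confirm that the REST proxy's `schema.registry.url` setting points at the port the schema registry actually listens on. The Python sketch below parses properties text and compares ports. The example file contents, the assumed default of http://localhost:8081, the port 8083 (taken from the setup described above), and the helper names are all illustrative assumptions.

```python
from urllib.parse import urlparse


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def registry_port_matches(rest_proxy_props, actual_port):
    """Return True if schema.registry.url targets the port the registry listens on."""
    # Assumed default when the setting is absent.
    url = rest_proxy_props.get("schema.registry.url", "http://localhost:8081")
    return urlparse(url).port == actual_port


# Illustrative REST proxy config that still points at the default registry port.
example = """
# kafka-rest.properties (illustrative contents)
schema.registry.url=http://localhost:8081
"""

props = parse_properties(example)
if not registry_port_matches(props, actual_port=8083):
    print("schema.registry.url does not match the registry's port; lookups will fail")
```

In practice the text would come from the real properties file on the server rather than from an inline string.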

I hope my mistake helps others troubleshoot!

Regards,

Ignacio



Ignacio Alvarez

Jul 16, 2015, 9:07:15 PM
to confluent...@googlegroups.com
Hello all,

Before closing this topic, could you tell me a bit more about how the binary mode of the Kafka proxy works?
I am interested in what Base64 decoder it uses and what character encoding it uses. I would expect it to use UTF-8.

Thanks,

Ignacio

Ewen Cheslack-Postava

Jul 16, 2015, 9:49:16 PM
to confluent...@googlegroups.com
Ignacio,

It uses the base64 encoder/decoder in javax.xml.bind.DatatypeConverter. You can see the code here: https://github.com/confluentinc/kafka-rest/blob/master/src/main/java/io/confluent/kafkarest/entities/EntityUtils.java

For encoding, using UTF-8 should work, as will ASCII since the characters used are a very limited set.
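
A small Python check illustrates why either encoding works. Python's `base64` module is used here as a stand-in for the Java `DatatypeConverter`, and the sample bytes are arbitrary.

```python
import base64

raw = bytes(range(256))  # every possible byte value, as arbitrary binary data
encoded = base64.b64encode(raw).decode("ascii")  # base64 text uses only ASCII characters

# Encoding the base64 text as ASCII or as UTF-8 yields identical bytes on the wire.
assert encoded.encode("ascii") == encoded.encode("utf-8")

# Decoding restores the original bytes exactly.
assert base64.b64decode(encoded) == raw
```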

-Ewen




