Issues with Kafka-Rest


John Omernik

May 27, 2015, 8:52:01 AM
to confluent...@googlegroups.com
This is a continuation from the Kafka-Rest Examples Thread. 

Basically at this point, I am having trouble figuring out why a request I am sending is not going through to Kafka-Rest.  Included below are both the raw requests and responses, the schema I am getting from Schema registry, and the "prettified" data from the raw request for easy human parsing. 

Here are the steps that happen in my script:

1. I contact the schema registry and since I know the "name" of the schema I want to use, I ask for it.  I get the Schema ID. 
2. I parse the log files, getting them formatted per the schema and what the API expects (I think...)
3. I send the request. I will be batching, but right now I am just trying to send one record. You can see I am updating the headers per the docs. 
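
For step 2, the record in the raw request below follows Avro's JSON encoding for union fields: a null stays `null`, and any other value is wrapped in an object keyed by its type name. A minimal Python sketch of that formatting, assuming every field is a `["null", T]` union as in the schema further down. The helper names `encode_nullable` and `encode_record` and the `FIELD_TYPES` table are hypothetical, made up for illustration, and cover only a few of the fields:

```python
import json

# Field name -> non-null branch of its ["null", T] union.
# Hypothetical subset of the schema, for illustration only.
FIELD_TYPES = {
    "ts": "string",
    "id_orig_p": "int",
    "service": "string",
    "missed_bytes": "long",
}


def encode_nullable(avro_type, value):
    """Avro JSON encoding of a ["null", T] union value."""
    if value is None:
        return None
    return {avro_type: value}


def encode_record(field_types, row):
    """Encode a parsed log row; fields missing from the row become null."""
    return {
        name: encode_nullable(avro_type, row.get(name))
        for name, avro_type in field_types.items()
    }


row = {"ts": "1431637257.883239", "id_orig_p": 2148, "missed_bytes": 0}
record = encode_record(FIELD_TYPES, row)
print(json.dumps(record))
```

Note that a value of `0` is still wrapped (`{"long": 0}`); only a true null is sent bare.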

Things I have done to troubleshoot

1. Turned on debug=true in the kafka-rest config. This did not provide any more data.
2. I've compiled the dev version of Kafka-Rest and am running it, hoping that a version with https://github.com/confluentinc/kafka-rest/issues/81 resolved would give me more robust error messages instead of just a 422. This did not help; the 2.0-SNAPSHOT gave me the same blank error message.
3. I've taken the schema as returned by the schema registry and the data as sent by my script (I took everything in the "records" array) and put them into files. I was able to use avro-tools 1.7.7 both to create an Avro file from these JSON sources and then to extract the schema and the data from that file, exactly as expected. There were no errors or warnings from the Avro tool, so I believe the files are correct.

The only other step that has been suggested was to methodically take my schema and data and slowly cut off fields to see if there is a specific field giving me problems.  While I agree that it may work, I really want to avoid this.  Basically, I am trying to determine how something like this could work in production, and that process should be an emergency last step, not something as part of the standard development cycle.  I am using this opportunity to learn more about debugging the messages and provide feedback to confluent to make this process easier for production deployments. 


I guess my question at this point is this:

What are my next steps for troubleshooting? How can I turn on deeper logging and/or get more data, either here or to Confluent, to help me troubleshoot this and also provide these richer troubleshooting steps back to the community?

Thanks!

John




Raw Request (as sent over the wire and sniffed by tcpdump)


POST /topics/brocon HTTP/1.1
Host: kafka-rest.marathon.mesos:8192
Content-Length: 633
Accept-Encoding: gzip, deflate
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
User-Agent: python-requests/2.7.0 CPython/2.7.3 Linux/3.13.0-30-generic
Connection: keep-alive
Content-Type: application/vnd.kafka.avro.v1+json

{"value_schema_id": 121, "key_schema_id": null, "records": [{"ts": {"string": "1431637257.883239"}, "uid": {"string": "CJEy5h20lBY6V5QAI3"}, "id_orig_h": {"string": "192.168.225.103"}, "id_orig_p": {"int": 2148}, "id_resp_h": {"string": "85.25.153.26"}, "id_resp_p": {"int": 20050}, "proto": {"string": "tcp"}, "service": null, "duration": null, "orig_bytes": null, "resp_bytes": null, "conn_state": {"string": "S0"}, "local_orig": null, "missed_bytes": {"long": 0}, "history": {"string": "S"}, "orig_pkts": {"long": 1}, "orig_ip_bytes": {"long": 48}, "resp_pkts": {"long": 0}, "resp_ip_bytes": {"long": 0}, "tunnel_parents": null}]}


Raw Response from Kafka Rest API Server:



HTTP/1.1 422
Content-Length: 31
Content-Type: application/json
Server: Jetty(8.1.16.v20140903)

{"error_code":422,"message":""}



Schema as returned from the Schema Registry (this isn't what I provided; instead, I am getting the schema by name prior to sending the data, and it gets me Schema ID 121 as seen in the request):

{
    "fields": [
        {"default": null, "doc": "Default Bro Schema parse for ts", "type": ["null", "string"], "name": "ts"},
        {"default": null, "doc": "Default Bro Schema parse for uid", "type": ["null", "string"], "name": "uid"},
        {"default": null, "doc": "Default Bro Schema parse for id.orig_h", "type": ["null", "string"], "name": "id_orig_h"},
        {"default": null, "doc": "Default Bro Schema parse for id.orig_p", "type": ["null", "int"], "name": "id_orig_p"},
        {"default": null, "doc": "Default Bro Schema parse for id.resp_h", "type": ["null", "string"], "name": "id_resp_h"},
        {"default": null, "doc": "Default Bro Schema parse for id.resp_p", "type": ["null", "int"], "name": "id_resp_p"},
        {"default": null, "doc": "Default Bro Schema parse for proto", "type": ["null", "string"], "name": "proto"},
        {"default": null, "doc": "Default Bro Schema parse for service", "type": ["null", "string"], "name": "service"},
        {"default": null, "doc": "Default Bro Schema parse for duration", "type": ["null", "string"], "name": "duration"},
        {"default": null, "doc": "Default Bro Schema parse for orig_bytes", "type": ["null", "long"], "name": "orig_bytes"},
        {"default": null, "doc": "Default Bro Schema parse for resp_bytes", "type": ["null", "long"], "name": "resp_bytes"},
        {"default": null, "doc": "Default Bro Schema parse for conn_state", "type": ["null", "string"], "name": "conn_state"},
        {"default": null, "doc": "Default Bro Schema parse for local_orig", "type": ["null", "boolean"], "name": "local_orig"},
        {"default": null, "doc": "Default Bro Schema parse for missed_bytes", "type": ["null", "long"], "name": "missed_bytes"},
        {"default": null, "doc": "Default Bro Schema parse for history", "type": ["null", "string"], "name": "history"},
        {"default": null, "doc": "Default Bro Schema parse for orig_pkts", "type": ["null", "long"], "name": "orig_pkts"},
        {"default": null, "doc": "Default Bro Schema parse for orig_ip_bytes", "type": ["null", "long"], "name": "orig_ip_bytes"},
        {"default": null, "doc": "Default Bro Schema parse for resp_pkts", "type": ["null", "long"], "name": "resp_pkts"},
        {"default": null, "doc": "Default Bro Schema parse for resp_ip_bytes", "type": ["null", "long"], "name": "resp_ip_bytes"},
        {"default": null, "doc": "Default Bro Schema parse for tunnel_parents", "type": ["null", "string"], "name": "tunnel_parents"}
    ],
    "type": "record",
    "name": "brocon1"
}


This is the prettified raw request (as seen above from TCPDump for easy reading):

{
    "value_schema_id": 121,
    "key_schema_id": null,
    "records": [
        {
            "ts": {"string": "1431637257.883239"},
            "uid": {"string": "CJEy5h20lBY6V5QAI3"},
            "id_orig_h": {"string": "192.168.225.103"},
            "id_orig_p": {"int": 2148},
            "id_resp_h": {"string": "85.25.153.26"},
            "id_resp_p": {"int": 20050},
            "proto": {"string": "tcp"},
            "service": null,
            "duration": null,
            "orig_bytes": null,
            "resp_bytes": null,
            "conn_state": {"string": "S0"},
            "local_orig": null,
            "missed_bytes": {"long": 0},
            "history": {"string": "S"},
            "orig_pkts": {"long": 1},
            "orig_ip_bytes": {"long": 48},
            "resp_pkts": {"long": 0},
            "resp_ip_bytes": {"long": 0},
            "tunnel_parents": null
        }
    ]
}










Ewen Cheslack-Postava

May 28, 2015, 1:56:21 AM
to confluent...@googlegroups.com
John,

Your requests are missing the envelope for each message. In the produce request, the "records" field is a list of objects, each of which is an envelope for a message which may contain the key, value, and partition (when producing to a topic).

Even if your messages only contain values, you need to have this wrapper. So instead of

{"value_schema_id": 121, "key_schema_id": null, "records": [{"ts": ...

you'll need

{"value_schema_id": 121, "key_schema_id": null, "records": [{"value": {"ts": ...

You need to have this envelope because the JSON-encoded Avro for a standalone value and the envelope that is required if you want to include a key or partition can be ambiguous (e.g., think of the very confusing case where your Avro record contains fields called "key", "value", and "partition").
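
As a rough illustration of the envelope in a Python client, here is a minimal sketch that wraps each already-encoded value in `{"value": ...}` before building the POST. The function names `build_produce_body` and `build_request` are hypothetical, the URL is a placeholder, and the sample value is truncated to two fields for brevity (a real value would carry every field of the schema). `key_schema_id` is left out on the assumption that no keys are being sent:

```python
import json
import urllib.request

CONTENT_TYPE = "application/vnd.kafka.avro.v1+json"


def build_produce_body(value_schema_id, values):
    """Wrap each Avro-JSON value in the {"value": ...} envelope."""
    return {
        "value_schema_id": value_schema_id,
        "records": [{"value": v} for v in values],
    }


def build_request(url, body):
    """Build (but do not send) the POST request for the produce call."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


# Truncated sample value; placeholder host, port, and topic.
value = {"ts": {"string": "1431637257.883239"}, "service": None}
body = build_produce_body(121, [value])
req = build_request("http://localhost:8082/topics/brocon", body)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
print(json.dumps(body))
```

Keeping the wrapping in one place means a batch of many records gets the envelope applied uniformly, and a key or partition could be added to each envelope later without touching the value encoding.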

For reference, this (very long and messy) cURL request worked for me:

curl -v -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json"       --data '{"value_schema": "{ \"fields\": [ { \"default\": null, \"doc\": \"Default Bro Schema parse for ts\", \"type\": [ \"null\", \"string\" ], \"name\": \"ts\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for uid\", \"type\": [ \"null\", \"string\" ], \"name\": \"uid\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for id.orig_h\", \"type\": [ \"null\", \"string\" ], \"name\": \"id_orig_h\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for id.orig_p\", \"type\": [ \"null\", \"int\" ], \"name\": \"id_orig_p\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for id.resp_h\", \"type\": [ \"null\", \"string\" ], \"name\": \"id_resp_h\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for id.resp_p\", \"type\": [ \"null\", \"int\" ], \"name\": \"id_resp_p\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for proto\", \"type\": [ \"null\", \"string\" ], \"name\": \"proto\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for service\", \"type\": [ \"null\", \"string\" ], \"name\": \"service\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for duration\", \"type\": [ \"null\", \"string\" ], \"name\": \"duration\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for orig_bytes\", \"type\": [ \"null\", \"long\" ], \"name\": \"orig_bytes\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for resp_bytes\", \"type\": [ \"null\", \"long\" ], \"name\": \"resp_bytes\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for conn_state\", \"type\": [ \"null\", \"string\" ], \"name\": \"conn_state\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for local_orig\", \"type\": [ \"null\", \"boolean\" ], \"name\": \"local_orig\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for missed_bytes\", \"type\": [ \"null\", \"long\" ], \"name\": \"missed_bytes\" }, 
{ \"default\": null, \"doc\": \"Default Bro Schema parse for history\", \"type\": [ \"null\", \"string\" ], \"name\": \"history\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for orig_pkts\", \"type\": [ \"null\", \"long\" ], \"name\": \"orig_pkts\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for orig_ip_bytes\", \"type\": [ \"null\", \"long\" ], \"name\": \"orig_ip_bytes\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for resp_pkts\", \"type\": [ \"null\", \"long\" ], \"name\": \"resp_pkts\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for resp_ip_bytes\", \"type\": [ \"null\", \"long\" ], \"name\": \"resp_ip_bytes\" }, { \"default\": null, \"doc\": \"Default Bro Schema parse for tunnel_parents\", \"type\": [ \"null\", \"string\" ], \"name\": \"tunnel_parents\" } ], \"type\": \"record\", \"name\": \"brocon1\" }", "records": [{"value":{"ts": {"string": "1431637257.883239"}, "uid": {"string": "CJEy5h20lBY6V5QAI3"}, "id_orig_h": {"string": "192.168.225.103"}, "id_orig_p": {"int": 2148}, "id_resp_h": {"string": "85.25.153.26"}, "id_resp_p": {"int": 20050}, "proto": {"string": "tcp"}, "service": null, "duration": null, "orig_bytes": null, "resp_bytes": null, "conn_state": {"string": "S0"}, "local_orig": null, "missed_bytes": {"long": 0}, "history": {"string": "S"}, "orig_pkts": {"long": 1}, "orig_ip_bytes": {"long": 48}, "resp_pkts": {"long": 0}, "resp_ip_bytes": {"long": 0}, "tunnel_parents": null}}]}'       "http://localhost:8082/topics/avrotest2"

-Ewen




--
Thanks,
Ewen

Ewen Cheslack-Postava

May 28, 2015, 2:08:09 AM
to confluent...@googlegroups.com
Also filed https://github.com/confluentinc/kafka-rest/issues/85 since we should definitely be returning a more useful error message for this case.

--
Thanks,
Ewen

John Omernik

May 28, 2015, 10:42:38 AM
to confluent...@googlegroups.com
Ewen - 

Thanks for your help on this. That is exactly what I was missing. (This goes to my other point about the use of the word "value" in the documentation.) I think the documentation should make clear that "value" is a keyword when it is part of the envelope like this. Either that or I am just a neophyte who needed some schooling :)   I will be working on the next steps now that I have this working, and looking at Camus. Thank you again!

John
