Can I use Kafka-Rest Proxy as a generic Proxy to Kafka?


Ash Matheson

May 10, 2015, 3:16:42 PM
to confluent...@googlegroups.com
So, been digging around with Kafka-Rest and have run into some issues.  Let me describe the situation first:

We have a Kafka cluster sitting inside of a Data warehouse.  We don't want to expose the Kafka cluster to 'the world' and without going into a crazy amount of IPSEC tunneling shenanigans, we wanted to be able to expose a REST interface to the cluster (as an endpoint, only allowing certain requests, limiting access, etc).

So, as a little testbed, I put together a simple producer that writes directly to Kafka. It's nothing more than a sentence producer: flat text, no encryption.

I was able to put up a Kafka-Rest proxy with no problem, and could query the topics and get details about them without issue.

Then I tried to read from the topic.

I keep getting: {"error_code":40601,"message":"The requested embedded data format does not match the deserializer for this consumer instance"}

OK, so I need to format the message in an appropriate manner. Fine. So I wrote the following in Python:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

import time
import random
import logging
import json
import base64

logging.basicConfig(level=logging.INFO)

s_nouns = ["A dude", "My mom", "The king", "Some guy"]
p_nouns = ["These dudes", "Both of my moms",  "Supermen"]
s_verbs = ["eats", "kicks", "gives", "treats", "meets with", "creates", "hacks"]
p_verbs = ["eat", "kick", "give", "treat", "meet with", "create", "hack"]
infinitives = ["to make a pie.", "for no apparent reason.", "because the sky is green."]

broker = 'localhost:9092'

logging.info("Connecting to Kafka - {0}".format(broker))
client = KafkaClient(broker)
producer = SimpleProducer(client)

# write out 10,000 random sentences
index = 0
logging.info("sending strings")
while index < 10000:
    index += 1
    # build "<subject> <verb> <object> <infinitive>"; the parentheses
    # matter here, since + binds tighter than or
    sentence = random.choice(s_nouns) + " " + \
        random.choice(s_verbs) + " " + \
        (random.choice(s_nouns).lower() or random.choice(p_nouns).lower()) + " " + \
        random.choice(infinitives)
    # base64-encode the value and wrap it in JSON
    # (Python 2; under Python 3, encode the sentence to bytes first)
    producer.send_messages('test', json.dumps({"value": base64.b64encode(sentence)}))
    logging.info("sent {0} of 10000".format(index))
    time.sleep(5)

And then issued the following curl commands:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" --data '{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"}' http://localhost:8082/consumers/my_binary_consumer

Which returned :
{"instance_id":"my_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer/instances/my_instance"}

And then:
curl -X GET -H "Content-Type: application/vnd.kafka.binary.v1+json" http://localhost:8082/consumers/my_binary_consumer/instances/my_instance/topics/testProxy

Which returns:
{"error_code":40601,"message":"The requested embedded data format does not match the deserializer for this consumer instance"}

So, what did I miss here? MUST I push data into Kafka through the proxy?

Ewen Cheslack-Postava

May 10, 2015, 4:48:47 PM
to confluent...@googlegroups.com
When requesting the data, you should use the header "Accept: application/vnd.kafka.binary.v1+json". The Accept header indicates the type (or types) of data you'll accept in the response. Content-Type (when attached to the request, as curl's -H does) indicates the format of the request body. In this case it isn't doing anything, since there is no request body (no --data argument).

I'm a bit surprised that just omitting it didn't work, though -- while specifying the full expected format is highly encouraged, we do allow fallbacks to less specific REST proxy formats (e.g. application/vnd.kafka+json), more general types (e.g. application/json), and even the catch-all */* to return the binary data format. That said, the content negotiation is implemented in Jersey, and it's possible that omitting the format entirely causes it to not match any of these.
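For example, the failing read above should work with the header corrected. Here is a sketch in Python using the requests library; the localhost address and the consumer group, instance, and topic names are just the ones from this thread:

import requests

# Same consumer read as the failing curl command, but with Accept
# (not Content-Type) naming the embedded data format we want back.
resp = requests.get(
    "http://localhost:8082/consumers/my_binary_consumer"
    "/instances/my_instance/topics/testProxy",
    headers={"Accept": "application/vnd.kafka.binary.v1+json"},
)
print(resp.status_code)
print(resp.json())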
 

> Which returns:
> {"error_code":40601,"message":"The requested embedded data format does not match the deserializer for this consumer instance"}
>
> So, what did I miss here? MUST I push data into Kafka through the proxy?

Definitely not; the proxy should be compatible with data sent in via any other client (assuming the Avro format matches, or you read back raw binary data, of course).

If the Accept header doesn't fix the issue, try the sequence of commands in the quickstart for a binary consumer, but replace the topic names. Then work from there to track down any differences between the two.
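For reference, here is that quickstart-style sequence sketched in Python with the requests library -- create a binary consumer, produce through the proxy, read back, clean up. The localhost address and the topic name "test" are assumptions carried over from this thread:

import base64
import json

import requests

BASE = "http://localhost:8082"
BINARY = "application/vnd.kafka.binary.v1+json"

# 1. Create a binary consumer instance.
resp = requests.post(
    BASE + "/consumers/my_binary_consumer",
    headers={"Content-Type": BINARY},
    data=json.dumps({"id": "my_instance", "format": "binary",
                     "auto.offset.reset": "smallest"}),
)
resp.raise_for_status()
instance_uri = resp.json()["base_uri"]

# 2. Produce one message through the proxy; binary values are
#    base64-encoded inside the JSON envelope.
record = {"records": [{"value": base64.b64encode(b"hello").decode("ascii")}]}
resp = requests.post(BASE + "/topics/test",
                     headers={"Content-Type": BINARY},
                     data=json.dumps(record))
resp.raise_for_status()

# 3. Read it back -- Accept, not Content-Type, selects the format.
resp = requests.get(instance_uri + "/topics/test",
                    headers={"Accept": BINARY})
resp.raise_for_status()
for msg in resp.json():
    print(base64.b64decode(msg["value"]))

# 4. Delete the consumer instance when done.
requests.delete(instance_uri)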

-Ewen
 


amat...@demonware.net

May 11, 2015, 1:24:14 PM
to confluent...@googlegroups.com
Thanks for the response.  I'll set up a small git repo with the code later today if the suggestions don't work.

amat...@demonware.net

May 11, 2015, 2:09:13 PM
to confluent...@googlegroups.com
That would appear to have fixed it!




amat...@demonware.net

May 11, 2015, 6:48:13 PM
to confluent...@googlegroups.com
Next question:

I'm putting together some examples in Python. What I'm trying to do is create a consumer, then query data off that consumer, from separate Python scripts.

ScriptA.py:
 - Create a consumer for a topic

ScriptB.py
 - read messages off the consumer.

Is there any way to determine whether a consumer instance already exists, aside from querying it directly and getting back a 'Consumer instance not found' result?


Ewen Cheslack-Postava

May 11, 2015, 7:40:23 PM
to confluent...@googlegroups.com
No, there isn't a way to do that currently. In fact, we found an issue with the way creation requests are handled: they weren't properly checking for conflicts. This PR (https://github.com/confluentinc/kafka-rest/pull/67) will fix that in the next version, returning a 409 if there is a conflict. That will let you simply try to create the consumer; as long as you handle that specific error, you can then use the consumer to read messages.

Just be careful using this approach -- depending on how your processes are managed, it can accidentally lead to multiple processes using the same consumer to read messages, which often isn't what you want (e.g. if you care about all messages with the same key going to the same process). Usually it's safer to create a new consumer instance for each process, delete it gracefully when possible, and in the case of a failure simply let it time out.
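In case it helps, here is a sketch of that create-or-reuse pattern in Python with the requests library. It assumes the post-PR behavior described above (409 on conflict), a proxy on localhost:8082, and placeholder group/instance names:

import json

import requests

BASE = "http://localhost:8082"
BINARY = "application/vnd.kafka.binary.v1+json"

def ensure_consumer(group, instance):
    # Try to create the consumer instance; a 409 (after the fix above)
    # means it already exists, so reuse it rather than failing.
    resp = requests.post(
        BASE + "/consumers/" + group,
        headers={"Content-Type": BINARY},
        data=json.dumps({"id": instance, "format": "binary"}),
    )
    if resp.status_code == 409:
        return "{0}/consumers/{1}/instances/{2}".format(BASE, group, instance)
    resp.raise_for_status()
    return resp.json()["base_uri"]

# ScriptA.py and ScriptB.py can both call this and end up pointing at
# the same instance URI.
uri = ensure_consumer("my_binary_consumer", "my_instance")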

-Ewen
