So, I've been digging around with Kafka-REST and have run into some issues. Let me describe the situation first:
We have a Kafka cluster sitting inside a data warehouse. We don't want to expose the Kafka cluster to 'the world', and without going into a crazy amount of IPsec tunneling shenanigans, we wanted to expose a REST interface to the cluster (as an endpoint, only allowing certain requests, limiting access, etc.).
So, as a little testbed, I put together a simple producer that writes directly to Kafka. It's nothing more than a sentence producer: flat text, no encryption.
I was able to put up a Kafka-REST proxy no problem, and was able to query the topics and get details about them without issue.
Then I tried to read from the topic.
I keep getting '{"error_code":40601,"message":"The requested embedded data format does not match the deserializer for this consumer instance"}'.
OK, so I need to format the message in an appropriate manner. Fine. So I wrote the following in python:
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
import time
import random
import logging
import json
import base64
logging.basicConfig(level=logging.INFO)
s_nouns = ["A dude", "My mom", "The king", "Some guy"]
p_nouns = ["These dudes", "Both of my moms", "Supermen"]
s_verbs = ["eats", "kicks", "gives", "treats", "meets with", "creates", "hacks"]
p_verbs = ["eat", "kick", "give", "treat", "meet with", "create", "hack"]
infinitives = ["to make a pie.", "for no apparent reason.", "because the sky is green."]
broker = 'localhost:9092'
logging.info("Connecting to Kafka - {0}".format(broker))
client = KafkaClient(broker)
producer = SimpleProducer(client)
# write out 10,000 random sentences
index = 0
logging.info("sending strings")
while index < 10000:
    index += 1
    # pick the object noun from either the singular or plural list
    sentence = random.choice(s_nouns) + " " + \
               random.choice(s_verbs) + " " + \
               random.choice(s_nouns + p_nouns).lower() + " " + \
               random.choice(infinitives)
    # base64-encode the sentence and wrap it in a JSON envelope
    producer.send_messages('test', json.dumps(
        {"value": base64.b64encode(sentence.encode("utf-8")).decode("ascii")}))
    logging.info("sent {0} of 10000".format(index))
    time.sleep(5)
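For comparison, this is roughly the payload shape the REST proxy itself expects when producing binary messages to POST /topics/&lt;name&gt;: a "records" array whose "value" fields hold the base64 encoding of the raw message bytes. A minimal sketch, assuming the standard v1 binary embedded format (the helper name is mine, not from any library):

```python
import base64
import json

def make_binary_produce_payload(sentences):
    """Build the JSON body for a binary produce request
    (Content-Type: application/vnd.kafka.binary.v1+json):
    each record's 'value' is the base64 of the raw bytes."""
    records = [
        {"value": base64.b64encode(s.encode("utf-8")).decode("ascii")}
        for s in sentences
    ]
    return json.dumps({"records": records})

payload = make_binary_produce_payload(["A dude eats some guy to make a pie."])
body = json.loads(payload)
# round-trip: decoding the base64 value recovers the original sentence
print(base64.b64decode(body["records"][0]["value"]).decode("utf-8"))
```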
And then issued the following curl commands:

curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  --data '{"id": "my_instance", "format": "binary", "auto.offset.reset": "smallest"}' \
  http://localhost:8082/consumers/my_binary_consumer

Which returned:

{"instance_id":"my_instance","base_uri":"http://localhost:8082/consumers/my_binary_consumer/instances/my_instance"}
And then:

curl -X GET -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  http://localhost:8082/consumers/my_binary_consumer/instances/my_instance/topics/testProxy

Which returns:

{"error_code":40601,"message":"The requested embedded data format does not match the deserializer for this consumer instance"}
So, what did I miss here? MUST I push data into Kafka through the proxy?