Trouble posting binary data to REST connector & doing KStream processing

Jon Powell

Jul 15, 2016, 8:53:09 PM
to Confluent Platform

Hi Ewen (assuming you'd be the best to answer this),


I'm running Confluent 3.0.0 (default out-of-the-box on a single laptop) and trying to submit some binary data to the REST proxy, then unpack it in a Kafka Streams process and publish the result to another topic.


Initially, there was trouble with the stability of the Schema Registry (per https://github.com/confluentinc/schema-registry/issues/335), but now that I have a 3-node zookeeper cluster it seems to have settled down.


To start, the overall startup (scripted) looks something like:


cd $CONFLUENT_HOME

echo "Starting Confluent Kafka environment in $CONFLUENT_HOME"

./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &

sleep 5

./bin/kafka-server-start ./etc/kafka/server.properties &

sleep 5

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &

sleep 5

./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties &

sleep 5

echo "Started Confluent Kafka environment in $CONFLUENT_HOME"


Two topics are created, one for the raw (gzipped, then base64-encoded) data and one for the decoded output, and both can be queried successfully after creation:


./bin/kafka-topics --create --topic RawDataTopic --zookeeper $CONFLUENT_ZOOKEEPER_URI --partitions 1 --replication-factor 1

./bin/kafka-topics --create --topic DecodedPublicDataTopic --zookeeper $CONFLUENT_ZOOKEEPER_URI --partitions 1 --replication-factor 1
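
The query to verify them is just the describe form of the same tool, e.g.:


./bin/kafka-topics --describe --topic RawDataTopic --zookeeper $CONFLUENT_ZOOKEEPER_URI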


Our raw data (gzipped, then base64-encoded) is wrapped before submission by something like the following bash script:


CONFLUENT_REST_PREAMBLE="{\"records\":[{\"value\":\""

CONFLUENT_REST_EPILOGUE="\"}]}"

echo -n "$CONFLUENT_REST_PREAMBLE" > $OUTPUT_FILE

cat $1 >> $OUTPUT_FILE

echo -n "$CONFLUENT_REST_EPILOGUE" >> $OUTPUT_FILE


Then we submit our raw data (c.json) as follows, and get back a 200 HTTP status code:


curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" --data @/a/b/c.json http://$CONFLUENT_REST_URI/topics/RawDataTopic
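
The success response carries an offsets array; for the v1 binary endpoint it looks something like:


{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}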


We can then start our Kafka Streams processor (obviously written in Scala):


…<startup elided>

val rawDataSubmissions: KStream[Array[Byte], String] = builder.stream(rawDataTopic)

rawDataSubmissions
  .map((key, packet) => new KeyValue(key, decodeResults(packet)))
  .to(byteArraySerde, stringSerde, decodedPublicDataTopic)


If the decode method looks like the following,


import java.io.ByteArrayInputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import scala.io.Source

def decodeResults(input: String): String = {
  System.err.println(s"INITIAL DATA: $input")
  val decoder = Base64.getDecoder
  val decoded = decoder.decode(input)
  System.err.println(s"DECODED DATA: $decoded")
  val unzipped = Source.fromInputStream(new GZIPInputStream(new ByteArrayInputStream(decoded)), "UTF-8").mkString
  System.err.println(s"UNZIPPED DATA: $unzipped")
  unzipped
}


Upon running, decoder.decode(input) throws the following (meaning the payload is not base64):


   java.lang.IllegalArgumentException: Illegal base64 character 1f


If we guess that the Confluent REST service handles the base64 decoding for us, we can then try the following (just gunzip):


def decodeResults(decoded:String): String = {
  System.err.println(s"DECODED DATA: $decoded")
  val unzipped = Source.fromInputStream(new GZIPInputStream(new ByteArrayInputStream(decoded.getBytes("UTF-8"))), "UTF-8").mkString
  System.err.println(s"UNZIPPED DATA: $unzipped")
  unzipped
}


When this runs, we get the following at the "val unzipped =" line:


  java.util.zip.ZipException: Not in GZIP format


Please note that we can successfully decode the same file manually with the first decodeResults(input: String) version above.


Therefore, it appears that the REST service must be decoding the data somehow, and I don't see a good way to get around this.
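
One way to check what actually landed on the topic would be the console consumer (the output is ugly for binary data, but the leading bytes would be telling):


./bin/kafka-console-consumer --zookeeper $CONFLUENT_ZOOKEEPER_URI --topic RawDataTopic --from-beginning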


Any help is appreciated! And please feel free to comment on the setup if that is not optimal.


~Jon




ha...@confluent.io

Jul 16, 2016, 5:02:17 PM
to Confluent Platform
The Base64 encoding of the binary data should only be for the HTTP transport to/from the REST Proxy. Once the message is accepted by the REST Proxy it would be decoded and then published to Kafka as binary data (not as Base64, nor as a UTF-8 String).
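
(The "Illegal base64 character 1f" is actually a hint: 0x1f is the first byte of the gzip magic number 1f 8b, i.e. the consumed value is already the decoded gzip bytes.) So the stream should read the value as raw bytes and only gunzip it. A minimal sketch, assuming the 0.10.0 Kafka Streams API in Confluent 3.0.0, the byteArraySerde/stringSerde names from the original snippet, and a hypothetical gunzip helper:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import scala.io.Source

// Consume the value as raw bytes, not as a String.
val rawDataSubmissions: KStream[Array[Byte], Array[Byte]] =
  builder.stream(byteArraySerde, byteArraySerde, rawDataTopic)

rawDataSubmissions
  .map((key, packet) => new KeyValue(key, gunzip(packet))) // no base64 step needed
  .to(byteArraySerde, stringSerde, decodedPublicDataTopic)

// Hypothetical helper: gunzip the raw bytes straight into a String.
def gunzip(bytes: Array[Byte]): String =
  Source.fromInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)), "UTF-8").mkString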

-hans