Hi Ewen (assuming you'd be the best to answer this),
I'm running Confluent 3.0.0 (default out-of-the-box on a single laptop) and trying to submit some binary data to the REST proxy, then unpack it in a Kafka Streams processor and publish it to another topic.
Initially, there was trouble with the stability of the Schema Registry (per https://github.com/confluentinc/schema-registry/issues/335), but now that I have a 3-node ZooKeeper cluster it seems to have settled down.
To start, the overall startup (scripted) looks something like:
cd $CONFLUENT_HOME
echo "Starting Confluent Kafka environment in $CONFLUENT_HOME"
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
sleep 5
./bin/kafka-server-start ./etc/kafka/server.properties &
sleep 5
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
sleep 5
./bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties &
sleep 5
echo "Started Confluent Kafka environment in $CONFLUENT_HOME"
Two topics are created, one for the raw (gzipped, then base64-encoded) data and one for the decoded output, and both can be queried successfully after creation:
./bin/kafka-topics --create --topic RawDataTopic --zookeeper $CONFLUENT_ZOOKEEPER_URI --partitions 1 --replication-factor 1
./bin/kafka-topics --create --topic DecodedPublicDataTopic --zookeeper $CONFLUENT_ZOOKEEPER_URI --partitions 1 --replication-factor 1
Before submission, our raw data (gzipped, then base64-encoded) is wrapped in the REST proxy's JSON envelope by a bash script along these lines:
CONFLUENT_REST_PREAMBLE="{\"records\":[{\"value\":\""
CONFLUENT_REST_EPILOGUE="\"}]}"
echo -n "$CONFLUENT_REST_PREAMBLE" > $OUTPUT_FILE
cat $1 >> $OUTPUT_FILE
echo -n "$CONFLUENT_REST_EPILOGUE" >> $OUTPUT_FILE
Then we submit the wrapped data (c.json) as follows and get back an HTTP 200 status code:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" --data @/a/b/c.json http://$CONFLUENT_REST_URI/topics/RawDataTopic
We can then start our Kafka Streams processor (written in Scala); the relevant sink line of the topology is:
.to(byteArraySerde, stringSerde, decodedPublicDataTopic)
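For context, the surrounding topology is roughly the following (a reconstructed sketch against the 0.10.0-era KStreamBuilder API that ships with Confluent 3.0.0; the application id and topic-name vals are illustrative):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStreamBuilder, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val rawDataTopic           = "RawDataTopic"
val decodedPublicDataTopic = "DecodedPublicDataTopic"
val byteArraySerde         = Serdes.ByteArray()
val stringSerde            = Serdes.String()

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "raw-data-decoder")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")

val builder = new KStreamBuilder()
builder
  .stream(byteArraySerde, stringSerde, rawDataTopic)
  .mapValues(new ValueMapper[String, String] {
    // decodeResults is shown below.
    override def apply(value: String): String = decodeResults(value)
  })
  .to(byteArraySerde, stringSerde, decodedPublicDataTopic)

new KafkaStreams(builder, new StreamsConfig(props)).start()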
If the decode method looks like the following,
import java.io.ByteArrayInputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import scala.io.Source

def decodeResults(input: String): String = {
  System.err.println(s"INITIAL DATA: $input")
  // Undo the base64 encoding first...
  val decoder = Base64.getDecoder
  val decoded = decoder.decode(input)
  System.err.println(s"DECODED DATA: $decoded")
  // ...then gunzip the decoded bytes back to UTF-8 text.
  val unzipped = Source.fromInputStream(
    new GZIPInputStream(new ByteArrayInputStream(decoded)), "UTF-8").mkString
  System.err.println(s"UNZIPPED DATA: $unzipped")
  unzipped
}
Upon running, the call to decoder.decode(input) throws, meaning the value is no longer base64:
java.lang.IllegalArgumentException: Illegal base64 character 1f
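Notably, 0x1f is the first byte of the gzip magic number (0x1f 0x8b), so the value reaching the stream appears to already be raw gzipped bytes rather than base64 text. A quick check along these lines (a sketch, not yet in our code) would confirm that:

// Returns true if the first two bytes match the gzip magic number 0x1f 0x8b.
def looksGzipped(bytes: Array[Byte]): Boolean =
  bytes.length >= 2 && bytes(0) == 0x1f.toByte && bytes(1) == 0x8b.toByte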
If we guess that the Confluent REST service does the base64 decoding for us, we can then try the following (just gunzip):
def decodeResults(decoded: String): String = {
  System.err.println(s"DECODED DATA: $decoded")
  val unzipped = Source.fromInputStream(
    new GZIPInputStream(new ByteArrayInputStream(decoded.getBytes("UTF-8"))), "UTF-8").mkString
  System.err.println(s"UNZIPPED DATA: $unzipped")
  unzipped
}
When this is run we then get the following at the "val unzipped =" line:
java.util.zip.ZipException: Not in GZIP format
Please note that we can successfully decode the same file manually with the first decodeResults(input: String) version above.
Therefore, it appears the REST service must be base64-decoding the data before it lands on the topic, and we suspect the String value serde then mangles the raw gzip bytes (bytes such as 0x8b are not valid UTF-8, so a String round-trip is lossy). I don't see a good way to get around this issue.
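One direction we are considering (an untested sketch, reusing the names from the topology sketch above) is to consume the value with a byte-array serde so the gzip bytes never pass through a String at all:

import java.io.ByteArrayInputStream
import java.util.zip.GZIPInputStream
import org.apache.kafka.streams.kstream.ValueMapper
import scala.io.Source

// Consume the value as raw bytes and gunzip directly, avoiding any
// lossy UTF-8 String round-trip of binary data.
builder
  .stream(byteArraySerde, byteArraySerde, rawDataTopic)
  .mapValues(new ValueMapper[Array[Byte], String] {
    override def apply(decoded: Array[Byte]): String =
      Source.fromInputStream(
        new GZIPInputStream(new ByteArrayInputStream(decoded)), "UTF-8").mkString
  })
  .to(byteArraySerde, stringSerde, decodedPublicDataTopic)

Would that be the right approach, or should the REST proxy be handing us the base64 text untouched?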
Any help is appreciated! And please feel free to comment on the setup if that is not optimal.
~Jon