Kafka Connect S3 Connector: Avro and multi-schema scalability


Major OPPO

Jul 6, 2017, 11:00:19 PM
to Confluent Platform
Hey Guys!

I wanted to know how to handle producers that use different schema versions concurrently while running Kafka Connect.


We process 10B events per hour. Those events are in Avro. They go to Kafka and then are pulled by Kafka Connect using the s3-connector provided by kafka-connect-storage-cloud.

This is a pretty new architecture at our company. We have used Kafka for years, but we just introduced Kafka Connect.

Usually the sink writes files of around 200MB to S3, as per our configuration.

However, last week we introduced a new version of our schema. As we were doing a rolling update of our 400 producers, we had events with different schema versions in flight for more than an hour.


During this time we noticed that our files in S3 were only around 40kB. This is very problematic because we ended up trying to push hundreds of thousands of files to S3 (we hit the AWS S3 rate limit multiple times).


Our conclusion is that, since each file carries the schema, whenever a task sees a different schema version it closes the current file, puts it to S3, and creates a new file for the new schema version.


Is this correct? If so, how do you configure Kafka Connect to properly handle multiple schema versions running at the same time, i.e. without producing lots of tiny files?



N.B.: We are using Kafka 0.10.2 and version 3.2.0 of the Confluent Platform. We also have a custom TimeBased partitioner.

I can give you more details on our configuration if needed.

Randall Hauch

Jul 7, 2017, 11:43:59 AM
to confluent...@googlegroups.com
What is the `schema.compatibility` connector configuration property set to? It defaults to `NONE`, which means the connector does not project records between schema versions: each file written to S3 carries the exact schema of the records in it. When the connector observes a schema change in the data, it commits the current set of files for the affected topic partitions and writes the data with the new schema to new files.
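
By the way, in case it's useful, the relevant slice of an S3 sink configuration in standalone `.properties` form looks something like this; the name, topic, bucket, region, and flush size values here are just illustrative placeholders, not a recommendation:

    name=s3-sink
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=4
    topics=events
    s3.bucket.name=my-example-bucket
    s3.region=us-east-1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.avro.AvroFormat
    # Records written per file before it is committed to S3;
    # tune this to reach your ~200MB objects.
    flush.size=100000
    # Defaults to NONE: any schema change rotates the current file.
    schema.compatibility=NONE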

You said you did a rolling upgrade of your producers, that it took over an hour, and that during this time you had a mixture of events with both old and new schemas. It is quite possible that a single S3 sink connector task was also seeing that mixture, and with `schema.compatibility=NONE` the connector would have flushed the current file every time it saw a change in schema. For example, if we use 1 and 2 to signify the schema version and the connector sees events like e1, e2, e1, e2, e2, e1, e1, e2, it would have written them into files like this: [e1], [e2], [e1], [e2, e2], [e1, e1], [e2].

Other schema compatibility settings result in different behavior. If your new schema is backward compatible and you set `schema.compatibility=BACKWARD`, then the connector can use the latest version of the schema that it sees. It will flush the current file when it sees a new schema, but after that it will always use the new schema. For example, if we use 1 and 2 to signify the schema version and the connector sees events like e1, e2, e1, e2, e2, e1, e1, e2, the connector would have written them into files like this: [e1], [e2, e2, e2, e2, e2, e2, e2].

There is also the FORWARD schema compatibility setting, which is similar to BACKWARD except it uses the older schema rather than the new one. Given the same example, the files would be written like this: [e1, e1, e1, e1, e1, e1, e1, e1].

Finally, there is the FULL schema compatibility setting, which works similarly to BACKWARD as long as your schemas are both backward and forward compatible.
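
So if your schemas really are both backward and forward compatible (and registered that way), the fix should essentially be a one-line change, sketched here assuming the rest of your configuration stays as-is:

    # With FULL (or BACKWARD), the connector projects records across
    # compatible schema versions instead of rotating a file on every
    # version change.
    schema.compatibility=FULL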

The S3 connector documentation (last line at http://docs.confluent.io/current/connect/connect-storage-cloud/kafka-connect-s3/docs/index.html) links to the HDFS sink connector documentation, which describes the schema evolution and compatibility mechanism.

Hope this helps explain what you're seeing. Best regards,

Randall



Konstantine Karantasis

Jul 7, 2017, 12:26:19 PM
to confluent...@googlegroups.com

Regarding your last comment, where you mention your partitioner and the connector version: I'd like to let you know that time-based partitioning became fully featured in 3.2.2.

I would recommend upgrading the connector if you intend to use time-based partitioning.
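
If it helps, a time-based partitioning setup using the built-in partitioner looks roughly like the following sketch; the duration, path format, locale, and timezone values are illustrative, and `timestamp.extractor` is one of the options that may require the newer connector version:

    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    # Start a new file path for every hour of data.
    partition.duration.ms=3600000
    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    locale=en
    timezone=UTC
    # Take the partitioning timestamp from the record itself rather than
    # the wall clock; this option may not exist before the upgrade.
    timestamp.extractor=Record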

--Konstantine



Major OPPO

Jul 8, 2017, 5:03:11 PM
to Confluent Platform
Hey Guys,

Indeed, our `schema.compatibility` connector configuration property was set to NONE even though our schemas are fully compatible.

Thank you for clarifying the schema evolution and compatibility mechanism. I'm guessing we missed that part when we set up Kafka Connect.

We are going to upgrade the connector, run some tests, and put it in production soon.

Thank you so much for the help :)

--MO
