Kafka Connect HDFS versus S3

Werner Daehn

May 5, 2017, 7:54:34 AM
to Confluent Platform
I struggle to understand why there are two Kafka connectors, the HDFS one and the S3 one.

My requirements are

Per my investigation, the S3 connector does not support the latter two, and the HDFS connector does not support the first. And looking at the source code, both have a lot in common.
So if anything, there should be one connector as the foundation and then multiple implementations with the various storage classes for HDFS, S3, Google Cloud, etc.

Can somebody shed some light on this, please, and tell me where I am wrong? And how can I achieve my goal of loading S3 with Parquet files and rotate.interval.ms?

Thanks in advance

Ewen Cheslack-Postava

May 13, 2017, 6:30:57 PM
to Confluent Platform
Despite the fact that HDFS provides an S3 filesystem implementation, it doesn't (and can't) truly provide the same semantics of a filesystem -- S3 is an eventually consistent system that does not provide the semantics you'd expect from a regular filesystem.

Because of this, the designs of the two connectors for getting the semantics many people want (i.e. exactly once) are different. For example, in the case of HDFS, we can append to files, have read-after-write semantics, efficient renaming of files, and efficient listing of files in the filesystem. In contrast, S3 has none of these. This means the HDFS connector's approach of using temp files, a WAL file for file commits, moving files to commit them, and using file listings to recover offsets does not work for S3.
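
As a rough illustration of the temp-file-then-rename pattern described above, here is a minimal sketch on a local filesystem. It is not the HDFS connector's actual code: the function names, the `.txt` format, and the `<topic>+<partition>+<start>+<end>` naming scheme are assumptions made for the example, and the WAL is left out entirely.

```python
import os
import re
import tempfile

# Hypothetical naming scheme for committed files:
#   <topic>+<partition>+<start offset>+<end offset>.txt
COMMITTED = re.compile(
    r"^(?P<topic>.+)\+(?P<partition>\d+)\+(?P<start>\d+)\+(?P<end>\d+)\.txt$"
)


def commit_batch(directory, topic, partition, start_offset, records):
    """Write records to a temp file, then rename it into place to commit."""
    end_offset = start_offset + len(records) - 1
    final_name = f"{topic}+{partition}+{start_offset:010d}+{end_offset:010d}.txt"

    # Readers never see a half-written file: data goes to a temp file first.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for record in records:
            f.write(record + "\n")

    # The rename is the commit point. This relies on the filesystem offering
    # a cheap, atomic rename.
    os.replace(tmp_path, os.path.join(directory, final_name))
    return final_name


def recover_next_offset(directory, topic, partition):
    """List committed files and return the next offset to consume."""
    last_committed = -1
    for name in os.listdir(directory):
        match = COMMITTED.match(name)
        if (
            match
            and match["topic"] == topic
            and int(match["partition"]) == partition
        ):
            last_committed = max(last_committed, int(match["end"]))
    return last_committed + 1
```

Both halves lean on filesystem properties: the commit needs an atomic rename, and recovery needs a listing that is fast and reflects every committed file.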

There was at least one attempt to adapt the HDFS connector for S3, but it required adding a separate ACID data store for the WAL file. Even when you do this, recovery after rebalancing or crashing grows increasingly expensive over time and becomes impractical with even a relatively small number of files because of the performance of S3 LISTs.

Regarding Parquet support, unfortunately the Parquet library makes a lot of assumptions about working with HDFS -- we think it is possible to adapt it to the S3 connector, but we couldn't include it in the first version because it's not as simple as using the Parquet library to generate the file.

For rotate.interval.ms, because of S3's eventual consistency, we require that partitioners and commit triggers are deterministic to get exactly once delivery (which generally means purely based on the data). There's some work to get time-based triggers using the record timestamps into the next release of the S3 connector.
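
To make "deterministic" concrete, here is a minimal sketch of a rotation rule that looks only at record timestamps, so replaying the same records always yields the same file boundaries. A wall-clock trigger would not have that property, because the boundaries would depend on when the task happened to run. The function name and the `(offset, timestamp_ms)` record shape are assumptions for the example, not the connector's implementation.

```python
def split_by_record_time(records, rotate_interval_ms):
    """Group (offset, timestamp_ms) pairs into files using record data only.

    A new file starts once a record's timestamp is at least
    rotate_interval_ms past the timestamp of the first record in the
    current file. The last group returned would still be an open file.
    """
    files = []
    current = []
    window_start = None
    for offset, timestamp_ms in records:
        if window_start is None:
            window_start = timestamp_ms
        elif timestamp_ms - window_start >= rotate_interval_ms:
            # Rotate: the decision depends only on data in the records.
            files.append(current)
            current = []
            window_start = timestamp_ms
        current.append(offset)
    if current:
        files.append(current)
    return files
```

Because nothing in the rule depends on the local clock, a task that crashes and re-reads the same offsets arrives at the same grouping.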

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/97d80440-a3d3-4279-a621-09ded85e2d95%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Werner Daehn

May 14, 2017, 1:59:27 AM
to Confluent Platform
Thanks Ewen. I have meanwhile found out some of what you describe myself by looking into the source code of the connectors, but I am still at the early stages of understanding all the side effects.

If I am not mistaken, the most important problem is the once-and-only-once semantic, which is solved using the Write Ahead Log - and that is where all the S3 problems play a big role.
While I understand the importance of guaranteed delivery, I would argue that it is less important, especially when the target S3 file system does not guarantee it either.

To be very blunt: What is desired more? A guarantee that the data is in S3 once-and-only-once, at the expense of recent data from the Kafka queue being missing for a longer time? Or having a few duplicates once in a while in S3? I would prefer having all data in S3 rather than missing recent data for an unknown amount of time. Hence I would make the WAL optional in the S3 connector implementation, and probably even in the HDFS connector, assuming it has negative side effects there as well.

Imagine you have a sensor producing fault codes. You do not want to lose any records, and you want to see the error codes rather quickly (rotate.interval as the max latency). But if you get the same row twice in the target file, with the same error code and even the same sensor timestamp, you can handle this at query time. If that is a problem, a database with transactional consistency would be better suited anyhow.
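
Handling duplicates at query time could look roughly like the sketch below. The field names `sensor_id`, `timestamp`, and `code` are hypothetical, and it assumes those three together identify a reading.

```python
def deduplicate(rows):
    """Drop repeated sensor readings, keeping the first occurrence of each."""
    seen = set()
    unique = []
    for row in rows:
        # Assumed natural key: the same sensor, timestamp, and code mean
        # the same reading was delivered twice.
        key = (row["sensor_id"], row["timestamp"], row["code"])
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique
```

This only works when the data carries such a natural key; without one, a redelivered record cannot be told apart from a genuine repeat.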



Regarding the Parquet part of the question, I understand too little at the moment. I would just hate to use Spark Streaming to achieve such a simple task: read Kafka Avro and dump it to S3 in Parquet format.

-Werner

Ewen Cheslack-Postava

May 14, 2017, 3:05:00 PM
to Confluent Platform
On Sat, May 13, 2017 at 10:59 PM, Werner Daehn <werner...@gmail.com> wrote:
> Thanks Ewen. I have meanwhile found out some of what you describe myself by looking into the source code of the connectors, but I am still at the early stages of understanding all the side effects.
>
> If I am not mistaken, the most important problem is the once-and-only-once semantic, which is solved using the Write Ahead Log - and that is where all the S3 problems play a big role.
> While I understand the importance of guaranteed delivery, I would argue that it is less important, especially when the target S3 file system does not guarantee it either.

S3's eventual consistency doesn't mean you can't achieve exactly once delivery in the sense that each Kafka record will appear in S3 in exactly one location in one file -- that is the guarantee Confluent's S3 connector can achieve. However, the eventual consistency does put constraints on how the connector can accomplish this.

There are definitely use cases where folks want this guarantee, otherwise they would have to do their own deduplication of the data after it lands in S3.
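
One way to picture how "exactly one location in one file" can hold without renames or a WAL is to derive the object key purely from the data, so that a retry after a failure rewrites the same object instead of creating a second copy. The sketch below uses a plain dict as a stand-in for the object store. The key layout, the function names, and the fixed `flush_size` batching are assumptions for the example, not a description of the connector's internals.

```python
def object_key(topic, partition, start_offset):
    """Build a key that depends only on the data (hypothetical layout)."""
    return f"topics/{topic}/partition={partition}/{topic}+{partition}+{start_offset:010d}.txt"


def upload_from(store, log, topic, partition, from_offset, flush_size):
    """Upload every complete batch starting at from_offset.

    `store` is a dict standing in for the object store and `log` is the
    list of records in one partition, indexed by offset. from_offset is
    assumed to be a batch boundary, i.e. the last committed offset.
    Returns the next offset that has not been uploaded yet.
    """
    offset = from_offset
    while offset + flush_size <= len(log):
        # The same batch always maps to the same key, so a re-upload after
        # a crash overwrites the object rather than duplicating records.
        store[object_key(topic, partition, offset)] = list(log[offset:offset + flush_size])
        offset += flush_size
    return offset
```

Running `upload_from` twice over the same range leaves the store unchanged, which is why the batching rule has to be a deterministic function of the records.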
 

> To be very blunt: What is desired more? A guarantee that the data is in S3 once-and-only-once, at the expense of recent data from the Kafka queue being missing for a longer time? Or having a few duplicates once in a while in S3? I would prefer having all data in S3 rather than missing recent data for an unknown amount of time. Hence I would make the WAL optional in the S3 connector implementation, and probably even in the HDFS connector, assuming it has negative side effects there as well.

Both are desired by different folks; it depends on your use case. We chose to focus on the delivery semantics because this is pretty difficult to implement correctly, and we know other non-Connect solutions for landing data from Kafka into S3 generally do not achieve it.

In HDFS there wouldn't be any reason to remove the WAL. It doesn't really cost you anything and gives you better delivery semantics.
 

> Imagine you have a sensor producing fault codes. You do not want to lose any records, and you want to see the error codes rather quickly (rotate.interval as the max latency). But if you get the same row twice in the target file, with the same error code and even the same sensor timestamp, you can handle this at query time. If that is a problem, a database with transactional consistency would be better suited anyhow.

We're aware of use cases like this. The main concern with adding this functionality is being very clear in the documentation about the semantics. Currently everything we ship with the Connector gives you exactly once semantics out of the box. If we add a config that can break this guarantee, we'd want to be very clear about which configs can break it, how it breaks, and under what conditions the guarantee will be broken.
 



> Regarding the Parquet part of the question, I understand too little at the moment. I would just hate to use Spark Streaming to achieve such a simple task: read Kafka Avro and dump it to S3 in Parquet format.

Agreed, it's unfortunate that the parquet library is currently written the way it is.

-Ewen
 
