Cassandra connector and multiple queues

61 views
Skip to first unread message

Gunnar Morling

unread,
Apr 28, 2021, 6:01:02 AM4/28/21
to debe...@googlegroups.com
Hey Bingqin, all,

I noticed the other day that the PR for adding multiple change event queues has been merged [1]. I'm still having some doubts here, can this not result in events pertaining to one primary key being emitted in a wrong order, if there are two changes for the same PK in close proximity, but in two different log files?

Also, did you have a chance to do some performance testing, i.e. is this change actually helping to increase the connector throughput? I feel like I'm still missing something about this change.

Thanks for clarifying,

--Gunnar

Ahmed Eljami

unread,
Apr 29, 2021, 6:13:55 AM4/29/21
to debe...@googlegroups.com

Hey @gunnarmorling

On our side, this will not cause any issue. As mentioned above, when we have started using Debezium we knew that the order will never be guaranteed across the cluster so we have implemented the necessary logic on the downstream side.

I think that what @bingqinzhou would say is that the order will be guaranteed within the same commitLog for the same table. Otherwise, it will not be possible to guarantee this for two differents commit files.

It could be mentioned in the doc, so users can specify the desired num of the queue processors in balance with the order through multiple commit Logs?


--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/CADGJaX-T%2Bqax8-12aPqLgJU_gvfr4uhACJ02MkCV-Rh4s3i0sw%40mail.gmail.com.


--
Cordialement;

Ahmed ELJAMI

Gunnar Morling

unread,
Apr 29, 2021, 2:41:53 PM4/29/21
to debe...@googlegroups.com
Am Do., 29. Apr. 2021 um 12:13 Uhr schrieb Ahmed Eljami <ahmed....@gmail.com>:

Hey @gunnarmorling

On our side, this will not cause any issue. As mentioned above, when we have started using Debezium we knew that the order will never be guaranteed across the cluster so we have implemented the necessary logic on the downstream side.

I think that what @bingqinzhou would say is that the order will be guaranteed within the same commitLog for the same table. Otherwise, it will not be possible to guarantee this for two differents commit files.

It could be mentioned in the doc, so users can specify the desired num of the queue processors in balance with the order through multiple commit Logs?

Ok, so I believe I understand the reasoning now: we *already* cannot guarantee correct ordering for the events pertaining to one key, as multiple events may originate from different nodes in the Cassandra cluster, and as the connector process tails the logs on each of them, events may arrive out of order. On that one a general question: why is it we cannot run the connector process on a single node, wouldn't that one receive all the changes from the other nodes eventually? I suppose there's a reason for the current design of running the connector everywhere, but I'd like to understand it :)

Now with the multiple commit log threads, we'd essentially "only" increase the odds for events to arrive out of order, if there's multiple changes in close proximity, before and after a log file switch. As consumers need to be ready to handle the out-of-order events anyways, this is deemed acceptable. If so, I think I could get behind this reasoning. My remaining question then though is whether we're sure that the change actually is an improvement. Isn't it that the connector receives a new log file to process only after Cassandra has switched to the next one (at least in Cassandra 3.x)? If so, what is gained by processing multiple files in parallel?

Also, did you consider to parallelize the processing of the events from a single file instead? For instance, a single file could be processed using fork/join, re-establishing the original order before sending all events out to Kafka. This may yield a better throughput without any reduction of ordering guarantees. WDYT?

Best,

--Gunnar

 

Le mer. 28 avr. 2021 à 12:01, Gunnar Morling <gun...@hibernate.org> a écrit :
Hey Bingqin, all,

I noticed the other day that the PR for adding multiple change event queues has been merged [1]. I'm still having some doubts here, can this not result in events pertaining to one primary key being emitted in a wrong order, if there are two changes for the same PK in close proximity, but in two different log files?

Also, did you have a chance to do some performance testing, i.e. is this change actually helping to increase the connector throughput? I feel like I'm still missing something about this change.

Thanks for clarifying,

--Gunnar

--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/CADGJaX-T%2Bqax8-12aPqLgJU_gvfr4uhACJ02MkCV-Rh4s3i0sw%40mail.gmail.com.


--
Cordialement;

Ahmed ELJAMI

--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.

Bingqin Zhou

unread,
Apr 29, 2021, 6:04:26 PM4/29/21
to debe...@googlegroups.com
Hi Gunnar,

As previously commented in PR#25, Cassandra Connector doesn't guarantee to emit Kafka messages in the same order as change events happen in Cassandra DB regardless of this PR. Below are the reasons I can think of: 
1) Writes to an individual Cassandra node are logged as they arrive. However, these events may arrive out-of-order from which they are issued.
2) Kafka itself doesn't guarantee the order of events, because of partitions. 
The exact timestamp when a change event happens in Cassandra is recorded in a timestamp field in the corresponding kafka message, which can be used to retrieve the actual order during downstream processing. 

The orders Cassandra Connector guarantees are:
1) The order of records of rows snapshotted from one same table.
2) The order of records of mutations from one same CommitLog file. 
These orders are still guaranteed with PR#25. The reason for pertaining these orders is to determine if all records from a table or a CommitLog file have been sent to Kafka, so that the table or CommitLog file can be marked as done.

For the reasons mentioned above, I don't think this PR reduces the original order guarantee of Cassandra Connector. 

To answer your additional questions:

On that one a general question: why is it we cannot run the connector process on a single node, wouldn't that one receive all the changes from the other nodes eventually?
- This is out of scope of this PR, but as far as I know, CommitLog files on each Cassandra node only contain mutations that happen on the same node locally. 

 Isn't it that the connector receives a new log file to process only after Cassandra has switched to the next one?
- As far as I know and observe, Cassandra flushes CommitLog files into cdc_raw directory, where Cassandra Connector actually reads from, in batches. The CommitLogProcessor in Cassandra Connector watches cdc_raw directory, polls all new files flushed there then processes them all periodically.

Also, did you consider to parallelize the processing of the events from a single file instead?
- Parallelizing the processing of events from a single file will break the order of change events of mutations from the same file, which needs to be guaranteed in this Connector. Otherwise, we won't know when the last mutation of a CommitLog file has been sent to Kafka to mark this file as done.

If you still have questions or concerns about PR#25, I can schedule a call to explain it more detailedly. Please let me know your available time slots.

Thanks,
Bingqin Zhou

Gunnar Morling

unread,
May 3, 2021, 3:14:43 AM5/3/21
to debezium
Hi Bingqin,

bing...@wepay.com schrieb am Freitag, 30. April 2021 um 00:04:26 UTC+2:
Hi Gunnar,

As previously commented in PR#25, Cassandra Connector doesn't guarantee to emit Kafka messages in the same order as change events happen in Cassandra DB regardless of this PR. Below are the reasons I can think of: 
1) Writes to an individual Cassandra node are logged as they arrive. However, these events may arrive out-of-order from which they are issued.

Does this out-of-order statement apply to writes on a *single* statement? So if I do writes A and B for the same row to the same node, B could end up before A in the log? I can't quite believe that this is true?
 
2) Kafka itself doesn't guarantee the order of events, because of partitions. 

Order is guaranteed per key though (as all messages with the same key go into the same partition) which typically is what's relevant to consumers.

The exact timestamp when a change event happens in Cassandra is recorded in a timestamp field in the corresponding kafka message, which can be used to retrieve the actual order during downstream processing. 

The orders Cassandra Connector guarantees are:
1) The order of records of rows snapshotted from one same table.
2) The order of records of mutations from one same CommitLog file. 
These orders are still guaranteed with PR#25. The reason for pertaining these orders is to determine if all records from a table or a CommitLog file have been sent to Kafka, so that the table or CommitLog file can be marked as done.

But were the guarantees not stricter before #25 actually? If a mutation was done in log L1 and another one in subsequent log L2, it was guaranteed that events in L1 were emitted before those in L2. With #25, this isn't the case any more. If for instance I have a single node Cassandra (which then naturally receives all the writes), before #25 correct order per key would have been ensured (assuming the out-of-order statement at 1) above refers to multiple nodes), whereas now with #25, that's not the case any longer.

For the reasons mentioned above, I don't think this PR reduces the original order guarantee of Cassandra Connector. 

To answer your additional questions:

On that one a general question: why is it we cannot run the connector process on a single node, wouldn't that one receive all the changes from the other nodes eventually?
- This is out of scope of this PR, but as far as I know, CommitLog files on each Cassandra node only contain mutations that happen on the same node locally. 

I see, thanks for clarifying. 

 Isn't it that the connector receives a new log file to process only after Cassandra has switched to the next one?
- As far as I know and observe, Cassandra flushes CommitLog files into cdc_raw directory, where Cassandra Connector actually reads from, in batches. The CommitLogProcessor in Cassandra Connector watches cdc_raw directory, polls all new files flushed there then processes them all periodically.

Also, did you consider to parallelize the processing of the events from a single file instead?
- Parallelizing the processing of events from a single file will break the order of change events of mutations from the same file, which needs to be guaranteed in this Connector. Otherwise, we won't know when the last mutation of a CommitLog file has been sent to Kafka to mark this file as done.

Not necessarily; as I mentioned, one would have to "re-establish the original order before sending all events". How large are individual log files? Would it be feasible to parallelize processing of a single file and re-order events once done with it, with everything kept in memory? Another thing to consider is how this all will impact the connector when moving to Cassandra 4.x, where, IIUC, we'd get log files more eagerly (i.e. no batches) or even more fine-grained.

If you still have questions or concerns about PR#25, I can schedule a call to explain it more detailedly. Please let me know your available time slots.

I'm out-of-office this week. We could have a call when I'm back next week. Let's see how far we get by e-mail in the mean time. In particular, I'm still curious about any actual measurements of the impact of this change.


Thanks,
Bingqin Zhou

Thanks for taking the time to discuss these changes; ordering semantics are sort of a key issue for Debezium consumers, hence let's make sure to apply all scrutiny needed to get things right.

Best,

--Gunnar

Bingqin Zhou

unread,
May 3, 2021, 7:29:49 PM5/3/21
to debe...@googlegroups.com
Hi Gunnar,

Thanks for the explanation! I'll send you a separate email to schedule a call when you're back to office next week. 
 
Does this out-of-order statement apply to writes on a *single* statement? So if I do writes A and B for the same row to the same node, B could end up before A in the log? I can't quite believe that this is true?

I don't have much context about this actually. I read about this from the original design doc of this connector. This is also documented in https://github.com/debezium/debezium/blob/master/documentation/modules/ROOT/pages/connectors/cassandra.adoc (Limitations of Commit Logs)

Order is guaranteed per key though (as all messages with the same key go into the same partition) which typically is what's relevant to consumers.

Makes sense to me now, thanks for the explanation :)

But were the guarantees not stricter before #25 actually? If a mutation was done in log L1 and another one in subsequent log L2, it was guaranteed that events in L1 were emitted before those in L2. With #25, this isn't the case any more. If for instance I have a single node Cassandra (which then naturally receives all the writes), before #25 correct order per key would have been ensured (assuming the out-of-order statement at 1) above refers to multiple nodes), whereas now with #25, that's not the case any longer.

I don't think the guarantee of order in Cassandra Connector is as strict as other Debezium connectors. In Cassandra Connector, even snapshotting happens in parallel with commit log processing, which already results in a disorder of events.

Not necessarily; as I mentioned, one would have to "re-establish the original order before sending all events". How large are individual log files? Would it be feasible to parallelize processing of a single file and re-order events once done with it, with everything kept in memory? Another thing to consider is how this all will impact the connector when moving to Cassandra 4.x, where, IIUC, we'd get log files more eagerly (i.e. no batches) or even more fine-grained.

Yea I was told that Cassandra 4.0 will do more frequent and smaller size of commit log flushes. As for parallelizing the processing of events in a single commit log file then reconstructing the order, I'm concerned that the time and effort used to reconstruct the order will offset the time gained by parallel processing. 

Thanks,
Bingqin Zhou

Reply all
Reply to author
Forward
0 new messages