This proposal describes the design of our CDC framework and the points in GP where CDC requires code changes. We are open to CDC-related discussions and welcome your suggestions.
Chapters 1 and 2 describe the background; chapters 3 and 4 describe our CDC framework design and the related code changes in GP; finally, chapter 5 shows one typical application of CDC and the related code changes in GP.
Change data capture (CDC) is the process of recognizing a change in a source data system so that a downstream system can act on that change, usually by updating a target system to reflect
new information.
In general, a database's logs describe the data changes in the database. Customers can use CDC to export the database's logs in a logical format and use them in any application.
CDC is widely used in data warehouses and applications closely related to databases, such as data synchronization, data backup, auditing, ETL, etc. Let me briefly introduce some typical use cases.
Many companies now have very complex big data solutions, such as data lakes, in which both OLTP and OLAP databases are used. The OLTP database processes a large number of data change requests, and the OLAP database efficiently answers complex data queries. Usually the OLTP database and the OLAP database are not the same system. Customers can use CDC to export logical logs from the OLTP database and then load those logical logs into the OLAP database.

In this case, the customer does not need strong real-time consistency between the source and target databases; it does not matter if the data change progress of the target database lags behind that of the source database. So customers can use CDC to export logical logs from the source database and apply them to the target database. One point worth emphasizing is that since the logs are logical, the source and target databases can be of different kinds, such as Postgres
and Oracle.

In some Internet backend applications, producer microservices change the data in the database, and consumer microservices subscribe to those data changes.

The logical decoding mechanism in PG is the basis for our implementation of CDC on GP. Logical
decoding is implemented by decoding the contents of the write-ahead log(WAL), which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
In the context of logical decoding, a replication slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. A logical slot will emit each change just once in normal operation.
We then decode these changes into a delimited, CSV, or JSON format and send them to the target destination through an output plugin.
test_decoding is an output plugin implemented by PG as an example of an output plugin; we can certainly implement our own output plugin. test_decoding receives WAL through the logical decoding mechanism and decodes it into text representations of the operations performed. The following commands show the process of using a replication slot with test_decoding:
bash-4.2$ PGOPTIONS='-c gp_role=utility' psql testdb -p 7000
testdb=# create table example_tbl(a int PRIMARY KEY, b int);
CREATE TABLE
testdb=# SELECT * FROM pg_create_logical_replication_slot('example_slot', 'test_decoding');
slot_name | lsn
--------------+------------
example_slot | 0/2FBD2B38
(1 row)
testdb=# SELECT * FROM pg_logical_slot_get_changes('example_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
We connect to a Postgres server to show this example. pg_create_logical_replication_slot is used to create a logical replication slot; the slot's name is example_slot and the output plugin's name is test_decoding. pg_logical_slot_get_changes returns the data changes since the last time the function was called. Because we have not changed any data after the slot was created, we get nothing.
testdb=# insert into example_tbl values(1,1);
INSERT 0 1
testdb=# SELECT * FROM pg_logical_slot_get_changes('example_slot', NULL, NULL);
lsn | xid | data
------------+------+-------------------------------------------------------------
0/2FBD6DE8 | 1373 | BEGIN 1373
0/2FBD6DE8 | 1373 | table public.example_tbl: INSERT: a[integer]:1 b[integer]:1
0/2FBD6F00 | 1373 | COMMIT 1373
(3 rows)
We perform an insert, and pg_logical_slot_get_changes shows all three log records produced by this insert transaction in human-readable format: a begin record, an insert record, and a commit record. In the insert record we see the table name and the contents of the new tuple; in the begin and commit records we see the transaction id.
testdb=# update example_tbl set b = 10 where a = 1;
UPDATE 1
testdb=# SELECT * FROM pg_logical_slot_get_changes('example_slot', NULL, NULL);
lsn | xid | data
------------+------+-----------------------------------------------------------------------------------------------------------
-
0/2FBD71A0 | 1376 | BEGIN 1376
0/2FBD71A0 | 1376 | table public.example_tbl: UPDATE: old-key: a[integer]:1 b[integer]:2 new-tuple: a[integer]:1 b[integer]:10
0/2FBD7230 | 1376 | COMMIT 1376
(3 rows)
If the DML is an update, pg_logical_slot_get_changes shows
the old and new tuple contents of the update xlog record.
testdb=# delete from example_tbl where a = 1;
DELETE 1
testdb=# SELECT * FROM pg_logical_slot_get_changes('example_slot', NULL, NULL);
lsn | xid | data
------------+------+--------------------------------------------------------------
0/2FBD7268 | 1377 | BEGIN 1377
0/2FBD7268 | 1377 | table public.example_tbl: DELETE: a[integer]:1 b[integer]:10
0/2FBD72E8 | 1377 | COMMIT 1377
(3 rows)
If the DML is a delete, then pg_logical_slot_get_changes shows
the old tuple contents of the delete xlog record.
The above demo is just to show the concept of logical decoding and its output data format. In a production environment, logical logs decoded from WAL should be output automatically, without calling pg_logical_slot_get_changes manually. The working mode is as follows:

There are three interactive steps as shown in the picture: connect to the server in replication mode, send the CREATE_REPLICATION_SLOT command to create the slot, and send the START_REPLICATION command to start decoding. You can perform these operations with libpq; we use psql to demonstrate them:
bash-4.2$ psql -p 7001 "dbname=testdb replication=database" //connect to PG in WALSender working mode
psql (12beta2)
Type "help" for help.
testdb=# CREATE_REPLICATION_SLOT example_slot LOGICAL test_decoding; //create slot command
slot_name | consistent_point | snapshot_name | output_plugin
--------------+------------------+---------------------+---------------
example_slot | 0/2FA2F4E8 | 00000003-0000DEA6-1 | test_decoding
(1 row)
testdb=# START_REPLICATION SLOT "example_slot" LOGICAL 0/0; //start decoding command
testdb=#
So decoding starts to run: WALSender decodes the WAL records one by one and outputs the decoded contents through the output plugin.
We know that the xlog records of different transactions are interleaved in the log file like this:

When WALSender decodes xlog records, it does not output them immediately. It first buffers and reorders them, assigning records to different buffers based on transaction-id, each transaction having its own buffer. When the commit record of a transaction is encountered, WALSender outputs all the buffered contents of that transaction.
In the following example, although the order of the xlog records of different transactions in the log file is chaotic, the order in which WALSender outputs them is deterministic: the order of the transaction commit records. Although T1 starts before T2 and T3, and T2 starts before T3, the order of their commit records in the log file is T3, T2, T1, so WALSender outputs the transactions in the order T3, T2, T1.

This is all done by the ReorderBuffer component in WALSender. Informally, a transaction can be considered the unit of logical decoding output.
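As a minimal illustration (my own sketch, reusing example_tbl and example_slot from the earlier demo on a plain Postgres server), two interleaved sessions show that output follows commit order:

-- Session A:
BEGIN;
INSERT INTO example_tbl VALUES (2, 2);   -- A's xlog records are written first

-- Session B:
BEGIN;
INSERT INTO example_tbl VALUES (3, 3);
COMMIT;                                  -- B commits first

-- Session A:
COMMIT;                                  -- A commits second

-- any session:
SELECT data FROM pg_logical_slot_get_changes('example_slot', NULL, NULL);
-- B's BEGIN/INSERT/COMMIT lines appear before A's, because ReorderBuffer
-- emits whole transactions in commit order, not in xlog-record order.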
We use a process called cdc_controller outside of GP to dispatch CREATE_REPLICATION_SLOT and START_REPLICATION commands to the master and segments. The cdc_controller also maintains connections to the master and segments. Because clients may need the distributed transaction log on the master, we decode the logs on the master as well.
Kafka is one of the most widely used message queues, so we use it as the output channel for the decoded content. Clients can consume from Kafka to obtain the source-GP's data changes.

I first introduce a type of xlog record called xl_running_xacts; here is its structure definition:
/*
 * When we write running xact data to WAL, we use this structure.
 */
typedef struct xl_running_xacts
{
    int             xcnt;               /* # of xact ids in xids[] */
    int             subxcnt;            /* # of subxact ids in xids[] */
    bool            subxid_overflow;    /* snapshot overflowed, subxids missing */
    TransactionId   nextXid;            /* xid from ShmemVariableCache->nextFullXid */
    TransactionId   oldestRunningXid;   /* *not* oldestXmin */
    TransactionId   latestCompletedXid; /* so we can set xmax */

    TransactionId   xids[FLEXIBLE_ARRAY_MEMBER];
} xl_running_xacts;
The field xids describes the transactions that were running when the xl_running_xacts record was logged, and nextXid is the currently known largest transaction-id plus 1. Conceptually, these fields describe a snapshot of the moment when xl_running_xacts was recorded.
The slot is created by WALSender; the procedure is:
1. WALSender receives the CREATE_REPLICATION_SLOT command; it logs the first xl_running_xacts, which records the transactions that were running at that time.
2. WALSender waits for the transactions recorded in the first xl_running_xacts to end, then logs the second xl_running_xacts, which records the currently running transactions.
3. WALSender reads the log until the second xl_running_xacts is encountered.
4. WALSender waits for the transactions recorded in the second xl_running_xacts to end and logs the third xl_running_xacts.
5. WALSender reads the log until it encounters the third xl_running_xacts.
Then WALSender officially starts decoding. The transactions whose commit records are after the third xl_running_xacts in the log file will be decoded and output.
You may be confused by these procedures. I will describe the logic of slot creation in another proposal page because of the space limit here.
The most important conclusion is: WALSender needs to wait for some running transactions to end, and this wait happens in two rounds.
Let me give a definition. startingpoint-id: the distributed-transaction-id of the first distributed transaction output by WALSender.
We want to create distributed-transaction-consistent slots between segments, meaning that all segments have a startingpoint-id greater than or equal to the same distributed-transaction-id 't', and the logs for all output distributed transactions are complete, which means no segment lacks any of their logs.
We can give a counter-example to illustrate why we need distributed-transaction consistency.
These are the logs of 3 segments and their startingpoint-ids. Both segment-0 and segment-2 start decoding from T6's xlog records, but segment-1 starts decoding from T7's xlog records. The xlog records of T6 on segment-1 will not be decoded and output, so clients will not get all xlog records of T6, which violates transactional consistency.

Let's look back at the conclusion of 4.1.2: when WALSender creates a slot on a segment, it needs to do two rounds of waiting.
A distributed transaction can execute on any subset of the segments, and each of its local transactions on a segment can start at any time. Therefore, the set of distributed transactions being executed on a segment at any moment is irregular and cannot be predicted.
If we simply send the CREATE_REPLICATION_SLOT command to all segments, the distributed transactions running on each segment at that moment are irregular, and after the two rounds of waiting the startingpoint-id on each segment is also irregular, so we may not get distributed-transaction-consistent slots.
I show a simple example with 2 segments and 4 distributed transactions. The diagram shows the xlogs of the segments and a timeline on which the xlog records are logged.
The first xl_running_xacts is recorded when a segment receives the CREATE_REPLICATION_SLOT command; the xl_running_xacts of segment 0 records T1 as a running transaction, and the xl_running_xacts of segment 1 records T2 as a running transaction. Then each segment's WALSender does the two rounds of waiting:
The WALSender on segment 0 first waits for the end of T1, and when T1 commits, it logs the second xl_running_xacts. T2 and T3 started and logged some WAL records during the first round of waiting, so the second xl_running_xacts records T2 and T3 as running transactions. Then WALSender waits for T2 and T3 to finish and records the third xl_running_xacts. This is the second round of waiting. T4 commits after the third xl_running_xacts, so decoding starts from T4.
The WALSender on segment 1 performs the same process, but T4 on segment 1 commits before the third xl_running_xacts, so T4 on segment 1 will not be decoded, which violates distributed-transaction consistency.

In the customer's production environment there are more segments and more concurrent distributed transactions, making the situation even more complicated. If we simply send the CREATE_REPLICATION_SLOT command to all segments, we cannot get distributed-transaction-consistent slots.
So we come to a conclusion: if we want to create distributed-transaction-consistent slots between segments, then when we send the CREATE_REPLICATION_SLOT command to the segments, we have to make sure that there is no running distributed transaction. We call this situation a peaceful environment.
It would be perfect if the customer could help us create a peaceful environment. If not, we need an alternative.
The core idea is that we introduce a new lock called transaction-start-lock, taken at the beginning of the function StartTransaction, which is called when the client sends the begin command to the master to start a distributed transaction. The pseudo code is:
StartTransaction(void)
{
    /* every new transaction briefly takes the lock in share mode */
    acquire transaction-start-lock in share-mode;
    release transaction-start-lock;
    other code...
}
The transaction acquires the lock and releases it immediately. If cdc_controller is holding the transaction-start-lock in exclusive mode, new transactions block here until it is released.
Relevant code modification in GP
We implemented 2 commands named gp_acquire_transaction_start_lock and gp_release_transaction_start_lock. gp_acquire_transaction_start_lock acquires the transaction-start-lock in exclusive mode, and gp_release_transaction_start_lock releases it.
We also implemented a command called gp_wait_running_transaction_end_except_one; cdc_controller can send this command to the master, and the session then blocks until there are no running distributed transactions on the master.
We implement these 3 commands through CREATE FUNCTION, so there is no need to modify GP code for them. We only install these commands in GP when we actually need them.
Relevant code
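As a rough, hypothetical sketch of that registration (the module name 'cdc_helpers' and the argument type are placeholders I chose, not the actual implementation), the three functions could be created like this:

CREATE FUNCTION gp_acquire_transaction_start_lock() RETURNS void
    AS 'cdc_helpers', 'gp_acquire_transaction_start_lock' LANGUAGE C STRICT;

CREATE FUNCTION gp_release_transaction_start_lock() RETURNS void
    AS 'cdc_helpers', 'gp_release_transaction_start_lock' LANGUAGE C STRICT;

-- the argument identifies the distributed transaction that should not be waited for
CREATE FUNCTION gp_wait_running_transaction_end_except_one(dtx_id bigint) RETURNS void
    AS 'cdc_helpers', 'gp_wait_running_transaction_end_except_one' LANGUAGE C STRICT;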
Let's see how we use the transaction-start-lock to produce a peaceful environment.
First, cdc_controller sends the gp_acquire_transaction_start_lock command to the master; from then on, new incoming distributed transactions are blocked. Second, cdc_controller sends gp_wait_running_transaction_end_except_one to the master; when the command returns, there is no running distributed transaction on the master, so it is now a peaceful environment. Therefore, we can send CREATE_REPLICATION_SLOT commands to the master and segments, which creates distributed-transaction-consistent slots.
The figure below shows this process on a timeline. We assume that transaction-1 is the only distributed transaction running on the master, and it acquires the transaction-start-lock before the master receives the gp_acquire_transaction_start_lock command from the cdc_controller, so it will not be blocked. transaction-2 starts after the master receives the gp_acquire_transaction_start_lock command, so it will be blocked.

In addition, we need a distributed snapshot when creating slots between segments. Currently, the only way to obtain a distributed snapshot is to start a distributed transaction and run the pg_export_snapshot command.
At the RC isolation level, cdc_controller starts a transaction we call the snapshotting-transaction, used for exporting the snapshot, before sending gp_acquire_transaction_start_lock to the master. When we enter the peaceful environment, the snapshotting-transaction exports a snapshot, so this snapshot represents the data state of the GP before we start logical decoding. Snapshots are closely combined with the data changes output by logical decoding.
So gp_wait_running_transaction_end_except_one should not wait for the snapshotting-transaction to end; a parameter passed to gp_wait_running_transaction_end_except_one tells it the distributed transaction id of the snapshotting-transaction.
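Putting this together, a rough sketch of the ordering (my own illustration, not the actual cdc_controller code) with two psql sessions on the master looks like this; the literal dtx id is just a placeholder:

-- session 1: the snapshotting-transaction, started before the lock is taken
BEGIN;

-- session 2: cdc_controller
SELECT gp_acquire_transaction_start_lock();                -- new distributed transactions now block
SELECT gp_wait_running_transaction_end_except_one(1234);   -- 1234: placeholder for session 1's dtx id
-- returns once the environment is peaceful

-- session 1: export the snapshot that represents the data state at this point
SELECT pg_export_snapshot();                               -- e.g. returns '00000003-0000DEA6-1'

-- cdc_controller now sends CREATE_REPLICATION_SLOT to the master and all segments,
-- then releases the lock:
-- session 2:
SELECT gp_release_transaction_start_lock();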
We will introduce one application of CDC: non-real-time data synchronization from one source-GP to another target-GP.
We deploy a process called
cdc_replayer that consumes decoded content from Kafka.
The decoded content from segments represents the data changes of the local transactions that make up the distributed transaction.
The decoded content from the master represents the marker to commit the distributed transaction.
cdc_replayer reassembles them into new distributed transactions, committing these distributed transactions to the target-GP.
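For example, one reassembled distributed transaction might be replayed on the target-GP roughly like this (my own sketch; the decoded changes shown are illustrative):

BEGIN;
INSERT INTO public.example_tbl VALUES (1, 1);        -- change decoded from one segment
UPDATE public.example_tbl SET b = 10 WHERE a = 1;    -- change decoded from another segment
COMMIT;   -- issued only after the master's commit marker for this distributed transaction arrives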

Clients may wish to maintain transaction atomicity and transaction consistency on the target-GP, but there are two situations where the current GP can't achieve the goal.
The commit WAL record of a distributed transaction on the master is xl_xact_distributed_forget, which only records the distributed-transaction-id and does not record on which segments the transaction executed. Consider the situation in the figure below: T1 is executed on segment-0 and segment-1, but cdc_replayer only gets the logs of segment-0 and the master, and the log of segment-1 is blocked for some reason. In this case, cdc_replayer does not know whether it should commit T1 to the target-GP or wait for the remaining logs.

So we need to add information about the segments on which the distributed transaction executed to the xlog.
Relevant code modification in GP
If a distributed transaction is executed on only one segment, we call it one-phase. A one-phase distributed transaction does not log any WAL record on the master. Normally cdc_replayer uses xl_xact_distributed_forget as the sign to commit a distributed transaction to the target-GP, so we cannot handle one-phase distributed transactions yet.

So we need to add additional information to the xlog to identify this special case.
Relevant code modification in GP
It should be emphasized that logical decoding only works when wal_level >= logical, so these new fields will only be written to the xlog when wal_level >= logical. Our customers' GPs currently all run with wal_level < logical, so this code change won't have any impact.
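For reference, in plain PostgreSQL the setting can be checked and changed like this (a restart is required for wal_level to take effect; on GP it is usually managed cluster-wide, for example with gpconfig):

SHOW wal_level;                          -- currently below 'logical' on the customers' clusters
ALTER SYSTEM SET wal_level = logical;    -- takes effect after a restart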

The cdc_controller creates the slots and starts decoding in three steps:
1. cdc_controller sends the CREATE_REPLICATION_SLOT command to the master and segments to create slots, then sends the START_REPLICATION command to the master and segments, but decoding does not start yet: we block the decoding through the cdc-plugin we implemented.
2. The snapshotting-transaction calls pg_export_snapshot to export a snapshot, and cdc_controller reads the content of the snapshot according to the snapshot-id.
3. cdc_controller sends the send_snapshot_to_cdc_plugin command to the master and segments, passing in the content of the snapshot obtained in the second step. send_snapshot_to_cdc_plugin is a command implemented by ourselves, which can be installed in GP through CREATE FUNCTION; its purpose is to pass the snapshot content to the cdc-plugin.
When the cdc-plugin receives the snapshot, WALSender leaves the blocked state and works normally. Afterwards, when outputting each transaction through the cdc-plugin, we check the relationship between the transaction and the snapshot: if the transaction was committed before the snapshot, its content is not output; otherwise, it is output.
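This check is conceptually the same visibility test that PostgreSQL exposes at the SQL level as txid_visible_in_snapshot(). For instance, with a snapshot whose xmin is 7, xmax is 10, and whose running transactions are 7 and 9 (my own illustration, not the cdc-plugin code):

SELECT txid_visible_in_snapshot(6, '7:10:7,9'::txid_snapshot);
-- true: transaction 6 committed before the snapshot, so its changes are skipped
SELECT txid_visible_in_snapshot(7, '7:10:7,9'::txid_snapshot);
-- false: transaction 7 was still running at snapshot time, so its changes are output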
Relevant code modification in GP
The result is that all transactions committed after the snapshot will be output. The startingpoint-ids on the segments are distributed-transaction-consistent, even though the WALSender on each segment starts decoding from a different distributed transaction.
Let's look at the example in the figure below.

In the first step, cdc_controller sends the CREATE_REPLICATION_SLOT command
and the START_REPLICATION command
to segment1 and segment2.
The startingpoint-id on segment1 is T8, and it would output T8, T7, T9 under normal conditions; the startingpoint-id on segment2 is T6, and it would output T6, T8, T7, T9 under normal conditions. But as mentioned in 4.2.1, we use the cdc-plugin to keep WALSender in a blocked state, so decoding does not actually start.
In the second step, cdc_controller gets a snapshot. Because GP is still working and some transactions are running or have committed, the snapshot is:
xmin:T7
xmax:T10
xips:T7,T9
In the third step, we use the send_snapshot_to_cdc_plugin command to send the snapshot content to the cdc-plugins of the two segments. The WALSenders of the two segments then leave the blocked state and start working normally. When they output transaction content through the cdc-plugin, they first compare it with the snapshot and only output the transactions committed after the snapshot: segment1 will not output T8, but will output T7 and T9; segment2 will not output T6 and T8, but will output T7 and T9. Both segments start their output from T7, so the startingpoint-ids on the segments are distributed-transaction-consistent.
From: 'Chong Wang' via Greenplum Developers <gpdb...@greenplum.org>
Date: Monday, May 8, 2023, 15:53
To: gpdb...@greenplum.org <gpdb...@greenplum.org>
Subject: CDC on GP