Can I add a transformer to send transaction metadata messages to a different topic?


Willie Zhu

Jul 5, 2021, 7:48:15 AM
to debezium
Hi team,
    We have many tables, such as a users table, an organizations table, and a groups table, and each table has its own topic. But transaction metadata all goes into a single topic, which means that if we only want to process user transactions, we have to consume every transaction message and filter. Can we add a transformer to route transaction metadata messages for different tables to different topics? I got an error when I tried it.

    Here is our use case.
    We have a users table, a users_attribute table, and some other tables. Our application updates users and users_attribute in one transaction, and Debezium produces messages to a separate topic for each table. We use a transformer to partition messages by user ID for both the users and users_attribute tables, so that messages for one user always go to the same partition and stay in order.
    Another application needs transaction-level messages. For example, if one transaction updates 1 users record and 3 users_attribute records, we need to combine those messages into a single new transaction message, so we need to know how many messages are in the transaction. That means we need to transform the transaction metadata message:
  1. Also partition the transaction metadata by user ID. We tried this, and it works as expected.
  2. Because we only need user transaction metadata, route user transaction metadata to another topic and keep the rest in the default topic. We tried to use a transformer to change the topic, but it didn't work and reported an error.

Gunnar Morling

Jul 5, 2021, 9:52:05 AM
to debe...@googlegroups.com
Hey,

The reasoning for sending all transaction messages to a single topic is that one transaction may touch any kind of table, so from Debezium's perspective, there is no such thing as a "user transaction". You could use the content-based routing SMT to re-route TX END messages based on their contents, but for TX BEGIN messages there's no way to do that.
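As a rough sketch, the content-based routing SMT could re-route TX END events whose data_collections list includes the users table. The topic prefix `myconnector`, the target topic `user-tx`, and the table name `public.users` below are assumptions, not values from this thread, and the SMT additionally requires a JSR223 scripting engine such as Groovy on the connector's classpath:

```json
{
  "transforms": "routeUserTx",
  "transforms.routeUserTx.type": "io.debezium.transforms.ContentBasedRouter",
  "transforms.routeUserTx.topic.regex": "myconnector\\.transaction",
  "transforms.routeUserTx.language": "jsr223.groovy",
  "transforms.routeUserTx.topic.expression": "value.status == 'END' && value.data_collections?.any { it.data_collection == 'public.users' } ? 'user-tx' : null"
}
```

Returning null from the expression leaves the event on its original topic, which also handles TX BEGIN events (they have no data_collections), so only matching END markers are diverted.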

--Gunnar



Willie Zhu

Jul 15, 2021, 12:35:59 AM
to debezium
Hi Gunnar,
    Thank you for your response.
    Yes, we can route the TX END event to another topic based on its contents. But we also want to route it to a specific partition based on a business ID, such as the user ID. That way the USER messages and USER TX messages will be routed to the same partition number (in different topics), and we can use Kafka Streams (which needs co-partitioning) to aggregate them into a transaction-level message, which is what we really need.
    Could we put the data_collections data from the TX END event into the last message of the transaction? That way we wouldn't need to process TX END at all.
    Or do you have any other suggestions for aggregating messages into a transaction-level message?
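The co-partitioning requirement above only needs both topics to use the same key, the same partitioner, and the same partition count. A minimal Python sketch of the idea (CRC32 stands in for Kafka's default murmur2 partitioner purely for illustration; the key format "user-42" is hypothetical):

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Illustrative deterministic key-based partitioner. Kafka's default
    producer partitioner actually uses murmur2, but any deterministic
    hash demonstrates the co-partitioning property."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

NUM_PARTITIONS = 6  # both topics must be created with the same count

# A data change event for a user and the TX END marker for that user's
# transaction, keyed by the same user ID, land on the same partition
# number in their respective topics - which is exactly what Kafka
# Streams needs for a co-partitioned join/aggregation.
user_event_partition = partition_for("user-42", NUM_PARTITIONS)
tx_end_partition = partition_for("user-42", NUM_PARTITIONS)
print(user_event_partition == tx_end_partition)  # True: co-partitioned
```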

Gunnar Morling

Jul 15, 2021, 3:18:54 AM
to debezium
> Can we put the data_collections data in the TX END event to the last message of the transaction?

Not directly; one could envision an SMT which buffers events until it sees the TX END event and then does all the required re-shuffling, including modifying the last data change event and potentially dropping the TX END event. Or just send out one event representing the entire user data change, derived from the multiple table-level events. If you don't have too many events in individual transactions, this might work quite well.
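The buffering idea can be modeled language-agnostically. Here is a toy Python sketch, not a real Kafka Connect SMT; it assumes transaction metadata is enabled so each change event carries a transaction block with the transaction id, and the merged-event field names are made up for illustration:

```python
from collections import defaultdict

class TxAggregator:
    """Toy model of the envisioned buffering transform: hold data change
    events keyed by transaction id, and when the TX END marker arrives,
    emit one merged, transaction-level event in their place."""

    def __init__(self):
        self.buffers = defaultdict(list)

    def apply(self, event):
        """Returns None while buffering, or the merged event on TX END."""
        if event.get("status") == "END":        # TX metadata marker
            tx_id = event["id"]
            changes = self.buffers.pop(tx_id, [])
            return {
                "tx_id": tx_id,
                "event_count": event["event_count"],
                "changes": changes,             # all buffered row events
            }
        # Ordinary data change event: buffer it under its transaction id.
        self.buffers[event["transaction"]["id"]].append(event)
        return None

agg = TxAggregator()
agg.apply({"transaction": {"id": "tx1"}, "table": "users", "row": 42})
agg.apply({"transaction": {"id": "tx1"}, "table": "users_attribute", "row": 7})
merged = agg.apply({"status": "END", "id": "tx1", "event_count": 2})
print(len(merged["changes"]))  # 2
```

A real SMT would also need to persist its buffer across restarts; an in-memory buffer like this loses in-flight transactions on a crash, which is the failure mode discussed later in this thread.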

In Kafka Streams terms, the TX topic could be represented by a GlobalKTable, so that you have access to the TX marker events on all the Kafka Streams nodes. This should allow you to do the required aggregation.

--Gunnar

Willie Zhu

Jul 21, 2021, 12:21:28 AM
to debezium
Hi Gunnar,
     One more question: how does Debezium guarantee transaction integrity across a restart or crash?
For example, suppose that for some transaction only the last commit event has not been processed yet when the server restarts or crashes. On restart, the server will start processing that last commit message, but it can't find the transaction information, so it can't construct the TX END event.

Chris Cranford

Jul 21, 2021, 1:17:48 AM
to debe...@googlegroups.com, Willie Zhu
Hi Willie -

Under abnormal operations, the general rule is there are no guarantees :).

If a transaction is in progress when the connector is restarted, the current transaction metadata will be loaded from the offsets, and when the connector processes the commit, a TX END event will be emitted.  The only time this won't be the case is if the offsets don't contain the transaction metadata, which could happen if the connector had processed a large number of events just before being restarted; in that case, however, the connector should restart and begin processing from whatever point had last been synchronized to the offsets.

CC