I am working on a POC using the Debezium Oracle Connector to capture both DDL and DML change events. I have a dockerized setup running Oracle 21c. LogMiner reads the redo logs in redo_log_catalog mode, with ARCHIVELOG and supplemental logging enabled. The setup runs Docker images (2.2.0.Final) of the Zookeeper, Kafka, and Kafka Connect components, and is configured following these blogs:
https://debezium.io/blog/2022/09/30/debezium-oracle-series-part-1/
https://debezium.io/blog/2022/10/06/debezium-oracle-series-part-2/
All is well here. Debezium writes DDL change event messages to the topic named <topic_prefix_property_value>, and DML changes to topics named <topic_prefix_property_value>.<schema_name>.<table_name>.
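To illustrate the naming convention (the schema and table values here are examples, not my actual config), the topics are derived like this:

```java
public class TopicNames {
    // Debezium's convention: schema-change (DDL) events go to the topic
    // named after topic.prefix; data-change (DML) events go to
    // prefix.schema.table.
    static String ddlTopic(String prefix) {
        return prefix;
    }

    static String dmlTopic(String prefix, String schema, String table) {
        return prefix + "." + schema + "." + table;
    }

    public static void main(String[] args) {
        System.out.println(ddlTopic("fullfilment"));
        System.out.println(dmlTopic("fullfilment", "DEBEZIUM", "CUSTOMERS"));
    }
}
```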
As a next step, I have put together a Java application that uses the embedded Debezium engine. Here I do not use the Zookeeper/Kafka/Kafka Connect orchestration. My pom.xml pulls in all necessary jars. My property set is as follows:
config = Configuration.empty().withSystemProperties(Function.identity()).edit()
        .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.oracle.OracleConnector")
        .with(EmbeddedEngine.ENGINE_NAME, APP_NAME)
        .with("schemas.enable", false)
        .build();
final Properties props = config.asProperties();
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "path/set/to/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "1521");
props.setProperty("database.dbname", "<DB_NAME>");
props.setProperty("database.user", "<DB_USER>");
props.setProperty("database.password", "<DB_PASSWORD>");
props.setProperty("database.include.list", "<SINGLE_TABLE_NAME>");
props.setProperty("topic.prefix", "fullfilment");
props.setProperty("database.connection.adapter", "logminer");
props.setProperty("log.mining.strategy", "redo_log_catalog");
props.setProperty("include.schema.changes", "true");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "path/set/to/abc.dat");
props.setProperty("database.encrypt", "false");
My observation is that Debezium polls DML changes and sends event messages, which I am able to process in Java:
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying((records, committer) -> {
            System.out.println("==== Data change found ===" + records);
            // Process records and do stuff here.
        })
        .build()) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
}
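The executor lifecycle I intend is submit-once followed by an orderly shutdown. A minimal self-contained sketch of that pattern, with a plain Runnable standing in for the engine (the real DebeziumEngine is itself a Runnable that blocks until closed):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EngineLifecycleSketch {
    // Runs the given "engine" task once on a single-thread executor and
    // shuts the executor down cleanly. Returns true if it terminated
    // within the timeout.
    static boolean runOnce(Runnable engine) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);   // submit the engine exactly once
        executor.shutdown();        // accept no further tasks
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in task; the real engine would poll until engine.close().
        boolean stopped = runOnce(() -> System.out.println("engine running"));
        System.out.println("stopped cleanly: " + stopped);
    }
}
```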
However, the messages (records) received in the code above do not include DDL changes. All DDL changes are correctly written to the file (abc.dat) configured via the schema.history.internal.file.filename property.
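If DDL events were delivered through the same callback, I would expect to tell the two kinds apart by payload shape: Debezium schema-change events carry a "ddl" field, while data-change events carry an "op" field. A naive self-contained sketch (the JSON payloads below are abbreviated illustrations, not real engine output):

```java
public class EventKindSketch {
    // Naive classification by field presence: schema-change events contain
    // a "ddl" field, data-change events an "op" field. A real handler
    // would parse the JSON properly instead of string matching.
    static String kindOf(String json) {
        if (json.contains("\"ddl\"")) {
            return "DDL";
        }
        if (json.contains("\"op\"")) {
            return "DML";
        }
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        // Abbreviated, illustrative payloads.
        String ddl = "{\"payload\":{\"ddl\":\"ALTER TABLE T ADD C NUMBER\"}}";
        String dml = "{\"payload\":{\"op\":\"c\",\"after\":{\"ID\":1}}}";
        System.out.println(kindOf(ddl));
        System.out.println(kindOf(dml));
    }
}
```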
My question is: how can I configure the Debezium engine so that DDL change events are also delivered as ChangeEvents in the engine's response? That way I could process both DML and DDL events inside the lambda shown in the snippet above. Could you please suggest how I can achieve this?
Thanks and regards-
Anu
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/73a79a10-f84f-43a5-b72d-de25c682f47en%40googlegroups.com.