signal.data.collection separate schema

Gergely Jahn

Nov 17, 2025, 9:18:09 AM
to debezium
Hi,

I have a Debezium Oracle connector set up to mine several tables in one schema.
My DEBEZIUM_SIGNAL table is in a separate schema.
If I configure snapshot.include.collection.list, the connector fails with the following exception:
io.debezium.DebeziumException: Unable to find relational table model for 'FREE.BASEADM_1.DEBEZIUM_SIGNAL', there may be an issue with your include/exclude list configuration.

To make it work, I have to add FREE.BASEADM_1.DEBEZIUM_SIGNAL to snapshot.include.collection.list.

Working config:

{
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "oracle",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "FREE",
"decimal.handling.mode": "double",
"topic.prefix": "CHECKSIGN",
"topic.creation.default.partitions": "5",
"topic.creation.default.replication.factor": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schemaregistry:8081",
"table.include.list": "BASEADM.STTM_ACCOUNT_CLASS",
"snapshot.max.threads": "1",
"snapshot.locking.mode": "none",
"tombstones.on.delete": "false",
"schema.history.internal.kafka.bootstrap.servers": "broker:29092",
"schema.history.internal.kafka.topic": "schema-changes.checksign",
"snapshot.mode": "initial",
"log.mining.strategy": "redo_log_catalog",
"custom.metric.tags": "table=partner",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "CHECKSIGN.BASEADM.(.*)",
"transforms.Reroute.topic.replacement": "PUBLIC.STREAMING.FLEX.$1.DEV",
"signal.enabled.channels": "source,kafka",
"signal.data.collection": "FREE.BASEADM_1.DEBEZIUM_SIGNAL",
"signal.kafka.topic": "CHECKSIGN.SIGNALS",
"signal.kafka.bootstrap.servers": "broker:29092",
"schema.history.internal.store.only.captured.tables.ddl":"true",
"snapshot.include.collection.list": "FREE.BASEADM.STTM_ACCOUNT_CLASS,FREE.BASEADM_1.DEBEZIUM_SIGNAL",
"incremental.snapshot.chunk.size":"100000"
}

Failing config:

{
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "oracle",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "FREE",
"decimal.handling.mode": "double",
"topic.prefix": "CHECKSIGN",
"topic.creation.default.partitions": "5",
"topic.creation.default.replication.factor": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schemaregistry:8081",
"table.include.list": "BASEADM.STTM_ACCOUNT_CLASS",
"snapshot.max.threads": "1",
"snapshot.locking.mode": "none",
"tombstones.on.delete": "false",
"schema.history.internal.kafka.bootstrap.servers": "broker:29092",
"schema.history.internal.kafka.topic": "schema-changes.checksign",
"snapshot.mode": "initial",
"log.mining.strategy": "redo_log_catalog",
"custom.metric.tags": "table=partner",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "CHECKSIGN.BASEADM.(.*)",
"transforms.Reroute.topic.replacement": "PUBLIC.STREAMING.FLEX.$1.DEV",
"signal.enabled.channels": "source,kafka",
"signal.data.collection": "FREE.BASEADM_1.DEBEZIUM_SIGNAL",
"signal.kafka.topic": "CHECKSIGN.SIGNALS",
"signal.kafka.bootstrap.servers": "broker:29092",
"schema.history.internal.store.only.captured.tables.ddl":"true",
"snapshot.include.collection.list": "FREE.BASEADM.STTM_ACCOUNT_CLASS",
"incremental.snapshot.chunk.size":"100000"
}

Is this expected behavior?

Best,
Gergely Jahn (Greg)

Chris Cranford

Nov 18, 2025, 4:08:40 AM
to debe...@googlegroups.com
What if you include the signal table in `table.include.list` and drop `snapshot.include.collection.list`?
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion visit https://groups.google.com/d/msgid/debezium/fb86b648-7452-4e94-b697-7e85f68bb261n%40googlegroups.com.

Gergely Jahn

Nov 18, 2025, 12:31:20 PM
to debezium
If I drop `snapshot.include.collection.list`, it works as expected. I don't need to add the signal table to `table.include.list`.
If I set "snapshot.include.collection.list": "FREE.BASEADM.STTM_ACCOUNT_CLASS,FREE.BASEADM_1.PARTNER", so that at least one table from the signal table's schema is included, it also works.
It seems that when `snapshot.include.collection.list` is set, it must contain at least one table from the signal table's schema (it can be the signal table itself).
Is it possible that a list of schemas is assembled from `snapshot.include.collection.list`, and the signal table's schema is somehow missing from it?
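The empirical rule above can be turned into a quick pre-deployment check. This is a minimal sketch, not a Debezium API: `SignalSchemaCheck` and its methods are hypothetical names, and it assumes the include list holds literal `CATALOG.SCHEMA.TABLE` names rather than regular expressions.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical helper, not part of Debezium: checks the workaround rule observed in
// this thread, i.e. that snapshot.include.collection.list (when set) names at least
// one table from the schema of signal.data.collection.
class SignalSchemaCheck {

    // Returns the "CATALOG.SCHEMA" prefix of a literal "CATALOG.SCHEMA.TABLE" name.
    static String catalogAndSchema(String fullyQualifiedTable) {
        int lastDot = fullyQualifiedTable.lastIndexOf('.');
        if (lastDot < 0) {
            throw new IllegalArgumentException("Expected CATALOG.SCHEMA.TABLE: " + fullyQualifiedTable);
        }
        return fullyQualifiedTable.substring(0, lastDot).toUpperCase(Locale.ROOT);
    }

    // A missing or empty include list means no restriction, which worked in the thread.
    static boolean coversSignalSchema(String snapshotIncludeList, String signalDataCollection) {
        if (snapshotIncludeList == null || snapshotIncludeList.isBlank()) {
            return true;
        }
        String signalSchema = catalogAndSchema(signalDataCollection.trim());
        return Arrays.stream(snapshotIncludeList.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(SignalSchemaCheck::catalogAndSchema)
                .anyMatch(signalSchema::equals);
    }

    public static void main(String[] args) {
        String signal = "FREE.BASEADM_1.DEBEZIUM_SIGNAL";
        // Failing config from the first message: prints false
        System.out.println(coversSignalSchema("FREE.BASEADM.STTM_ACCOUNT_CLASS", signal));
        // Working variant with another BASEADM_1 table listed: prints true
        System.out.println(coversSignalSchema("FREE.BASEADM.STTM_ACCOUNT_CLASS,FREE.BASEADM_1.PARTNER", signal));
    }
}
```

It only encodes the workaround observed here; it says nothing about why the connector needs it.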

Gergely Jahn

Nov 18, 2025, 4:51:44 PM
to debezium
Hi Chris,

I've found the problem; the story is a bit long:

Configuration for the test:
Additional information: I realized that setting schema.history.internal.store.only.captured.tables.ddl=false also fixes the problem.

The exception is raised in RelationalSnapshotChangeEventSource::createSchemaChangeEventsForTables:
...
    final Table table = snapshotContext.tables.forTable(tableId);
    if (table == null) {
        throw new DebeziumException("Unable to find relational table model for '" + tableId +
         "', there may be an issue with your include/exclude list configuration.");
    }
...

So the signaling table is not in snapshotContext.tables.
The only place where snapshotContext.tables is filled is OracleSnapshotChangeEventSource::readTableStructure:

    @Override
    protected void readTableStructure(ChangeEventSourceContext sourceContext,
                                      RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
                                      OracleOffsetContext offsetContext, SnapshottingTask snapshottingTask)
            throws SQLException, InterruptedException {
        Set<TableId> capturedSchemaTables;
        if (databaseSchema.storeOnlyCapturedTables()) {
            capturedSchemaTables = snapshotContext.capturedTables;
            LOGGER.info("Only captured tables schema should be captured, capturing: {}", capturedSchemaTables);   // [2025-11-18 21:12:40,790] INFO Only captured tables schema should be captured, capturing: [FREE.BASEADM.STTM_ACCOUNT_CLASS] (io.debezium.connector.oracle.OracleSnapshotChangeEventSource)
        }
        else {
            capturedSchemaTables = snapshotContext.capturedSchemaTables;
            LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
        }

        Set<String> schemas = capturedSchemaTables.stream().map(TableId::schema).collect(Collectors.toSet());  // Only BASEADM
        final Tables.TableFilter tableFilter = getTableFilter(snapshottingTask, snapshotContext);
        for (String schema : schemas) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while reading structure of schema " + schema);
            }
            jdbcConnection.readSchema(
                    snapshotContext.tables,
                    null,
                    schema,
                    tableFilter,
                    null,
                    false);

        }
    }

jdbcConnection.readSchema() will add the table if it matches the tableFilter:

TableId tableId = new TableId(catalogName, schemaName, tableName);
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
tableIds.add(tableId);
attributesByTable.putAll(getAttributeDetails(tableId, tableType));
}

but it only enumerates tables in the provided schemas.

To collect the schemas, snapshotContext.capturedSchemaTables is used when schema.history.internal.store.only.captured.tables.ddl is false, and snapshotContext.capturedTables when it is true.
So only the schemas of the tables in the chosen set are read.
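The interaction between the schema selection in readTableStructure and the per-schema enumeration in readSchema can be modeled in a few lines. This is a simplified sketch, not Debezium code: `ReadStructureModel` is a hypothetical name, table ids are plain `CATALOG.SCHEMA.TABLE` strings, the database is a hypothetical in-memory map of schema to tables, and the table filter is a `Predicate` standing in for `Tables.TableFilter`.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class ReadStructureModel {

    static String schemaOf(String tableId) {
        return tableId.split("\\.")[1];
    }

    // Mirrors the choice in readTableStructure: which set the schemas are derived from.
    static Set<String> schemasToRead(boolean storeOnlyCapturedTables,
                                     Set<String> capturedTables,
                                     Set<String> capturedSchemaTables) {
        Set<String> source = storeOnlyCapturedTables ? capturedTables : capturedSchemaTables;
        return source.stream()
                .map(ReadStructureModel::schemaOf)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Mirrors readSchema: only tables in the requested schemas are enumerated,
    // and of those only the ones accepted by the filter are loaded.
    static Set<String> loadTables(Map<String, List<String>> tablesBySchema,
                                  Set<String> schemas,
                                  Predicate<String> tableFilter) {
        Set<String> loaded = new LinkedHashSet<>();
        for (String schema : schemas) {
            for (String tableId : tablesBySchema.getOrDefault(schema, List.of())) {
                if (tableFilter.test(tableId)) {
                    loaded.add(tableId);
                }
            }
        }
        return loaded;
    }

    public static void main(String[] args) {
        Map<String, List<String>> db = Map.of(
                "BASEADM", List.of("FREE.BASEADM.STTM_ACCOUNT_CLASS"),
                "BASEADM_1", List.of("FREE.BASEADM_1.DEBEZIUM_SIGNAL", "FREE.BASEADM_1.PARTNER"));
        Set<String> capturedTables = Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS");
        Set<String> capturedSchemaTables =
                Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS", "FREE.BASEADM_1.DEBEZIUM_SIGNAL");
        // The filter accepts the signal table, yet BASEADM_1 is never enumerated
        Predicate<String> filter = capturedSchemaTables::contains;

        Set<String> schemas = schemasToRead(true, capturedTables, capturedSchemaTables);
        System.out.println(schemas);                          // [BASEADM]
        System.out.println(loadTables(db, schemas, filter));  // [FREE.BASEADM.STTM_ACCOUNT_CLASS]
    }
}
```

Passing `false` as the first argument of `schemasToRead` yields both schemas, which matches the observation that store.only.captured.tables.ddl=false avoids the error.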

Finally, we need to understand how these sets are filled in RelationalSnapshotChangeEventSource::determineCapturedTables:

private void determineCapturedTables(RelationalSnapshotContext<P, O> ctx, Set<Pattern> dataCollectionsToBeSnapshotted, SnapshottingTask snapshottingTask)
            throws Exception {

        Set<TableId> allTableIds = getAllTableIds(ctx);
        Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet()); // intersection of all tables in the database and snapshot.include.collection.list

        Set<TableId> capturedTables = new HashSet<>();
        Set<TableId> capturedSchemaTables = new HashSet<>();

        for (TableId tableId : allTableIds) {
            if (connectorConfig.getTableFilters().eligibleForSchemaDataCollectionFilter().isIncluded(tableId) && !snapshottingTask.isOnDemand()) {
                LOGGER.info("Adding table {} to the list of capture schema tables", tableId);  // [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)

                // [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM_1.DEBEZIUM_SIGNAL to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
                capturedSchemaTables.add(tableId); 
            }
        }

        for (TableId tableId : snapshottedTableIds) {
            if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
                LOGGER.trace("Adding table {} to the list of captured tables for which the data will be snapshotted", tableId); // [2025-11-18 21:12:40,617] TRACE Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of captured tables for which the data will be snapshotted (io.debezium.relational.RelationalSnapshotChangeEventSource)
                capturedTables.add(tableId);
            }
            else {
                LOGGER.trace("Ignoring table {} for data snapshotting as it's not included in the filter configuration", tableId); // based on the logs, the signaling table is ignored here if it is not listed in snapshot.include.collection.list
            }
        }

        ctx.capturedTables = addSignalingCollectionAndSort(capturedTables); // here the signaling collection should be added
        ctx.capturedSchemaTables = snapshottingTask.isOnDemand() ? ctx.capturedTables
                : capturedSchemaTables
                        .stream()
                        .sorted()
                        .collect(Collectors.toCollection(LinkedHashSet::new));
    }
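The two sets built by determineCapturedTables come from different filters, which is where the mismatch starts. A simplified sketch, assuming plain-string table ids and `Predicate`s in place of the connector's filters; `CapturedTablesModel` is a hypothetical name, and the signaling step (addSignalingCollectionAndSort) is left out.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Simplified model of the two loops in determineCapturedTables (not Debezium code).
class CapturedTablesModel {

    // Assumption: an empty snapshot.include.collection.list selects every table.
    static boolean toBeSnapshotted(String tableId, List<Pattern> snapshotIncludePatterns) {
        return snapshotIncludePatterns.isEmpty()
                || snapshotIncludePatterns.stream().anyMatch(p -> p.matcher(tableId).matches());
    }

    // First loop: every table accepted by the schema-snapshot filter (signal table included).
    static Set<String> capturedSchemaTables(Set<String> allTableIds, Predicate<String> schemaSnapshotFilter) {
        Set<String> result = new TreeSet<>();
        for (String tableId : allTableIds) {
            if (schemaSnapshotFilter.test(tableId)) {
                result.add(tableId);
            }
        }
        return result;
    }

    // Second loop: only tables selected by snapshot.include.collection.list AND the data filter.
    static Set<String> capturedTables(Set<String> allTableIds, List<Pattern> snapshotIncludePatterns,
                                      Predicate<String> dataCollectionFilter) {
        Set<String> result = new TreeSet<>();
        for (String tableId : allTableIds) {
            if (toBeSnapshotted(tableId, snapshotIncludePatterns) && dataCollectionFilter.test(tableId)) {
                result.add(tableId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> all = Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS",
                "FREE.BASEADM_1.DEBEZIUM_SIGNAL", "FREE.BASEADM_1.PARTNER");
        // Both filters accept the included table plus the signal table, as the logs suggest
        Predicate<String> filter =
                Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS", "FREE.BASEADM_1.DEBEZIUM_SIGNAL")::contains;
        List<Pattern> snapshotInclude =
                List.of(Pattern.compile("FREE.BASEADM.STTM_ACCOUNT_CLASS", Pattern.CASE_INSENSITIVE));

        System.out.println(capturedSchemaTables(all, filter));
        // [FREE.BASEADM.STTM_ACCOUNT_CLASS, FREE.BASEADM_1.DEBEZIUM_SIGNAL]
        System.out.println(capturedTables(all, snapshotInclude, filter));
        // [FREE.BASEADM.STTM_ACCOUNT_CLASS]
    }
}
```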


private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables) {

        String tableIncludeList = connectorConfig.tableIncludeList();     // BASEADM.STTM_ACCOUNT_CLASS
        String signalingDataCollection = connectorConfig.getSignalingDataCollectionId();

        List<Pattern> captureTablePatterns = new ArrayList<>();
        if (!Strings.isNullOrBlank(tableIncludeList)) {
            captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
        }
        else {
            captureTablePatterns.add(MATCH_ALL_PATTERN);  
        }

        if (!Strings.isNullOrBlank(signalingDataCollection)) {
            captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection));  // FREE.BASEADM_1.DEBEZIUM_SIGNAL
        }

        return captureTablePatterns
                .stream()
                .flatMap(pattern -> toTableIds(capturedTables, pattern))
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

At the end, we iterate over the collected patterns: [BASEADM.STTM_ACCOUNT_CLASS, FREE.BASEADM_1.DEBEZIUM_SIGNAL]

private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
        return tableIds
                .stream()
                .filter(tid -> pattern.asMatchPredicate().test(connectorConfig.getTableIdMapper().toString(tid)))
                .sorted();
    }

For every pattern, we select all previously collected tables that match it and merge them into a final set.
So capturedTables is never extended with the signaling table; the method only guarantees that capturedTables contains nothing beyond tables allowed by table.include.list or named in signal.data.collection.
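The key property is that addSignalingCollectionAndSort only selects from the set it receives. The sketch below reproduces that shape with `java.util.regex`. It is a simplification: `SignalingFilterModel` is a hypothetical name, table ids are rendered as `SCHEMA.TABLE` strings (standing in for `connectorConfig.getTableIdMapper().toString(tid)`), and the signal collection is assumed to be matched literally in that same form.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class SignalingFilterModel {

    static Set<String> addSignalingCollectionAndSort(Set<String> capturedTables,
                                                     String tableIncludeList,
                                                     String signalDataCollection) {
        List<Pattern> patterns = new ArrayList<>();
        if (tableIncludeList == null || tableIncludeList.isBlank()) {
            patterns.add(Pattern.compile(".*")); // stands in for MATCH_ALL_PATTERN
        }
        else {
            for (String regex : tableIncludeList.split(",")) {
                patterns.add(Pattern.compile(regex.trim(), Pattern.CASE_INSENSITIVE));
            }
        }
        if (signalDataCollection != null && !signalDataCollection.isBlank()) {
            // Assumption: the signal collection is matched literally
            patterns.add(Pattern.compile(Pattern.quote(signalDataCollection), Pattern.CASE_INSENSITIVE));
        }
        // For each pattern, pick matching tables from capturedTables only:
        // a table that is not already in capturedTables can never appear in the result.
        return patterns.stream()
                .flatMap(p -> capturedTables.stream().filter(p.asMatchPredicate()).sorted())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        Set<String> captured = Set.of("BASEADM.STTM_ACCOUNT_CLASS");
        System.out.println(addSignalingCollectionAndSort(
                captured, "BASEADM.STTM_ACCOUNT_CLASS", "BASEADM_1.DEBEZIUM_SIGNAL"));
        // [BASEADM.STTM_ACCOUNT_CLASS]  (the signal pattern matches nothing, so nothing is added)
    }
}
```

The signal table only shows up in the result if it was already in the input, for example when it is listed in snapshot.include.collection.list.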

Final trick, in RelationalSnapshotChangeEventSource::createSchemaChangeEventsForTables:

for (Iterator<TableId> iterator = getTablesForSchemaChange(snapshotContext).iterator(); iterator.hasNext();) {
            final TableId tableId = iterator.next();
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while capturing schema of table " + tableId);
            }

            LOGGER.info("Capturing structure of table {}", tableId);

            snapshotContext.offset.event(tableId, getClock().currentTime());

            // If data are not snapshotted then the last schema change must set last snapshot flag
            if (!snapshottingTask.snapshotData() && !iterator.hasNext()) {
                lastSnapshotRecord(snapshotContext);
            }

            final Table table = snapshotContext.tables.forTable(tableId);
            if (table == null) {
                throw new DebeziumException("Unable to find relational table model for '" + tableId +
                        "', there may be an issue with your include/exclude list configuration.");
            }

getTablesForSchemaChange(snapshotContext) is overridden in OracleSnapshotChangeEventSource:

@Override
    protected Collection<TableId> getTablesForSchemaChange(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {
        return snapshotContext.capturedSchemaTables; // this set contains the signaling table
    }

In the RelationalTableFilters constructor:

String signalDataCollection = config.getString(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION);
        if (signalDataCollection != null) {
            TableId signalDataCollectionTableId = TableId.parse(signalDataCollection, useCatalogBeforeSchema);
            if (!finalTablePredicate.test(signalDataCollectionTableId)) {
                final Predicate<TableId> signalDataCollectionPredicate = Selectors.tableSelector()
                        .includeTables(tableIdMapper.toString(signalDataCollectionTableId), tableIdMapper).build();
                finalTablePredicate = finalTablePredicate.or(signalDataCollectionPredicate);
            }
        }
        this.tableFilter = finalTablePredicate::test;

...

 if (config.getBoolean(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL)) {
            this.schemaSnapshotFilter = eligibleSchemaPredicate.and(tableFilter::isIncluded)::test;
        }
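The effect of this constructor logic can be sketched with `java.util.function.Predicate`. It is simplified and covers only the store.only.captured.tables.ddl=true branch shown above: `TableFiltersModel` is a hypothetical name, ids are plain strings, the include list is a literal set instead of regexes, and `eligibleSchemaPredicate` is passed in as an accept-everything predicate, which is an assumption.

```java
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of how RelationalTableFilters composes its filters
// when schema.history.internal.store.only.captured.tables.ddl=true.
class TableFiltersModel {
    final Predicate<String> tableFilter;
    final Predicate<String> schemaSnapshotFilter;

    TableFiltersModel(Set<String> includedTables, String signalDataCollection,
                      Predicate<String> eligibleSchemaPredicate) {
        Predicate<String> finalTablePredicate = includedTables::contains;
        if (signalDataCollection != null && !finalTablePredicate.test(signalDataCollection)) {
            // The signal table is OR-ed into the table filter even though the include list omits it
            finalTablePredicate = finalTablePredicate.or(signalDataCollection::equals);
        }
        this.tableFilter = finalTablePredicate;
        // Schema snapshotting is limited to tables passing tableFilter, which now includes the signal table
        this.schemaSnapshotFilter = eligibleSchemaPredicate.and(finalTablePredicate);
    }

    public static void main(String[] args) {
        TableFiltersModel filters = new TableFiltersModel(
                Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS"),
                "FREE.BASEADM_1.DEBEZIUM_SIGNAL",
                tableId -> true); // assumption: every table is eligible
        System.out.println(filters.schemaSnapshotFilter.test("FREE.BASEADM_1.DEBEZIUM_SIGNAL")); // true
        System.out.println(filters.schemaSnapshotFilter.test("FREE.BASEADM_1.PARTNER"));         // false
    }
}
```

This is why the signal table passes eligibleForSchemaDataCollectionFilter() and lands in capturedSchemaTables.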

In RelationalSnapshotChangeEventSource::determineCapturedTables:

if (connectorConfig.getTableFilters().eligibleForSchemaDataCollectionFilter().isIncluded(tableId) && !snapshottingTask.isOnDemand()) {
                LOGGER.info("Adding table {} to the list of capture schema tables", tableId);  // [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)

                // [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM_1.DEBEZIUM_SIGNAL to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
                capturedSchemaTables.add(tableId); 
            }

eligibleForSchemaDataCollectionFilter() returns schemaSnapshotFilter:

 public TableFilter eligibleForSchemaDataCollectionFilter() {
        return schemaSnapshotFilter;
    }


TL;DR: We iterate over snapshotContext.capturedSchemaTables, which includes the signal.data.collection table, but that table was never loaded from the database, because the schemas to load were derived from snapshotContext.capturedTables.
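The TL;DR can be phrased as an invariant: every table that createSchemaChangeEventsForTables iterates over must belong to a schema that readTableStructure actually read. The sketch below checks that invariant (ignoring the per-table filter for brevity) and, purely as a hypothetical illustration, not the fix adopted for DBZ-9717, shows that deriving the schema list from both sets would satisfy it. `SchemaCoverageCheck` is a hypothetical name and table ids are plain `CATALOG.SCHEMA.TABLE` strings.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class SchemaCoverageCheck {

    static String schemaOf(String tableId) {
        return tableId.split("\\.")[1];
    }

    static Set<String> schemasOf(Set<String> tableIds) {
        return tableIds.stream()
                .map(SchemaCoverageCheck::schemaOf)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Tables that would hit "Unable to find relational table model": iterated for
    // schema change events, but their schema was never read.
    static Set<String> unloadedTables(Set<String> tablesForSchemaChange, Set<String> schemasRead) {
        return tablesForSchemaChange.stream()
                .filter(id -> !schemasRead.contains(schemaOf(id)))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Hypothetical remedy for illustration only: read the schemas of both sets.
    static Set<String> schemasFromBoth(Set<String> capturedTables, Set<String> capturedSchemaTables) {
        Set<String> union = new TreeSet<>(capturedTables);
        union.addAll(capturedSchemaTables);
        return schemasOf(union);
    }

    public static void main(String[] args) {
        Set<String> capturedTables = Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS");
        Set<String> capturedSchemaTables =
                Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS", "FREE.BASEADM_1.DEBEZIUM_SIGNAL");

        // Behavior with store.only.captured.tables.ddl=true: schemas come from capturedTables
        System.out.println(unloadedTables(capturedSchemaTables, schemasOf(capturedTables)));
        // [FREE.BASEADM_1.DEBEZIUM_SIGNAL]
        System.out.println(unloadedTables(capturedSchemaTables,
                schemasFromBoth(capturedTables, capturedSchemaTables)));
        // []
    }
}
```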

Best,
Greg

Chris Cranford

Nov 21, 2025, 2:38:38 PM
to debe...@googlegroups.com
Hi -

I would say that's a bug, Greg, as you should be able to use schema.history.internal.store.only.captured.tables.ddl=true with a signal table in a separate schema. Could you raise a Jira issue for this, if you haven't already?

Thanks,
-cc

Gergely Jahn

Nov 30, 2025, 6:34:47 PM
to debezium
Hi Chris,

Sorry for the late reply; here is the Jira ticket: https://issues.redhat.com/browse/DBZ-9717
Best,
Greg
