Hi Chris,
I've found the problem; the story is a bit long:
Configuration for the test:
Additional information: I realized that setting schema.history.internal.store.only.captured.tables.ddl to false also fixes the problem.
The exception is raised in
RelationalSnapshotChangeEventSource::createSchemaChangeEventsForTables
...
final Table table = snapshotContext.tables.forTable(tableId);
if (table == null) {
throw new DebeziumException("Unable to find relational table model for '" + tableId +
"', there may be an issue with your include/exclude list configuration.");
}
...
So the signaling table is not present in snapshotContext.tables.
The only place where snapshotContext.tables is populated is OracleSnapshotChangeEventSource::readTableStructure:
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
OracleOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws SQLException, InterruptedException {
Set<TableId> capturedSchemaTables;
if (databaseSchema.storeOnlyCapturedTables()) {
capturedSchemaTables = snapshotContext.capturedTables;
LOGGER.info("Only captured tables schema should be captured, capturing: {}", capturedSchemaTables);
// [2025-11-18 21:12:40,790] INFO Only captured tables schema should be captured, capturing: [FREE.BASEADM.STTM_ACCOUNT_CLASS] (io.debezium.connector.oracle.OracleSnapshotChangeEventSource)
}
else {
capturedSchemaTables = snapshotContext.capturedSchemaTables;
LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
}
Set<String> schemas = capturedSchemaTables.stream().map(TableId::schema).collect(Collectors.toSet()); // Only BASEADM
final Tables.TableFilter tableFilter = getTableFilter(snapshottingTask, snapshotContext);
for (String schema : schemas) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while reading structure of schema " + schema);
}
jdbcConnection.readSchema(
snapshotContext.tables,
null,
schema,
tableFilter,
null,
false);
}
}
jdbcConnection.readSchema() will add the table if it matches the tableFilter:
TableId tableId = new TableId(catalogName, schemaName, tableName);
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
tableIds.add(tableId);
attributesByTable.putAll(getAttributeDetails(tableId, tableType));
}
but only enumerates tables for the provided schemas.
To collect the schemas: if schema.history.internal.store.only.captured.tables.ddl is false, snapshotContext.capturedSchemaTables is used; if it is true, snapshotContext.capturedTables.
So only the schemas derived from that set of tables will be read.
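To illustrate the consequence, here is a minimal, self-contained sketch (TableId is a stand-in record, not Debezium's class, and the table names mirror the ones from my logs): the schemas that readSchema() will scan are derived purely from whichever table set was chosen, so when only capturedTables is used, BASEADM_1 never appears in the schema list.

```java
import java.util.Set;
import java.util.stream.Collectors;

public class SchemaDerivation {
    // Stand-in for Debezium's io.debezium.relational.TableId
    record TableId(String catalog, String schema, String table) {}

    // Mirrors the schema-derivation step in readTableStructure:
    // the schemas to scan come only from the given table set
    static Set<String> schemasToRead(Set<TableId> tables) {
        return tables.stream().map(TableId::schema).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // store.only.captured.tables.ddl = true -> capturedTables (no signaling table)
        Set<TableId> capturedTables = Set.of(
                new TableId("FREE", "BASEADM", "STTM_ACCOUNT_CLASS"));
        // store.only.captured.tables.ddl = false -> capturedSchemaTables (signaling table included)
        Set<TableId> capturedSchemaTables = Set.of(
                new TableId("FREE", "BASEADM", "STTM_ACCOUNT_CLASS"),
                new TableId("FREE", "BASEADM_1", "DEBEZIUM_SIGNAL"));

        System.out.println(schemasToRead(capturedTables));       // only BASEADM
        System.out.println(schemasToRead(capturedSchemaTables)); // BASEADM and BASEADM_1
    }
}
```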
Finally, we need to understand how these sets are filled, in RelationalSnapshotChangeEventSource::determineCapturedTables:
private void determineCapturedTables(RelationalSnapshotContext<P, O> ctx, Set<Pattern> dataCollectionsToBeSnapshotted, SnapshottingTask snapshottingTask)
throws Exception {
Set<TableId> allTableIds = getAllTableIds(ctx);
Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet());
<--- intersection of all tables in the database and snapshot.include.collection.list
Set<TableId> capturedTables = new HashSet<>();
Set<TableId> capturedSchemaTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (connectorConfig.getTableFilters().eligibleForSchemaDataCollectionFilter().isIncluded(tableId) && !snapshottingTask.isOnDemand()) {
LOGGER.info("Adding table {} to the list of capture schema tables", tableId);
// [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
// [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM_1.DEBEZIUM_SIGNAL to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
capturedSchemaTables.add(tableId);
}
}
for (TableId tableId : snapshottedTableIds) {
if (connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
LOGGER.trace("Adding table {} to the list of captured tables for which the data will be snapshotted", tableId);
// [2025-11-18 21:12:40,617] TRACE Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of captured tables for which the data will be snapshotted (io.debezium.relational.RelationalSnapshotChangeEventSource)
capturedTables.add(tableId);
}
else {
LOGGER.trace("Ignoring table {} for data snapshotting as it's not included in the filter configuration", tableId);
<-- based on the logs we ignore the signaling table here if it is not listed in snapshot.include.collection.list
}
}
ctx.capturedTables = addSignalingCollectionAndSort(capturedTables);
<-- here the signaling collection should be added
ctx.capturedSchemaTables = snapshottingTask.isOnDemand() ? ctx.capturedTables
: capturedSchemaTables
.stream()
.sorted()
.collect(Collectors.toCollection(LinkedHashSet::new));
}
private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables) {
String tableIncludeList = connectorConfig.tableIncludeList();
<--- BASEADM.STTM_ACCOUNT_CLASS
String signalingDataCollection = connectorConfig.getSignalingDataCollectionId();
List<Pattern> captureTablePatterns = new ArrayList<>();
if (!Strings.isNullOrBlank(tableIncludeList)) {
captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
}
else {
captureTablePatterns.add(MATCH_ALL_PATTERN);
}
if (!Strings.isNullOrBlank(signalingDataCollection)) {
captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection));
<--- FREE.BASEADM_1.DEBEZIUM_SIGNAL
}
return captureTablePatterns
.stream()
.flatMap(pattern -> toTableIds(capturedTables, pattern))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
Basically, at the end we iterate over the collected patterns: [BASEADM.STTM_ACCOUNT_CLASS, FREE.BASEADM_1.DEBEZIUM_SIGNAL]
private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
return tableIds
.stream()
.filter(tid -> pattern.asMatchPredicate().test(connectorConfig.getTableIdMapper().toString(tid)))
.sorted();
}
And for every pattern we select all previously collected tables matching it and merge them into a final set.
We don't extend capturedTables with the signaling table; we only guarantee that capturedTables contains nothing beyond the tables allowed by table.include.list or provided as signal.data.collection.
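This effect can be shown with a small self-contained sketch (plain strings stand in for TableId and the table-id mapper): filtering an existing set by patterns can only remove entries, never introduce the signaling table that was absent from the input.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class SignalFilterSketch {
    // Simplified version of addSignalingCollectionAndSort: every pattern only
    // FILTERS the already-collected table ids; it can never introduce a new one.
    static Set<String> selectByPatterns(Set<String> capturedTables, List<Pattern> patterns) {
        return patterns.stream()
                .flatMap(p -> capturedTables.stream()
                        .filter(t -> p.asMatchPredicate().test(t))
                        .sorted())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        // capturedTables was built from the data-collection filter, so the
        // signaling table is not in it (it is not in table.include.list)
        Set<String> capturedTables = Set.of("FREE.BASEADM.STTM_ACCOUNT_CLASS");
        List<Pattern> patterns = List.of(
                Pattern.compile("FREE\\.BASEADM\\.STTM_ACCOUNT_CLASS", Pattern.CASE_INSENSITIVE),
                Pattern.compile("FREE\\.BASEADM_1\\.DEBEZIUM_SIGNAL", Pattern.CASE_INSENSITIVE));

        // The signal pattern matches nothing in the input set, so the result
        // still lacks the signaling table
        System.out.println(selectByPatterns(capturedTables, patterns)); // prints [FREE.BASEADM.STTM_ACCOUNT_CLASS]
    }
}
```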
Final trick:
RelationalSnapshotChangeEventSource::createSchemaChangeEventsForTables
for (Iterator<TableId> iterator =
getTablesForSchemaChange(snapshotContext).iterator(); iterator.hasNext();) {
final TableId tableId = iterator.next();
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while capturing schema of table " + tableId);
}
LOGGER.info("Capturing structure of table {}", tableId);
snapshotContext.offset.event(tableId, getClock().currentTime());
// If data are not snapshotted then the last schema change must set last snapshot flag
if (!snapshottingTask.snapshotData() && !iterator.hasNext()) {
lastSnapshotRecord(snapshotContext);
}
final Table table = snapshotContext.tables.forTable(tableId);
if (table == null) {
throw new DebeziumException("Unable to find relational table model for '" + tableId +
"', there may be an issue with your include/exclude list configuration.");
}
getTablesForSchemaChange(snapshotContext) is overridden in OracleSnapshotChangeEventSource:
@Override
protected Collection<TableId> getTablesForSchemaChange(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {
return snapshotContext.capturedSchemaTables;
<--- this set has the signaling table
}
In the RelationalTableFilters constructor:
String signalDataCollection = config.getString(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION);
if (signalDataCollection != null) {
TableId signalDataCollectionTableId = TableId.parse(signalDataCollection, useCatalogBeforeSchema);
if (!finalTablePredicate.test(signalDataCollectionTableId)) {
final Predicate<TableId> signalDataCollectionPredicate = Selectors.tableSelector()
.includeTables(tableIdMapper.toString(signalDataCollectionTableId), tableIdMapper).build();
finalTablePredicate = finalTablePredicate.or(signalDataCollectionPredicate);
}
}
this.tableFilter = finalTablePredicate::test;
...
if (config.getBoolean(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL)) {
this.schemaSnapshotFilter = eligibleSchemaPredicate.and(tableFilter::isIncluded)::test;
}
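A minimal sketch of this predicate composition (plain strings stand in for TableId, and the eligibility rule here is hypothetical) shows why the signaling table passes the schema snapshot filter even though it is outside table.include.list:

```java
import java.util.function.Predicate;

public class FilterComposition {
    // Mirrors the RelationalTableFilters constructor: the signaling table is
    // OR-ed into the table predicate when the include list does not match it
    static Predicate<String> tableFilter() {
        Predicate<String> includeList = "FREE.BASEADM.STTM_ACCOUNT_CLASS"::equals;
        return includeList.or("FREE.BASEADM_1.DEBEZIUM_SIGNAL"::equals);
    }

    // With store.only.captured.tables.ddl = true the schema snapshot filter is
    // "eligible AND tableFilter", so the signaling table passes it as well
    static Predicate<String> schemaSnapshotFilter() {
        Predicate<String> eligible = t -> !t.contains(".SYS."); // hypothetical eligibility rule
        return eligible.and(tableFilter());
    }

    public static void main(String[] args) {
        System.out.println(schemaSnapshotFilter().test("FREE.BASEADM_1.DEBEZIUM_SIGNAL")); // true
        System.out.println(schemaSnapshotFilter().test("FREE.BASEADM.OTHER_TABLE"));       // false
    }
}
```

So capturedSchemaTables ends up containing the signaling table, while capturedTables (and therefore the set of schemas passed to readSchema) does not.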
In RelationalSnapshotChangeEventSource::determineCapturedTables:
if (connectorConfig.getTableFilters().eligibleForSchemaDataCollectionFilter().isIncluded(tableId) && !snapshottingTask.isOnDemand()) {
LOGGER.info("Adding table {} to the list of capture schema tables", tableId);
// [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM.STTM_ACCOUNT_CLASS to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
// [2025-11-18 21:12:40,617] INFO Adding table FREE.BASEADM_1.DEBEZIUM_SIGNAL to the list of capture schema tables (io.debezium.relational.RelationalSnapshotChangeEventSource)
capturedSchemaTables.add(tableId);
}
eligibleForSchemaDataCollectionFilter() returns schemaSnapshotFilter:
public TableFilter eligibleForSchemaDataCollectionFilter() {
return schemaSnapshotFilter;
}
TLDR: We iterate over snapshotContext.capturedSchemaTables, which includes signal.data.collection, but we never loaded its table structure from the database, because for that we iterated over snapshotContext.capturedTables.

Best,
Greg