@App:description("Description of the plan")
@source(type = 'cdc' , url = 'jdbc:postgresql://localhost:5432/dev?currentSchema=sharedataview',
username = 'dev', password = '
dev123', mode = 'listening',
table.name = 'sharedataview.cdc_table1', operation = 'delete',
@map(type = 'keyvalue', fail.on.missing.attribute = 'false'))
define stream sourceStream1 (before_id string);
@source(type = 'cdc' , url = 'jdbc:postgresql:/localhost:5432/dev?currentSchema=sharedataview',
username = '
dev', password = '
dev123', mode = 'listening',
table.name = 'sharedataview.cdc_table2', operation = 'delete',
@map(type = 'keyvalue', fail.on.missing.attribute = 'false'))
define stream sourceStream2 (before_id string);
@sink(type = 'log')
define stream LogInsertStream1 (before_id string);
@sink(type = 'log')
define stream LogInsertStream2 (before_id string);
@info(name = 'insert log1')
from sourceStream1
select before_id
insert into LogInsertStream1;
@info(name = 'insert log2')
from sourceStream2
select before_id
insert into LogInsertStream2;
ERROR {io.siddhi.core.stream.input.source.Source} - Error on 'cdc_test_delete'. Connection to the database lost. Error while connecting at Source 'cdc' at 'sourceStream1'. Will retry in '5 sec'. io.siddhi.core.exception.ConnectionUnavailableException: Connection to the database lost.
at io.siddhi.extension.io.cdc.source.CDCSource.lambda$connect$1(CDCSource.java:650)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:882)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.debezium.DebeziumException: Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:134)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
... 3 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" already exists
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2510)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2245)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:311)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:447)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:368)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:309)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:295)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:272)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:267)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:352)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:127)
... 5 more