2021-06-04 05:54:16,062 INFO [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) First LSN 'LSN{0/1B5BEBA0}' received
2021-06-04 05:54:16,062 INFO [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) WAL resume position 'LSN{0/1B5BEBA0}' discovered
2021-06-04 05:54:16,064 INFO [io.deb.jdb.JdbcConnection] (pool-11-thread-1) Connection gracefully closed
2021-06-04 05:54:16,119 INFO [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Initializing PgOutput logical decoder publication
2021-06-04 05:54:16,140 INFO [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Processing messages
2021-06-04 05:54:16,648 INFO [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Message with LSN 'LSN{0/1B5BEBA0}' arrived, switching off the filtering
2021-06-04 05:54:16,731 INFO [io.deb.jdb.JdbcConnection] (pool-13-thread-1) Connection gracefully closed
@ApplicationScoped
public class DebeziumListener {
private String dbHost="192.168.1.15";
private int dbPort=5432;
private String dbPassword="quarkus_test1";
private String dbName="customer";
private String dbUser="quarkus_test1";
private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
public DebeziumListener( ) {
System.out.println("Connecting for replicating data");
io.debezium.config.Configuration config=
io.debezium.config.Configuration.create()
.with("name", "customer-mysql-connector2")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/tmp/offsets2.dat")
.with("offset.flush.interval.ms", "60000")
.with("database.hostname", dbHost)
.with("database.port", dbPort)
.with("database.user", dbUser)
.with("database.password", dbPassword)
.with("database.dbname", dbName)
.with("database.include.list", dbName)
.with("include.schema.changes", "false")
.with("database.server.id", "10181")
.with("database.server.name", "customer-mysql-db-server2")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/tmp/dbhistory2.dat")
.with("plugin.name", "pgoutput")
.build();
this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(config.asProperties())
.notifying(this::handleEvent)
.build();
}
void onStart(@Observes StartupEvent ev) {
System.out.println("The application is starting...");
this.executor.execute(debeziumEngine);
}
private void handleEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
sourceRecord.value();
System.out.println("Replicating data....");
}
private final Executor executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void start() {
this.executor.execute(debeziumEngine);
}
@PreDestroy
private void stop() throws IOException {
if (this.debeziumEngine != null) {
this.debeziumEngine.close();
}
}
}
--
You received this message because you are subscribed to a topic in the Google Groups "debezium" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/debezium/FkoC7D-9X5Q/unsubscribe.
To unsubscribe from this group and all its topics, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/b77c7ed1-302e-41ce-8d30-2d4b99c4cc25n%40googlegroups.com.