Getting started with Embedded , Quarkus and Postgres

235 views
Skip to first unread message

Brand Bintley

unread,
Jun 4, 2021, 6:16:42 AM6/4/21
to debezium
First time Debezium user. I am trying to watch a customer table by embedding Debezium, everything implemented  with Quarkus but following this tutorial: https://www.baeldung.com/debezium-intro

My code is below. What I am seeing is that when I write a new customer to the postgres DB that I am watching, I get the following message on the console. But my handler is not invoked. Also, this message happens only for the first insert. Any other insert results in no log message and no handler invocation. What am I missing? 

2021-06-04 05:54:16,062 INFO  [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) First LSN 'LSN{0/1B5BEBA0}' received

2021-06-04 05:54:16,062 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) WAL resume position 'LSN{0/1B5BEBA0}' discovered

2021-06-04 05:54:16,064 INFO  [io.deb.jdb.JdbcConnection] (pool-11-thread-1) Connection gracefully closed

2021-06-04 05:54:16,119 INFO  [io.deb.con.pos.con.PostgresReplicationConnection] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Initializing PgOutput logical decoder publication

2021-06-04 05:54:16,140 INFO  [io.deb.con.pos.PostgresStreamingChangeEventSource] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Processing messages

2021-06-04 05:54:16,648 INFO  [io.deb.con.pos.con.WalPositionLocator] (debezium-postgresconnector-customer-mysql-db-server2-change-event-source-coordinator) Message with LSN 'LSN{0/1B5BEBA0}' arrived, switching off the filtering

2021-06-04 05:54:16,731 INFO  [io.deb.jdb.JdbcConnection] (pool-13-thread-1) Connection gracefully closed




@ApplicationScoped

public class DebeziumListener {


private String dbHost="192.168.1.15";

private int dbPort=5432;

private String dbPassword="quarkus_test1";

private String dbName="customer";

private String dbUser="quarkus_test1";

private DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

public DebeziumListener( ) {

        System.out.println("Connecting for replicating data");


io.debezium.config.Configuration config=

io.debezium.config.Configuration.create()

        .with("name", "customer-mysql-connector2")

        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")

        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")

        .with("offset.storage.file.filename", "/tmp/offsets2.dat")

        .with("offset.flush.interval.ms", "60000")

        .with("database.hostname", dbHost)

        .with("database.port", dbPort)

        .with("database.user", dbUser)

        .with("database.password", dbPassword)

        .with("database.dbname", dbName)

        .with("database.include.list", dbName)

        .with("include.schema.changes", "false")

        .with("database.server.id", "10181")

        .with("database.server.name", "customer-mysql-db-server2")

        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")

        .with("database.history.file.filename", "/tmp/dbhistory2.dat")

        .with("plugin.name", "pgoutput")

        .build();

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))

      .using(config.asProperties())

      .notifying(this::handleEvent)

      .build();


}


    void onStart(@Observes StartupEvent ev) {               

     System.out.println("The application is starting...");

      this.executor.execute(debeziumEngine);


    }

private void handleEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {

SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();

sourceRecord.value();

        System.out.println("Replicating data....");

}

private final Executor executor = Executors.newSingleThreadExecutor();


@PostConstruct

private void start() {

    this.executor.execute(debeziumEngine);

}


@PreDestroy

private void stop() throws IOException {

    if (this.debeziumEngine != null) {

        this.debeziumEngine.close();

    }

}


}

Brand Bintley

unread,
Jun 4, 2021, 4:05:31 PM6/4/21
to debe...@googlegroups.com
Please help. How do I get embedded Debezium to work with Postgres?

--
You received this message because you are subscribed to a topic in the Google Groups "debezium" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/debezium/FkoC7D-9X5Q/unsubscribe.
To unsubscribe from this group and all its topics, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/b77c7ed1-302e-41ce-8d30-2d4b99c4cc25n%40googlegroups.com.

jiri.p...@gmail.com

unread,
Jun 7, 2021, 3:41:35 AM6/7/21
to debezium
Hi,

could you please share the full log?

J.

Reply all
Reply to author
Forward
0 new messages