PostgreSQL JDBC Source Connector timestamp mode repeats last message infinitely


Mark Vander Lugt

Dec 27, 2016, 6:58:40 PM
to Confluent Platform
I set up a simple example with a single database table, a single standalone Connect worker, and a single Kafka instance. I monitored the Kafka topic using the console consumer, and when I started the standalone connector it repeated the message with the newest timestamp over and over. Any help as to why the connector behaves this way would be greatly appreciated.

Thanks,

Mark Vander Lugt



Box A
Ubuntu 16.10
Zookeeper/Kafka Installed manually from http://kafka.apache.org/
./bin/kafka-console-consumer.sh --topic test-kc-users --bootstrap-server localhost:9092

Box B
Ubuntu 16.10
Confluent installed via apt (confluent-kafka-2.11.7 & confluent-kafka-connect-jdbc)
connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/postgresql-sample.properties
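
A sanity check I can add here (not part of the original setup; the connector name below just matches the name= line in the properties file further down): the standalone worker's REST interface on Box B should confirm the connector and its task are actually running.

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/test-postgresql-example/status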


Database Table
--------------
DROP TABLE IF EXISTS public.users;

CREATE TABLE public.users (
   u_id            char(36) primary key,
   u_created       timestamp not null default current_timestamp,
   u_modified      timestamp not null default current_timestamp,
   u_deactivated   timestamp null,
   u_name          varchar(100) not null unique,
   u_description   varchar(500) null,
   replication_id  serial
);

CREATE OR REPLACE FUNCTION public.users_update_modified()
  RETURNS TRIGGER AS $$
BEGIN
  NEW.u_modified := now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_before_update BEFORE
UPDATE ON public.users
FOR EACH ROW EXECUTE PROCEDURE public.users_update_modified();
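
To give the connector something to pick up, a couple of throwaway statements (not in the original post; the UUID value is made up) that insert a row and then exercise the u_modified trigger:

INSERT INTO public.users (u_id, u_name, u_description)
VALUES ('00000000-0000-0000-0000-000000000001', 'alice', 'first test user');

-- u_modified should be bumped by the BEFORE UPDATE trigger
UPDATE public.users SET u_description = 'updated description' WHERE u_name = 'alice';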



/etc/kafka-connect-jdbc/postgresql-sample.properties
----------------------------------------------------
name=test-postgresql-example
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC connector. In this example, we connect to a
# PostgreSQL database, use timestamp+incrementing mode to detect new and updated rows, and
# output to topics prefixed with 'test-kc-', e.g. the table 'users' is written to the
# topic 'test-kc-users'.
connection.url=jdbc:postgresql://**********:5432/kafka-test?user=********&password=********
mode=timestamp+incrementing
#mode=incrementing
topic.prefix=test-kc-
table.whitelist=users
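
For reference, the JDBC connector's timestamp+incrementing mode is normally given explicit column settings; a sketch of what those two lines might look like here (column names assumed from the table definition above, and not something I currently have in the config):

# Assumed mapping onto the users table above
timestamp.column.name=u_modified
incrementing.column.name=replication_id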




/etc/kafka/connect-standalone.properties
----------------------------------------
bootstrap.servers=*********:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
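# The flush-interval line this comment refers to seems to have been lost when pasting;
# in the stock connect-standalone.properties it is the following (value assumed from the stock file).
offset.flush.interval.ms=10000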


