./bin/kafka-console-consumer.sh --topic test-kc-users --bootstrap-server localhost:9092
connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/postgresql-sample.properties
DROP TABLE IF EXISTS public.users;
CREATE TABLE public.users (
  u_id char(36) primary key,
  u_created timestamp not null default current_timestamp,
  u_modified timestamp not null default current_timestamp,
  u_deactivated timestamp null,
  u_name varchar(100) not null unique,
  u_description varchar(500) null,
  replication_id serial
);
CREATE OR REPLACE FUNCTION public.users_update_modified() RETURNS TRIGGER AS $$
BEGIN
  NEW.u_modified = now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_before_update
BEFORE UPDATE ON public.users
FOR EACH ROW EXECUTE PROCEDURE public.users_update_modified();

name=test-postgresql-example
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC connector. In this example, we connect to a
# PostgreSQL database called 'kafka-test', use the timestamp column 'u_modified' together with
# the auto-incrementing column 'replication_id' to detect new and updated rows, and output to
# topics prefixed with 'test-kc-', e.g. the table 'users' will be written to the topic
# 'test-kc-users'.
connection.url=jdbc:postgresql://**********:5432/kafka-test?user=********&password=********
mode=timestamp+incrementing
#mode=incrementing
timestamp.column.name=u_modified
incrementing.column.name=replication_id
topic.prefix=test-kc-
table.whitelist=users
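The settings above make up the connector file (/etc/kafka-connect-jdbc/postgresql-sample.properties in the connect-standalone command); the worker settings that follow belong in /etc/kafka/connect-standalone.properties. Since timestamp+incrementing mode depends on the trigger keeping u_modified current, it is worth confirming that behaviour before starting the connector. A minimal sanity check from psql, assuming the table was created as above; the UUID and name literals are just placeholders:

-- Insert a test row, then update it, and confirm the trigger advanced u_modified.
INSERT INTO public.users (u_id, u_name)
VALUES ('00000000-0000-0000-0000-000000000001', 'alice');

UPDATE public.users
SET u_description = 'first test user'
WHERE u_name = 'alice';

-- u_modified should now be later than u_created for this row.
SELECT u_id, u_created, u_modified, replication_id
FROM public.users
WHERE u_name = 'alice';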
bootstrap.servers=*********:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the converter's setting with the
# converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
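With the worker and connector running and the console consumer attached to test-kc-users, new and updated rows should show up as messages within a few seconds, since the connector polls the table on an interval. A quick end-to-end check, again with a placeholder UUID:

-- Each new row should appear on the test-kc-users topic; a later UPDATE produces
-- another message once the trigger moves u_modified past the connector's stored offset.
INSERT INTO public.users (u_id, u_name, u_description)
VALUES ('00000000-0000-0000-0000-000000000002', 'bob', 'second test user');

Because value.converter is the JsonConverter with schemas.enable=true, each message in the consumer output is a JSON object with a "schema" section describing the row's fields and a "payload" section carrying the column values.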