postgresql source connector

32 views
Skip to first unread message

Adrian Dr

unread,
Jan 27, 2020, 10:09:58 AM1/27/20
to Confluent Platform
Hello everyone,

I have the following docker-compose that works with my app writing to kafka then to elasticsearch:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    image: wurstmeister/kafka:0.11.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: "42"
      KAFKA_ADVERTISED_HOST_NAME: "kafka"

  elasticsearch:
    ports:
      - "9200:9200"
      - "9300:9300"      
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - esdata1:/usr/share/elasticsearch/data:rw
    command: elasticsearch -Etransport.host=127.0.0.1
        
  connect:
    image: confluentinc/cp-kafka-connect:3.3.0
    ports:
      - 8083:8083
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - $PWD/connect-plugins:/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /connect-plugins,C:\Utils\debezium-connector-postgres
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-3.3.0.jar
      
  kibana:
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
    environment:
      - xpack.security.enabled=false
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    restart: unless-stopped
      
volumes:
  esdata1:
    driver: local

I am trying now to use debezium postgres source connector to read changes from a table from my database,used this article from medium and works https://medium.com/@tilakpatidar/streaming-data-from-postgresql-to-kafka-using-debezium-a14a2644906d but with postgres/zookeeper/kafka/diferent connect from docker.

I try to post this connector with postman to http://localhost:8083/connectors :

{
  "name": "inventory-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "127.0.0.1", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "pass", 
    "database.dbname" : "postgres", 
    "database.server.name": "postgres", 
    "table.whitelist": "sometable" 

  }
}

I get this error:

"error_code"500,
    "message""Failed to find any class that implements Connector and which name matches io.debezium.connector.postgresql.PostgresConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.HdfsSinkConnector, name='io.confluent.connect.hdfs.HdfsSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.hdfs.tools.SchemaSourceConnector, name='io.confluent.connect.hdfs.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='3.3.0', encodedVersion=3.3.0, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='3.3.0', encodedVersion=3.3.0, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='0.11.0.0-cp1', encodedVersion=0.11.0.0-cp1, type=source, typeName='source', location='classpath'}"
} Any suggestions? I cannot use that model from medium article because I use postgres locally and I have another docker-compose with kafka/zoo/connect.

Adrian Dr

unread,
Jan 27, 2020, 10:24:00 AM1/27/20
to Confluent Platform
I tried with local postgres and zookeeper/kafka/connect from that article and I get error 500 when I try to post the config

My pg_hba.conf looks like this:

# TYPE  DATABASE        USER            ADDRESS                 METHOD

# IPv4 local connections:
host    all             all             127.0.0.1/32            md5
# IPv6 local connections:
host    all             all             ::1/128                 md5
# Allow replication connections from localhost, by a user with the
# replication privilege.
host    replication     all             127.0.0.1/32            md5
host    replication     all             ::1/128                 md5
local   replication     postgres                          trust   
host    replication     postgres  127.0.0.1/32            trust   
host    replication     postgres  ::1/128                 trust


I made these changes to postgresql.conf
shared_preload_libraries = 'decoderbufs,wal2json' 
wal_level = logical
max_wal_senders = 1              
max_replication_slots = 1

Robin Moffatt

unread,
Jan 27, 2020, 1:02:33 PM1/27/20
to confluent...@googlegroups.com
Here's your problem: 

Failed to find any class that implements Connector and which name matches io.debezium.connector.postgresql.PostgresConnector,

So the Kafka Connect worker that you're running doesn't have the Debezium Postgres connector installed. One option is to just install it at runtime, as shown here, as part of the container's start up command. 

You can also just use the Debezium Kafka Connect image at https://hub.docker.com/r/debezium/connect

Once you've got your worker running, verify that the connector plugin is available: 

curl -s localhost:8083/connector-plugins|jq '.[].class'





-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff



--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/1309f422-cb07-4570-9997-d982720980c1%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages