PostgreSQL connector error


Naci Ishakbeyoglu

Mar 30, 2017, 8:10:23 PM
to debezium
Hi,

I am trying to make Debezium work on macOS with PostgreSQL in standalone mode.
I start Zookeeper and the Kafka server. I have this as the PostgreSQL connector properties:

name=events-debezium
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=naci
database.password=
database.dbname=testdb

Then I run the connector and get the following error. "naci" is a superuser in PostgreSQL and I can get into the psql shell with "psql -U naci -h localhost <db_name>".

➜  kafka_2.11-0.10.2.0 bin/connect-standalone.sh config/connect-standalone.properties config/connect-debezium-producer.properties

[2017-03-30 16:57:01,316] INFO StandaloneConfig values: 

access.control.allow.methods = 

access.control.allow.origin = 

bootstrap.servers = [localhost:9092]

internal.key.converter = class org.apache.kafka.connect.json.JsonConverter

internal.value.converter = class org.apache.kafka.connect.json.JsonConverter

key.converter = class org.apache.kafka.connect.json.JsonConverter

offset.flush.interval.ms = 10000

offset.flush.timeout.ms = 5000

offset.storage.file.filename = /tmp/connect.offsets

rest.advertised.host.name = null

rest.advertised.port = null

rest.host.name = null

rest.port = 8083

task.shutdown.graceful.timeout.ms = 5000

value.converter = class org.apache.kafka.connect.json.JsonConverter

 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:180)

[2017-03-30 16:57:01,456] INFO Logging initialized @576ms (org.eclipse.jetty.util.log:186)

[2017-03-30 16:57:01,669] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:50)

[2017-03-30 16:57:01,669] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)

[2017-03-30 16:57:01,670] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:119)

[2017-03-30 16:57:01,670] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)

[2017-03-30 16:57:01,708] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)

[2017-03-30 16:57:01,709] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:74)

[2017-03-30 16:57:01,709] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)

[2017-03-30 16:57:01,790] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)

Mar 30, 2017 4:57:02 PM org.glassfish.jersey.internal.Errors logErrors

WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.

WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.

WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.

WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.


[2017-03-30 16:57:02,287] INFO Started o.e.j.s.ServletContextHandler@2a5b3fee{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)

[2017-03-30 16:57:02,327] INFO Started ServerConnector@7f79b310{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)

[2017-03-30 16:57:02,328] INFO Started @1451ms (org.eclipse.jetty.server.Server:379)

[2017-03-30 16:57:02,328] INFO REST server listening at http://192.168.128.52:8083/, advertising URL http://192.168.128.52:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)

[2017-03-30 16:57:02,329] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56)

Mar 30, 2017 4:57:02 PM org.postgresql.Driver connect

SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}

Mar 30, 2017 4:57:02 PM org.postgresql.Driver connect

SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}

[2017-03-30 16:57:02,395] ERROR Failed to create job for config/connect-debezium-producer.properties (org.apache.kafka.connect.cli.ConnectStandalone:88)

[2017-03-30 16:57:02,397] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)

java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid (use the endpoint `/{connectorType}/config/validate` to get a full list of errors)

at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)

at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)

at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)

Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid (use the endpoint `/{connectorType}/config/validate` to get a full list of errors)

at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:161)

at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)

[2017-03-30 16:57:02,398] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)

[2017-03-30 16:57:02,398] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)

[2017-03-30 16:57:02,405] INFO Stopped ServerConnector@7f79b310{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)

[2017-03-30 16:57:02,410] INFO Stopped o.e.j.s.ServletContextHandler@2a5b3fee{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)

[2017-03-30 16:57:02,411] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)

[2017-03-30 16:57:02,411] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:78)

[2017-03-30 16:57:02,411] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:131)

[2017-03-30 16:57:02,411] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)

[2017-03-30 16:57:02,412] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:151)

[2017-03-30 16:57:03,168] INFO Reflections took 1425 ms to scan 67 urls, producing 3229 keys and 23572 values  (org.reflections.Reflections:229)

[2017-03-30 16:57:03,191] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:88)

[2017-03-30 16:57:03,191] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:71)

➜  kafka_2.11-0.10.2.0 

Randall Hauch

Mar 31, 2017, 12:28:32 PM
to debe...@googlegroups.com, Naci Ishakbeyoglu
What version of PostgreSQL are you running? These error messages:

Mar 30, 2017 4:57:02 PM org.postgresql.Driver connect
SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}
Mar 30, 2017 4:57:02 PM org.postgresql.Driver connect
SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}

seem to imply that the connector’s configuration properties are not getting to the connector, since the connector is not replacing the placeholders in the URL template.

What is the command you’re using to start Kafka Connect standalone? Make sure that you’re supplying the worker properties file first and then the connector properties; see http://docs.confluent.io/3.2.0/connect/userguide.html#standalone-mode 

Naci Ishakbeyoglu

Mar 31, 2017, 3:07:57 PM
to debezium
Hi,

I am starting it like this:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-debezium-producer.properties


where I have not modified connect-standalone.properties, and connect-debezium-producer.properties has:

name=vida_events_debezium
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=127.0.0.1
database.port=5432
database.user=naci
database.password=
database.dbname=vida
database.server.name=vida_db_events
tasks.max=1


I also tried this using the containers as in the Debezium tutorial, with the same zookeeper, kafka, and connect containers:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_DB=vida -e POSTGRES_USER=naci -e POSTGRES_PASSWORD=naci debezium/postgres


At this point, in a separate terminal, "psql -U naci -h localhost vida" works; I create a table, etc., then try to load the Postgres connector:

➜  config curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "vida-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "naci", "database.password": "naci", "database.server.name": "dbserver1", "database.whitelist": "vida", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.vida" } }'

HTTP/1.1 400 Bad Request

Date: Fri, 31 Mar 2017 19:06:34 GMT

Content-Type: application/json

Content-Length: 148

Server: Jetty(9.2.15.v20160210)


{"error_code":400,"message":"Connector configuration is invalid (use the endpoint `/{connectorType}/config/validate` to get a full list of errors)"}%                                                                                                                                           ➜  config 



I see the same on the connect container terminal:

Mar 31, 2017 7:06:34 PM org.postgresql.Driver connect

SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}

Mar 31, 2017 7:06:34 PM org.postgresql.Driver connect

SEVERE: Error in url: jdbc:postgresql://${hostname}:${port}/${dbname}

2017-03-31 19:06:34,071 INFO   ||  172.17.0.1 - - [31/Mar/2017:19:06:34 +0000] "POST /connectors/ HTTP/1.1" 400 148  15   [org.apache.kafka.connect.runtime.rest.RestServer]


Thanks
Naci

Naci Ishakbeyoglu

Mar 31, 2017, 3:11:22 PM
to debezium
And I forgot to write how I started the connect container:

docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect


before I started the debezium/postgres container.

Randall Hauch

Mar 31, 2017, 4:57:08 PM
to debe...@googlegroups.com, Naci Ishakbeyoglu
Did you try validating the configuration as referenced in the logs? See http://docs.confluent.io/3.2.0/connect/restapi.html#put--connector-plugins-(string-name)-config-validate for info about how to do this.

Naci Ishakbeyoglu

Mar 31, 2017, 5:46:49 PM
to debezium
Sorry for not noticing the doc. Not sure if I'm issuing curl wrong:

➜  config curl -i -X GET -H "Accept:application/json" localhost:8083/connector-plugins/

HTTP/1.1 200 OK

Date: Fri, 31 Mar 2017 21:44:49 GMT

Content-Type: application/json

Content-Length: 312

Server: Jetty(9.2.15.v20160210)


[{"class":"io.debezium.connector.mongodb.MongoDbConnector"},{"class":"io.debezium.connector.mysql.MySqlConnector"},{"class":"io.debezium.connector.postgresql.PostgresConnector"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"}]%             ➜  config 

➜  config curl -i -X PUT -H "Accept:application/json" localhost:8083/connector-plugins/PostgresConnector/config/validate -d '{ "name": "vida-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "naci", "database.password": "naci", "database.server.name": "dbserver1"} }'

HTTP/1.1 415 Unsupported Media Type

Date: Fri, 31 Mar 2017 21:44:59 GMT

Cache-Control: must-revalidate,no-cache,no-store

Content-Type: text/html; charset=ISO-8859-1

Content-Length: 342

Server: Jetty(9.2.15.v20160210)


<html>

<head>

<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>

<title>Error 415 </title>

</head>

<body>

<h2>HTTP ERROR: 415</h2>

<p>Problem accessing /connector-plugins/PostgresConnector/config/validate. Reason:

<pre>    Unsupported Media Type</pre></p>

<hr /><i><small>Powered by Jetty://</small></i>

</body>

</html>

➜  config 


On connect container:

2017-03-31 21:44:59,994 INFO   ||  172.17.0.1 - - [31/Mar/2017:21:44:59 +0000] "PUT /connector-plugins/PostgresConnector/config/validate HTTP/1.1" 415 342  3   [org.apache.kafka.connect.runtime.rest.RestServer]

Randall Hauch

Mar 31, 2017, 5:58:21 PM
to debe...@googlegroups.com, Naci Ishakbeyoglu
That resource consumes and returns JSON, whereas your curl request didn’t specify a MIME type for its payload and thus it defaulted to HTML. Add the following to your curl request to declare a JSON content type:

-H "Content-Type:application/json"

Naci Ishakbeyoglu

Mar 31, 2017, 6:26:52 PM
to debezium
Tried that as well:

➜  config curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connector-plugins/PostgresConnector/config/validate -d '{ "name": "vida-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "naci", "database.password": "naci", "database.server.name": "dbserver1"} }'

HTTP/1.1 500 Request failed.

Date: Fri, 31 Mar 2017 22:23:41 GMT

Cache-Control: must-revalidate,no-cache,no-store

Content-Type: text/html; charset=ISO-8859-1

Content-Length: 335

Server: Jetty(9.2.15.v20160210)


<html>

<head>

<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>

<title>Error 500 </title>

</head>

<body>

<h2>HTTP ERROR: 500</h2>

<p>Problem accessing /connector-plugins/PostgresConnector/config/validate. Reason:

<pre>    Request failed.</pre></p>

<hr /><i><small>Powered by Jetty://</small></i>

</body>

</html>

➜  config


On connect container, I get:

2017-03-31 22:25:39,580 INFO   ||  172.17.0.1 - - [31/Mar/2017:22:25:39 +0000] "PUT /connector-plugins/PostgresConnector/config/validate HTTP/1.1" 500 335  8   [org.apache.kafka.connect.runtime.rest.RestServer]

2017-03-31 22:25:39,580 WARN   ||  /connector-plugins/PostgresConnector/config/validate   [org.eclipse.jetty.server.HttpChannel]

javax.servlet.ServletException: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token

 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@215db9bf; line: 1, column: 39] (through reference chain: java.util.LinkedHashMap["config"])

at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:130)

at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)

at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)

at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)

at org.eclipse.jetty.server.Server.handle(Server.java:499)

at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)

at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257)

at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)

at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)

at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)

at java.lang.Thread.run(Thread.java:745)

Caused by: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token

 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@215db9bf; line: 1, column: 39] (through reference chain: java.util.LinkedHashMap["config"])

at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)

at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)

at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)

at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)

at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)

at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812)

at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587)

at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)

at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)

at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)

at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)

at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)

at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)

at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)

... 10 more

Caused by: org.glassfish.jersey.server.ContainerException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token

 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@215db9bf; line: 1, column: 39] (through reference chain: java.util.LinkedHashMap["config"])

at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:278)

at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:260)

at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:509)

at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:334)

at org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)

at org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)

at org.glassfish.jersey.internal.Errors.process(Errors.java:315)

at org.glassfish.jersey.internal.Errors.process(Errors.java:297)

at org.glassfish.jersey.internal.Errors.process(Errors.java:267)

at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)

at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)

at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)

at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)

... 23 more

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token

 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@215db9bf; line: 1, column: 39] (through reference chain: java.util.LinkedHashMap["config"])

at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)

at com.fasterxml.jackson.databind.DeserializationContext.reportMappingException(DeserializationContext.java:1234)

at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1122)

at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1075)

at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:60)

at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)

at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:517)

at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:362)

at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:27)

at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1583)

at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:964)

at com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:815)

at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.invokeReadFrom(ReaderInterceptorExecutor.java:256)

at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.aroundReadFrom(ReaderInterceptorExecutor.java:235)

at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:155)

at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundReadFrom(MappableExceptionWrapperInterceptor.java:74)

at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:155)

at org.glassfish.jersey.message.internal.MessageBodyFactory.readFrom(MessageBodyFactory.java:1085)

at org.glassfish.jersey.message.internal.InboundMessageContext.readEntity(InboundMessageContext.java:874)

at org.glassfish.jersey.server.ContainerRequest.readEntity(ContainerRequest.java:271)

at org.glassfish.jersey.server.internal.inject.EntityParamValueFactoryProvider$EntityValueFactory.provide(EntityParamValueFactoryProvider.java:96)

at org.glassfish.jersey.server.spi.internal.ParamValueFactoryWithSource.provide(ParamValueFactoryWithSource.java:71)

at org.glassfish.jersey.server.spi.internal.ParameterValueHelper.getParameterValues(ParameterValueHelper.java:94)

at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$AbstractMethodParamInvoker.getParamValues(JavaResourceMethodDispatcherProvider.java:127)

at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:205)

at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)

at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)

at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)

at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)

at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)

... 32 more

2017-03-31 22:25:39,584 WARN   ||  Could not send response error 500: javax.servlet.ServletException: javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_OBJECT token

 at [Source: org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream@215db9bf; line: 1, column: 39] (through reference chain: java.util.LinkedHashMap["config"])   [org.eclipse.jetty.server.HttpChannel]
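A hedged reading of that stack trace: the JsonMappingException complaining about LinkedHashMap["config"] suggests the validate endpoint expects the flat map of configuration properties itself, not the { "name": ..., "config": { ... } } wrapper that POST /connectors/ accepts. A sketch of the same request in that shape, reusing the values from above:

curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connector-plugins/PostgresConnector/config/validate -d '{ "name": "vida-connector", "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "naci", "database.password": "naci", "database.server.name": "dbserver1" }'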

Naci Ishakbeyoglu

Apr 1, 2017, 11:33:30 AM
to debezium
Hi Randall,

There is a problem with config validation even with the MySqlConnector from the tutorial. After I add inventory-connector, I try to run config/validate with the same JSON but get this:

➜  ~ curl -H "Accept:application/json" localhost:8083/connectors/

["inventory-connector"]%                                                                                      ➜  ~ 

➜  ~ curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connector-plugins/MySqlConnector/config/validate -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

HTTP/1.1 500 Request failed.

Date: Sat, 01 Apr 2017 15:01:46 GMT

Cache-Control: must-revalidate,no-cache,no-store

Content-Type: text/html; charset=ISO-8859-1

Content-Length: 332

Server: Jetty(9.2.15.v20160210)


<html>

<head>

<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>

<title>Error 500 </title>

</head>

<body>

<h2>HTTP ERROR: 500</h2>

<p>Problem accessing /connector-plugins/MySqlConnector/config/validate. Reason:

<pre>    Request failed.</pre></p>

<hr /><i><small>Powered by Jetty://</small></i>

</body>

</html>

➜  ~


That means it is able to parse the JSON when adding a MySqlConnector connector, but it is unable to parse the same JSON at validation.

Are you able to use the debezium/postgres container and add it to Kafka Connect (the connect container)?

Thanks,
Naci


Steven Siahetiong

Apr 2, 2017, 8:26:59 AM
to debezium
Here's my working setup:

postgres:
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=postgres debezium/postgres:9.6 postgres

zookeeper:
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper

kafka:
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

kafka connect:
docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "test", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

Note:
"database.dbname" is required.

Naci Ishakbeyoglu

Apr 3, 2017, 6:00:57 PM
to debezium
Thanks Steven, that worked. Although it was there in the properties file when trying the standalone connector, I missed it on the distributed connector. Then I was stuck at config/validate, as it still errors out with the working config JSON.

The current problem I notice is that an update statement to a table generates an event whose payload has "before" as null and "after" as the updated record. "before" shouldn't be null, right?

Thanks
Naci


Randall Hauch

Apr 3, 2017, 8:44:53 PM
to debe...@googlegroups.com, Naci Ishakbeyoglu
There appear to be issues related to connector configuration validation and error reporting (https://github.com/apache/kafka/pull/2722) that will be fixed in Kafka 0.10.2.1. See https://confluentcommunity.slack.com/archives/C49L0V3L7/p1491254709809849 for a discussion.

Generally, the “before” won't be null on an update, but there are certain cases when it is: if the Postgres log event has no tuples for the old record (not sure when/if this happens, but it looks like it’s possible); if any columns that make up the key are modified, in which case the connector generates a DELETE event for the old record with the old key and a CREATE event for the new record with the new key.

Naci Ishakbeyoglu

Apr 4, 2017, 12:20:17 AM
to debezium
Here is what I did:

inventory=# \d

No relations found.

inventory=# create table customer(id serial primary key, name varchar(20), age int);

CREATE TABLE

inventory=# insert into customer(name, age) values('John', 20);

INSERT 0 1

inventory=# insert into customer(name, age) values('Karla', 25);

INSERT 0 1

inventory=# select * from customer;

 id | name  | age 

----+-------+-----

  1 | John  |  20

  2 | Karla |  25

(2 rows)


inventory=# update customer set age = 30 where id=1;

UPDATE 1

inventory=# select * from customer;

 id | name  | age 

----+-------+-----

  2 | Karla |  25

  1 | John  |  30

(2 rows)


inventory=# delete from customer where id=1;

DELETE 1

inventory=# 


I create two records, update one, and then delete one. The watcher reports:

Contents of topic dbserver1.public.customer:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.public.customer.Key"},"payload":{"id":1}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int32","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.public.customer.Envelope","version":1},"payload":{"before":null,"after":{"id":1,"name":"John","age":20},"source":{"name":"dbserver1","ts_usec":1491265195278637,"txId":555,"lsn":23735229,"snapshot":null,"last_snapshot_record":null},"op":"c","ts_ms":1491265195452}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.public.customer.Key"},"payload":{"id":2}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int32","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.public.customer.Envelope","version":1},"payload":{"before":null,"after":{"id":2,"name":"Karla","age":25},"source":{"name":"dbserver1","ts_usec":1491265239468651,"txId":556,"lsn":23735606,"snapshot":null,"last_snapshot_record":null},"op":"c","ts_ms":1491265239490}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.public.customer.Key"},"payload":{"id":1}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int32","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.public.customer.Envelope","version":1},"payload":{"before":null,"after":{"id":1,"name":"John","age":30},"source":{"name":"dbserver1","ts_usec":1491265299394944,"txId":557,"lsn":23735845,"snapshot":null,"last_snapshot_record":null},"op":"u","ts_ms":1491265299407}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.public.customer.Key"},"payload":{"id":1}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"}],"optional":true,"name":"dbserver1.public.customer.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_usec"},{"type":"int32","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"boolean","optional":true,"field":"last_snapshot_record"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.public.customer.Envelope","version":1},"payload":{"before":{"id":1,"name":null,"age":null},"after":null,"source":{"name":"dbserver1","ts_usec":1491265540523801,"txId":558,"lsn":23736253,"snapshot":null,"last_snapshot_record":null},"op":"d","ts_ms":1491265540536}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.public.customer.Key"},"payload":{"id":1}} {"schema":null,"payload":null}


Not sure if this was expected. I didn't run the exact same test on MySQL, but the update example in the tutorial seems similar. The updated record was created after the snapshot (empty db), so it should have the log of the record before the update?

Thanks
Naci

Randall Hauch

Apr 4, 2017, 10:30:40 AM
to debe...@googlegroups.com, Naci Ishakbeyoglu
Indeed, the update event has a null “before” record, which is unexpected. You might want to enable the connector's DEBUG or TRACE logging to see what’s going on; see http://debezium.io/docs/logging/

Also, if you post more messages, it’d be important to know which versions of PostgreSQL, Kafka, Debezium, etc. you’re using.
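One hedged avenue to check, consistent with a log line that appears later in this thread ("REPLICA IDENTITY for 'public.customers' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns"): with Postgres' default replica identity, logical decoding carries only the key columns of the old row, so "before" can come through empty. Asking Postgres to emit the full old row should populate it; a sketch against the customer table from the transcript above:

ALTER TABLE customer REPLICA IDENTITY FULL;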

Krithika Vijaykumar

Jun 4, 2017, 8:42:08 PM
to debezium
Hi Steven

I tried your working setup and also created a database called inventory (with a customer table) in my postgres docker image. I was still not able to see the schema or the event changes as listed in the document here:

I also ran this command to see the event changes:
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers

I only see the following output, and no event changes:

Using ZOOKEEPER_CONNECT=172.17.0.3:2181

Using KAFKA_ADVERTISED_PORT=9092

Using KAFKA_ADVERTISED_HOST_NAME=172.17.0.6

Contents of topic dbserver1.inventory.customers:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].


Any idea what this could be? Thanks for your help in advance

Maria Laborte

Sep 19, 2017, 5:44:55 PM
to debezium
Hi Krithika,
Are you able to see the event changes already?  I have the same problem also.  Can you please share how you resolved this?  Thanks!!

Maria Laborte

Sep 19, 2017, 10:35:01 PM
to debezium
I reached out to Krithika, and she helped me figure out what I missed in my watcher docker container.  I'm able to get the events now. 

NOT WORKING:
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers

WORKING:
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.public.customers

Gunnar Morling

Sep 20, 2017, 3:13:42 AM
to debe...@googlegroups.com
Maria,

Thanks for reporting back. So to clarify, you were actually using the default schema named "public" instead of a dedicated one named "inventory"? I.e. Debezium works as expected?

Thanks,

--Gunnar



Maria Laborte

Sep 20, 2017, 10:41:46 AM
to debezium
Hi Gunnar,

I actually created the inventory database, then the customers table:
postgres=# CREATE DATABASE inventory;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON DATABASE inventory TO postgres;
GRANT
postgres=# \connect inventory
psql (9.6.5, server 9.6.3)
You are now connected to database "inventory" as user "postgres".
inventory=# create table customers(id serial primary key, name varchar(20), age int);
CREATE TABLE
inventory=#

And I was trying to watch that via docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers, but the watcher wasn't reporting the change events, so I posted my question here in this group.

Krithika suggested I use "public" and that worked.  The Kafka container that's running has these logs, and it shows 'public.customers', not 'inventory.customers' (which I don't understand).

2017-09-20 13:36:55,852 INFO   Postgres|dbserver1|postgres-connector-task  user 'postgres' connected to database 'inventory' on PostgreSQL 9.6.3 on x86_64-pc-linux-gnu, compiled by gcc (Debian 4.9.2-10) 4.9.2, 64-bit with roles:
role 'pg_signal_backend' [superuser: false, replication: false, inherit: true, create role: false, create db: false, can log in: false]
role 'postgres' [superuser: true, replication: true, inherit: true, create role: true, create db: true, can log in: true]   [io.debezium.connector.postgresql.PostgresConnectorTask]
2017-09-20 13:36:55,852 INFO   Postgres|dbserver1|postgres-connector-task  Found previous offset source_info[server='dbserver1', lsn=0/16A333F, txId=557, useconds=1505866960578314, snapshot=false]   [io.debezium.connector.postgresql.PostgresConnectorTask]
2017-09-20 13:36:55,852 INFO   Postgres|dbserver1|postgres-connector-task  Previous snapshot has completed successfully, streaming logical changes from last known position   [io.debezium.connector.postgresql.PostgresConnectorTask]
2017-09-20 13:36:55,986 INFO   Postgres|dbserver1|records-stream-producer  REPLICA IDENTITY for 'public.customers' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns   [io.debezium.connector.postgresql.PostgresSchema]
2017-09-20 13:36:55,987 INFO   ||  Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSourceTask]

Maria Laborte

Sep 20, 2017, 11:11:27 AM
to debezium
Hi again,

I created a schema named inventory this time, in the inventory database, then created the inventory.customers table.   I then used this watcher:  docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.5 watch-topic -a -k dbserver1.inventory.customers.  I can see the change events now.

I didn't know that I had to create a schema; otherwise the default 'public' schema is used.

Thanks!!

Gunnar Morling

Sep 20, 2017, 11:28:26 AM
to debe...@googlegroups.com
Yes, Postgres uses the "public" schema by default. Glad to hear you got it working.

--Gunnar
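For reference, Debezium's Postgres topic names follow the serverName.schemaName.tableName pattern, which is why a table created without an explicit schema shows up as dbserver1.public.customers. A sketch of the schema-qualified setup Maria describes, assuming the same columns as before:

CREATE SCHEMA inventory;
CREATE TABLE inventory.customers (id serial PRIMARY KEY, name varchar(20), age int);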



Reynold Regan

Jan 18, 2018, 5:41:21 AM
to debezium
Hi Steven,

How do I use Kafka's console consumer to read the events?

Please elaborate.

Gunnar Morling

Jan 18, 2018, 5:51:36 AM
to debe...@googlegroups.com
Please don't revive old threads for rather unrelated questions.

That said, you can find an example for using the console consumer in our tutorial instructions: https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql.

--Gunnar
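For reference, a minimal sketch with the stock Kafka console consumer, assuming a broker reachable on localhost:9092 and the topic name used earlier in this thread:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic dbserver1.inventory.customers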



Naveenkumar seerangan

Jun 23, 2022, 5:46:12 AM
to debezium
Hi All,

docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka

docker run -it --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=$(echo $DOCKER_HOST | cut -f3  -d'/' | cut -f1 -d':') --link zookeeper:zookeeper --link kafka:kafka debezium/connect
 
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 172.18.1.221:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "172.18.1.221", "database.port": "1236", "database.user": "postgres", "database.password": "postgres_nave1085", "database.dbname": "postgres",   "database.server.name": "dbserver1", "database.whitelist": "student", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "my-connect-configs" } }'  --config max.message.bytes=20971520

I am running the above statements for a Kafka CDC requirement. After inserting into Postgres I am getting the error below. Also, how do I connect a consumer for Redis? (The source is Postgres.)

I kindly request your help.

  [Worker clientId=connect-1, groupId=1] Catching up to assignment's config offset.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2022-06-23 09:00:31,285 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset 9 is behind group assignment 10, reading to end of config log   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2022-06-23 09:00:31,315 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished reading to end of log and updated config snapshot, new config log offset: 10   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2022-06-23 09:00:31,315 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 10   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2022-06-23 09:00:31,315 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2022-06-23 09:00:53,160 INFO   ||  WorkerSourceTask{id=inventory-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors.   [org.apache.kafka.connect.runtime.WorkerSourceTask]

Regards
naveen

Chris Cranford

Jun 23, 2022, 9:40:35 AM
to debe...@googlegroups.com, Naveenkumar seerangan
Hi, so I think the issue might be the "database.whitelist" setting.  Could you try removing that and see if that helps.

CC

Naveenkumar seerangan

Jun 27, 2022, 7:09:49 AM
to debezium
Dear  crancran,

After removing it, it is still not working. Do you have any document for an end-to-end change data capture requirement? If you have one, can you please send it? It will be useful for me.

Source : postgres
target : Redis


thanks in advance
naveen

Chris Cranford

Jun 28, 2022, 8:16:30 AM
to debe...@googlegroups.com, Naveenkumar seerangan
Could you share the full logs of the connector's start up, snapshot and transition to streaming?

Naveenkumar seerangan

Jul 7, 2022, 3:27:56 AM
to debezium
POSTGRES TO REDIS synchronisation through Kafka and Debezium

REDIS

docker run --name redis_nave1085 -d -p 1235:6379 redis redis-server

POSTGRES

docker run --name deb_postgres -p 1236:5432 -e POSTGRES_PASSWORD=postgres_nave1085 -d debezium/postgres

ZOOKEEPER

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 -e LOG_LEVEL=DEBUG debezium/zookeeper

KAFKA

docker run -it --rm --name kafka -p 9092:9092 -e LOG_LEVEL=DEBUG --link zookeeper:zookeeper --link deb_postgres:deb_postgres --link redis_nave1085:redis_nave1085 debezium/kafka


CONNECT

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1   -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e STATUS_STORAGE_TOPIC=my_connect_Statuses --link zookeeper:zookeeper --link kafka:kafka  --link deb_postgres:deb_postgres  --link redis_nave1085:redis_nave1085 debezium/connect


The RedisSinkConnector was downloaded from the link below; I renamed the lib folder to kafka-connect-jdbc and moved it to the kafka/connect path:

https://www.confluent.io/hub/jcustenborder/kafka-connect-redis?_ga=2.168589861.581183524.1656910012-1676488886.1654582739&_gac=1.19804234.1657099867.Cj0KCQjw5ZSWBhCVARIsALERCvxrekVZQGt_Mu5CX5ezWnlxL4kbmGG05sWUKqCVB5x4tSMudC7ITcIaAqSHEALw_wcB


docker cp /home/navi1085/jcustenborder-kafka-connect-redis-0.0.2.17/kafka-connect-jdbc  connect:/kafka/connect/

docker exec -it connect /bin/bash
docker restart connect
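A hedged aside on the folder name: Kafka Connect treats each subdirectory of the plugin path (/kafka/connect in the debezium/connect image) as a plugin, so the name itself is arbitrary and kafka-connect-jdbc is merely misleading here. A clearer variant of the same copy, assuming the original lib folder had not been renamed:

docker cp /home/navi1085/jcustenborder-kafka-connect-redis-0.0.2.17/lib connect:/kafka/connect/kafka-connect-redis
docker restart connect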

http://localhost:8083/connector-plugins

[{"class":"com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector","type":"sink","version":"0.0.0.0"},
{"class":"io.debezium.connector.db2.Db2Connector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mongodb.MongoDbConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.oracle.OracleConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.postgresql.PostgresConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.sqlserver.SqlServerConnector","type":"source","version":"1.9.4.Final"},
{"class":"io.debezium.connector.vitess.VitessConnector","type":"source","version":"1.9.4.Final"},
{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"3.1.0"},
{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"3.1.0"},
{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},
{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]



curl -H "Accept:application/json" localhost:8083
curl -H "Accept:application/json" localhost:8083/connectors

SOURCE CURL
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/   -d '{ "name": "student-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "172.18.1.221", "database.port": "1236", "database.user": "postgres", "database.password": "postgres_nave1085", "database.dbname": "postgres",   "database.server.name": "dbs1",  "schema.whitelist": "public", "database.history.kafka.bootstrap.servers": "kafka:9092",  "database.history.kafka.topic": "schema-changes.student",   "key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable": "false","value.converter.schemas.enable": "false" ,"redis.type":"JSON"} }'
TARGET CURL

curl -i -X POST -H "Accept:application/json" -H "Content-Length: 10000" -H "Content-Type:application/json" localhost:8083/connectors/  -d '{ "name": "RedisSinkConnector223", "config": { "topics" : "redisdatamove1", "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "tasks.max": "1",    "redis.hosts": "172.18.1.221:1235","key.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter": "org.apache.kafka.connect.json.JsonConverter","redis.type":"JSON"}}'

 
 
 http://localhost:8083/connectors
 
 ["RedisSinkConnector223","student-connector"]
 
POSTGRES DB

create table student (id int primary key, username varchar(100));

insert into student (id,username) values (2,'rajan');

WATCHER


docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka --link redis_nave1085:redis_nave1085 --link deb_postgres:deb_postgres  debezium/kafka watch-topic -a -k  dbs1.public.student
   
   WATCHER RESULTS
   
   {"id":2}        {"before":null,"after":{"id":2,"username":"rajan"},
   "source":{"version":"1.9.4.Final","connector":"postgresql","name":"dbs1","ts_ms":1657178271692,
   "snapshot":"false","db":"postgres","sequence":"[\"23719736\",\"23719792\"]","schema":"public",
   "table":"student","txId":554,"lsn":23719792,"xmin":null},"op":"c","ts_ms":1657178271990,
   "transaction":null}
   
  docker exec -it redis_nave1085 /bin/bash

root@90c79aa80289:/data# redis-cli
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "__kafka.offset.redisdatamove1.0"
127.0.0.1:6379[1]>

I am getting the below results in Redis. How do I resolve the Redis data CDC?

1) "__kafka.offset.redisdatamove1.0"


127.0.0.1:6379[1]> get "__kafka.offset.redisdatamove1.0"
"{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
127.0.0.1:6379[1]>


Thanks in advance
Naveenkumar.S
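A hedged observation on the setup above: the sink is subscribed to "topics": "redisdatamove1", a topic the Postgres connector never writes to; the change events go to dbs1.public.student (as the watcher output shows), so the sink sees no records and only maintains its offset key. A sketch of the sink registration pointed at the actual change-event topic (the name RedisSinkConnector224 is made up here to avoid clashing with the existing connector):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "RedisSinkConnector224", "config": { "topics": "dbs1.public.student", "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "tasks.max": "1", "redis.hosts": "172.18.1.221:1235", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter" } }'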



Naveenkumar seerangan

Jul 7, 2022, 9:21:01 AM
to debezium
FYI  

Please find the redis-cli MONITOR values:

1657198398.376830 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198458.379588 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198518.382212 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198578.383833 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198590.802212 [1 127.0.0.1:52930] "MGET" "__kafka.offset.redisdatamove1.0"
1657198638.386353 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198698.388647 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198758.391397 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198818.393454 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198878.395889 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198938.398109 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657198998.399989 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199058.401759 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199118.403695 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199178.406471 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199238.409470 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199298.414378 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199358.416915 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199418.419418 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199478.421233 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199538.425479 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199598.428608 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199658.432162 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199718.434408 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199778.436830 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199783.155513 [1 172.17.0.1:42394] "dbsize"
1657199783.242661 [1 172.17.0.1:42394] "scan" "0" "MATCH" "*" "COUNT" "500"
1657199783.310898 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redis.commands.0"
1657199783.310925 [1 172.17.0.1:42394] "memory" "usage" "__kafka.offset.redis.commands.0" "samples" "0"
1657199783.310936 [1 172.17.0.1:42394] "memory" "usage" "__kafka.offset.redisdatamove1.0" "samples" "0"
1657199783.370637 [1 172.17.0.1:42394] "type" "__kafka.offset.redis.commands.0"
1657199783.370649 [1 172.17.0.1:42394] "type" "__kafka.offset.redisdatamove1.0"
1657199783.427672 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redis.commands.0"
1657199783.427691 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redisdatamove1.0"
1657199784.533688 [1 172.17.0.1:42394] "dbsize"
1657199784.581908 [1 172.17.0.1:42394] "scan" "0" "MATCH" "*" "COUNT" "500"
1657199784.631730 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redis.commands.0"
1657199784.631752 [1 172.17.0.1:42394] "memory" "usage" "__kafka.offset.redis.commands.0" "samples" "0"
1657199784.631818 [1 172.17.0.1:42394] "memory" "usage" "__kafka.offset.redisdatamove1.0" "samples" "0"
1657199784.678958 [1 172.17.0.1:42394] "type" "__kafka.offset.redis.commands.0"
1657199784.678978 [1 172.17.0.1:42394] "type" "__kafka.offset.redisdatamove1.0"
1657199784.724380 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redis.commands.0"
1657199784.724401 [1 172.17.0.1:42394] "ttl" "__kafka.offset.redisdatamove1.0"
1657199838.439485 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199898.442270 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
1657199958.443999 [1 172.17.0.1:42496] "MSET" "__kafka.offset.redisdatamove1.0" "{\"topic\":\"redisdatamove1\",\"partition\":0,\"offset\":0}"
