Debezium producer cannot connect to Confluent Cloud.


Kunna Park

Mar 22, 2018, 2:49:53 AM
to debezium
I'm trying to use Debezium (version 0.7.5) with Confluent Cloud (Confluent's hosted Kafka service): https://www.confluent.io/confluent-cloud/

I was able to set up Debezium with no issues on my local machine, but it does not work with Confluent Cloud.
I was also able to set up the FileSinkConnector, FileSourceConnector, and ElasticsearchSinkConnector against Confluent Cloud, so I don't think my Confluent Cloud configuration is the problem.
The issue is only reproducible when connecting Debezium to Confluent Cloud.


My connector cannot create the database history topic because the connection times out.
Is there some other configuration I need to set up?



## Error message

```
connect_1  | org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually
connect_1  |  at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:349)
connect_1  |  at io.debezium.connector.mysql.MySqlSchema.intializeHistoryStorage(MySqlSchema.java:283)
connect_1  |  at io.debezium.connector.mysql.MySqlTaskContext.initializeHistoryStorage(MySqlTaskContext.java:192)
connect_1  |  at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:126)
connect_1  |  at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
connect_1  |  at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
connect_1  |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
connect_1  |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
connect_1  |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect_1  |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect_1  |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect_1  |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect_1  |  at java.lang.Thread.run(Thread.java:745)
connect_1  | Caused by: java.util.concurrent.TimeoutException
connect_1  |  at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
connect_1  |  at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
connect_1  |  at io.debezium.relational.history.KafkaDatabaseHistory.getKafkaBrokerConfig(KafkaDatabaseHistory.java:354)
connect_1  |  at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:338)
connect_1  |  ... 12 more
connect_1  | [2018-03-21 10:16:50,472] ERROR WorkerSourceTask{id=3-mysql-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
```



## Connect configuration
```
bootstrap.servers=foobar:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets
offset.storage.replication.factor=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3


ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="Password";
security.protocol=SASL_SSL

consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="Password";
consumer.security.protocol=SASL_SSL

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="Password";
producer.security.protocol=SASL_SSL

plugin.path=connect-plugins/
```


## Debezium Configuration
```
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql_us1",
    "include.schema.changes": "true",
    "database.history.kafka.bootstrap.servers": "foobar:9092",
    "database.history.kafka.topic": "dbhistory.mysql_us1"
  }
}
```

Kunna Park

Mar 22, 2018, 2:53:32 AM
to debezium

Even if I create the history topic manually (cleanup.policy=delete, maximum retention, one partition), I get the same error.
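
For reference, a sketch of that manual creation, assuming a Kafka CLI recent enough to support `--bootstrap-server` and a hypothetical `client.properties` file carrying the same SASL_SSL settings as the worker config:

```
# Sketch only: create the single-partition history topic with unlimited
# retention; client.properties holds the SASL_SSL/PLAIN credentials.
kafka-topics --bootstrap-server foobar:9092 \
  --command-config client.properties \
  --create --topic dbhistory.mysql_us1 \
  --partitions 1 --replication-factor 3 \
  --config cleanup.policy=delete \
  --config retention.ms=-1
```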


On Thursday, March 22, 2018 at 3:49:53 PM UTC+9, Kunna Park wrote:

Kunna Park

Mar 22, 2018, 3:46:35 AM
to debezium
Looking into the logs, I found this.

The logged "ProducerConfig values" show "sasl.jaas.config" as null and "sasl.mechanism" as "GSSAPI".
Is this normal? Maybe Debezium is not handling authentication correctly when it tries to read from / write to the history topic.




```
[2018-03-22 16:38:53,833] INFO KafkaDatabaseHistory Consumer config: {enable.auto.commit=false, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, group.id=6mysql-source-connector-dbhistory, auto.offset.reset=earliest, session.timeout.ms=10000, bootstrap.servers=kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9092,kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9093,kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9094, client.id=6mysql-source-connector-dbhistory, key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, fetch.min.bytes=1} (io.debezium.relational.history.KafkaDatabaseHistory:163)
[2018-03-22 16:38:53,833] INFO KafkaDatabaseHistory Producer config: {bootstrap.servers=kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9092,kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9093,kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9094, value.serializer=org.apache.kafka.common.serialization.StringSerializer, buffer.memory=1048576, retries=1, key.serializer=org.apache.kafka.common.serialization.StringSerializer, client.id=6mysql-source-connector-dbhistory, linger.ms=0, batch.size=32768, max.block.ms=10000, acks=1} (io.debezium.relational.history.KafkaDatabaseHistory:164)
[2018-03-22 16:38:53,833] INFO ProducerConfig values:
acks = 1
batch.size = 32768
bootstrap.servers = [kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9092, kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9093, kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9094]
buffer.memory = 1048576
client.id = 6mysql-source-connector-dbhistory
compression.type = none
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 10000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
retries = 1
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig:238)
[2018-03-22 16:38:53,835] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-03-22 16:38:53,835] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-03-22 16:38:54,477] INFO AdminClientConfig values:
bootstrap.servers = [kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9092, kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9093, kafka-d7cf38e5682303e4.us-west-2.aws.confluent.cloud:9094]
client.id = 6mysql-source-connector-dbhistory
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
receive.buffer.bytes = 65536
retries = 1
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
 (org.apache.kafka.clients.admin.AdminClientConfig:238)
[2018-03-22 16:38:54,478] WARN The configuration 'value.serializer' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,478] WARN The configuration 'batch.size' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,478] WARN The configuration 'max.block.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,478] WARN The configuration 'acks' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,479] WARN The configuration 'buffer.memory' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,479] WARN The configuration 'key.serializer' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,479] WARN The configuration 'linger.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:246)
[2018-03-22 16:38:54,479] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-03-22 16:38:54,479] INFO Kafka commitId : unknown (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-03-22 16:38:57,484] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:239)
[2018-03-22 16:38:57,485] INFO WorkerSourceTask{id=6mysql-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-03-22 16:38:57,485] INFO WorkerSourceTask{id=6mysql-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-03-22 16:38:57,485] ERROR WorkerSourceTask{id=6mysql-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually
at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:349)
at io.debezium.connector.mysql.MySqlSchema.intializeHistoryStorage(MySqlSchema.java:283)
at io.debezium.connector.mysql.MySqlTaskContext.initializeHistoryStorage(MySqlTaskContext.java:192)
at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:126)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
at io.debezium.relational.history.KafkaDatabaseHistory.getKafkaBrokerConfig(KafkaDatabaseHistory.java:354)
at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:338)
... 12 more
[2018-03-22 16:38:57,485] ERROR WorkerSourceTask{id=6mysql-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-03-22 16:38:57,486] INFO [Producer clientId=producer-5] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[2018-03-22 16:39:03,759] INFO WorkerSourceTask{id=6mysql-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
```


Jiri Pechanec

Mar 22, 2018, 3:58:48 AM
to debezium
Hi,

Could you please configure the missing property? What happens then?

J.

박건하

Mar 22, 2018, 4:32:06 AM
to debe...@googlegroups.com
Hi, Jiri.

What property did I miss?




Jiri Pechanec

Mar 22, 2018, 5:31:21 AM
to debezium
Looking at the log, you have 'sasl.mechanism = GSSAPI'. At the same time, it seems that this setting requires 'sasl.jaas.config' to be set. These are Kafka producer/consumer options, not Debezium ones. Please look at http://debezium.io/docs/connectors/mysql/ and search for 'pass-through configuration properties' to find out how to configure them properly.
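
For illustration, the documented pass-through properties for the database history clients look roughly like this (a sketch: the docs use the `database.history.producer.` and `database.history.consumer.` prefixes, and the username/password values are the same placeholders as in the worker config):

```
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="Password";
database.history.producer.ssl.endpoint.identification.algorithm=https
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="UserName" password="Password";
database.history.consumer.ssl.endpoint.identification.algorithm=https
```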

J.
Randall Hauch

Mar 22, 2018, 6:51:58 PM
to debezium
The `database.history.kafka.*` properties are used by Debezium to establish a consumer to read from, and a producer to write to, Debezium's database history topic. You need to add to the connector configuration all of the same `producer.*` and `consumer.*` properties from your worker config, but with the `database.history.` prefix (i.e. as `database.history.producer.*` and `database.history.consumer.*`).
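
Applied to the connector configuration from the first message, that would look roughly like this (a sketch; `foobar:9092` and the credentials are the same placeholders as above):

```
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql_us1",
    "include.schema.changes": "true",
    "database.history.kafka.bootstrap.servers": "foobar:9092",
    "database.history.kafka.topic": "dbhistory.mysql_us1",
    "database.history.producer.security.protocol": "SASL_SSL",
    "database.history.producer.sasl.mechanism": "PLAIN",
    "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"UserName\" password=\"Password\";",
    "database.history.producer.ssl.endpoint.identification.algorithm": "https",
    "database.history.consumer.security.protocol": "SASL_SSL",
    "database.history.consumer.sasl.mechanism": "PLAIN",
    "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"UserName\" password=\"Password\";",
    "database.history.consumer.ssl.endpoint.identification.algorithm": "https"
  }
}
```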


 


Vishakha Tiwari

Sep 12, 2019, 2:14:45 AM
to debezium
Hi, I used the same configuration as yours, but I can't see any data on the topic in Confluent Cloud, nor on a consumer (running through the Confluent Cloud CLI). Can you tell me how to see the data on the topic, or how to receive it on the consumer side?

Jon C

Oct 3, 2019, 8:44:04 AM
to debezium
This could be similar to the issue I experienced (https://github.com/debezium/debezium/pull/811). If upgrading your Debezium version is an option, version 0.9.3 or higher contains the fix that solved this for us.
