Debezium - MySQL connector for AWS MSK

Jehanzaib Younis

Apr 21, 2021, 8:17:16 PM
to debezium
Hi folks,

My Kafka cluster is hosted on AWS MSK, and the Debezium connector runs on a separate machine, the one where MySQL is running.
I can see schema-only messages when I restart Kafka Connect, but no other messages. Can someone please guide me?

I am running Kafka Connect in standalone mode.

Please see my standalone-connect.properties file. 

connector.class = io.debezium.connector.mysql.MySqlConnector
database.hostname = 127.0.0.1
database.port = 3306
database.user = xxxx
database.password = xxxx
database.whitelist = mydatabase
table.whitelist = mydatabase.table1
column.include.list = mydatabase.table1.testcolumn1,mydatabase.table1.testcolumn2

database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=SCRAM-SHA-512
database.history.producer.security.protocol =SASL_SSL
database.history.producer.sasl.mechanism=SCRAM-SHA-512

include.schema.changes = false
include.query = true
key.converter.schemas.enable = false
value.converter.schemas.enable = false
snapshot.mode = schema_only
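For comparison, a Debezium 1.x MySQL connector config (the file passed as the second argument to connect-standalone.sh) usually also names the logical server and the schema-history topic with its brokers. A minimal sketch, assuming Debezium 1.3 or later (where the *.include.list names replace the deprecated *.whitelist ones); the connector name is taken from the client id in the logs, while the server name, server id, and history topic are hypothetical placeholders:

```properties
# Connector config: second argument to connect-standalone.sh
name=mysql-mydatabase-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=127.0.0.1
database.port=3306
database.user=xxxx
database.password=xxxx
# Hypothetical logical server name; prefixes every change-event topic
database.server.name=mydatabase-server
# Hypothetical numeric id for this binlog client; must be unique among MySQL replicas
database.server.id=184054
database.include.list=mydatabase
table.include.list=mydatabase.table1
# Fully qualified column names, no spaces after the dots
column.include.list=mydatabase.table1.testcolumn1,mydatabase.table1.testcolumn2
# Brokers and topic for the schema history (hypothetical topic name)
database.history.kafka.bootstrap.servers=b-1.service.amazonaws.com:9096
database.history.kafka.topic=dbhistory.mydatabase
snapshot.mode=initial
```

The SASL settings for the history producer and consumer still have to be added on top of this, as in the original config.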


My Kafka Connect logs show something like this again and again ;)

[2021-04-22 12:11:45,596] WARN [Producer clientId=connector-producer-mysql-mydatabase-connector-0] Bootstrap broker b-2.service.amazonaws.com:9096 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1033)
[2021-04-22 12:11:45,969] WARN [Producer clientId=connector-producer-mysql-mydatabase-connector-0] Bootstrap broker b-3.service.amazonaws.com:9096 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1033)
[2021-04-22 12:11:46,315] WARN [Producer clientId=connector-producer-mysql-mydatabase-connector-0] Bootstrap broker b-1.service.amazonaws.com:9096 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1033)
[2021-04-22 12:11:46,596] ERROR WorkerSourceTask{id=mysql-mydatabase-connector-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:500)
[2021-04-22 12:11:46,596] ERROR WorkerSourceTask{id=mysql-mydatabase-connector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:116)
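The client id in these warnings, connector-producer-mysql-mydatabase-connector-0, belongs to the producer that the Connect worker creates for the source task, not to the schema-history producer, so the database.history.producer.* settings do not apply to it. In Kafka Connect, security settings for that producer go in the worker properties (the first argument to connect-standalone.sh) with a `producer.` prefix, while the un-prefixed keys serve as the base for the worker's own clients. A minimal sketch, assuming the SCRAM listener on port 9096 and the same credentials and truststore as a working client_sasl.properties; the offsets file path is a placeholder:

```properties
# Worker config: first argument to connect-standalone.sh
bootstrap.servers=b-1.service.amazonaws.com:9096,b-2.service.amazonaws.com:9096,b-3.service.amazonaws.com:9096
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Standalone mode keeps source offsets in a local file (placeholder path)
offset.storage.file.filename=/tmp/connect.offsets

# Base security settings for the worker's own Kafka clients
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="myusername" password="mypassword";

# Producers created for source tasks (the connector-producer-* client in the log)
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="myusername" password="mypassword";
```

Without the `producer.` block, the task producer connects to port 9096 without SASL, which matches the repeated "Bootstrap broker ... disconnected" warnings and the flush timeout.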


Can anyone pin point the issue?

Thanks

Hieu Lam Tri

Apr 23, 2021, 7:18:08 AM
to debezium
Hi, 
Why do you want to use snapshot.mode=schema_only? I think it should be "initial" or "exported".
I would first make sure that you can connect to the MSK topics from the Debezium machine.
Try running 
./kafka-topics.sh --bootstrap-server xxx --list 

to see if you can list all of the topics in MSK.

Regards,
Hieu 

Jehanzaib Younis

Apr 23, 2021, 8:10:21 AM
to debezium
Thank you. I can change it to initial, not a problem.
I tried kafka-topics.sh but I get the error shown below:

[xx@xx sbin]#  /opt/kafka_2.13-2.6.0/bin/kafka-topics.sh --bootstrap-server broker1.amazonaws.com:9096 --list
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1619179522400, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
[2021-04-24 00:04:22,760] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1293)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1224)
        at java.lang.Thread.run(Thread.java:748)
[2021-04-24 00:04:22,760] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1619179522400, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:352)
        at kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:260)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1619179522400, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

I also tested using /opt/kafka_2.13-2.6.0/bin/kafka-console-producer.sh --broker-list broker1.amazonaws.com:9096 --topic mytopic --producer.config /opt/kafka/config/client_sasl.properties
and this one works fine. 

Any idea what it could be?

Thank you

Jehanzaib Younis

Apr 23, 2021, 8:18:17 AM
to debezium
--updates--

I ran kafka-topics.sh with --command-config /opt/kafka/config/client_sasl.properties and it worked; I can see all the topics.

[2021-04-24 00:12:42,122] WARN The configuration 'ssl.truststore.location' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2021-04-24 00:12:42,123] WARN The configuration 'sasl.jaas.config' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
__consumer_offsets
connect-configs
connect-offsets
connect-status
myothertopic

The only difference is that here I am using client_sasl.properties, whereas in standalone-connect.properties I have this:
database.history.producer.security.protocol = SASL_SSL
database.history.producer.sasl.mechanism=SCRAM-SHA-512
database.history.producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    serviceName="kafka" \
    username="myusername" \
    password="mypassword" ;

In client_sasl.properties I have this:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/tmp/kafka.client.truststore.jks
sasl.jaas.config= \
org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="myusername" \
    password="mypassword";
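Debezium reads the schema history back with a separate consumer, so the database.history.consumer.* group needs the same four settings as the history producer; the config in the first message only sets the protocol and mechanism for the consumer. A sketch mirroring client_sasl.properties, using the same placeholder credentials and truststore path:

```properties
# Schema history producer (writes DDL history to Kafka)
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=SCRAM-SHA-512
database.history.producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
database.history.producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="myusername" password="mypassword";

# Schema history consumer (reads the history back on restart): same values, different prefix
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=SCRAM-SHA-512
database.history.consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
database.history.consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="myusername" password="mypassword";
```

These prefixes only cover the history clients; the producer that writes the change events is configured in the worker properties.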

Any idea what it could be? Is there any way to point standalone-connect.properties at client_sasl.properties, the way the --command-config parameter does?
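Kafka Connect has no include directive for properties files, but since Kafka 2.0 (KIP-297) the FileConfigProvider can pull individual values out of another properties file, which avoids copying the credentials around. A sketch, assuming the provider is registered in the worker properties and client_sasl.properties stays at /opt/kafka/config/; the `${file:path:key}` placeholders in the connector config are resolved by the worker when it starts the connector:

```properties
# Worker config: register the file config provider under the alias "file"
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Connector config: reuse the values from client_sasl.properties
database.history.producer.security.protocol=${file:/opt/kafka/config/client_sasl.properties:security.protocol}
database.history.producer.sasl.mechanism=${file:/opt/kafka/config/client_sasl.properties:sasl.mechanism}
database.history.producer.ssl.truststore.location=${file:/opt/kafka/config/client_sasl.properties:ssl.truststore.location}
database.history.producer.sasl.jaas.config=${file:/opt/kafka/config/client_sasl.properties:sasl.jaas.config}
# Repeat the same four lines with the database.history.consumer. prefix
```

Each key still has to be listed once per prefix; the provider only saves duplicating the secret values themselves.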
