TopicAuthorizationException: Not authorized to access topics

7,185 views
Skip to first unread message

최동주

unread,
Feb 28, 2022, 4:13:27 AM2/28/22
to debezium
Is something wrong with my Debezium configuration?


error log
[2022-02-28 17:53:30,043] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter:109)
[2022-02-28 17:53:30,043] ERROR WorkerSourceTask{id=douzone_2202281716-0} Unhandled exception when committing:  (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:121)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:490)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:113)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:47)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:86)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2022-02-28 17:53:30,516] ERROR WorkerSourceTask{id=douzone_2202281716-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:500)
[2022-02-28 17:53:30,517] ERROR WorkerSourceTask{id=douzone_2202281716-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:282)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:336)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [klagoDev_init]


Debezium connector configuration

{
"name" : "oneQuery",
"config" : {
"include.schema.changes":"true",
"snapshot.mode":"schema_only_recovery",
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"poll.interval.ms": "10000",
"database.hostname" : "127.0.0.1",
"database.port" : "3306",
"database.user":"root",
"database.password":"root",

"database.connectionTimeZone":"Asia/Seoul",
"database.server.name":"klagoDev_init",
"database.history.kafka.bootstrap.servers":"localhost:9092",
"database.history.kafka.topic":"history_000010",
"table.include.list":
"neos_1234.debezium_signal,neos_1234.t_co_emp,neos_1234.t_co_dept,neos.t_co_emp,dongju_neos.user,dongju_neos.debezium_signal,study.employee",
"database.serverTimezone":"Asia/Seoul",
"tasks.max":"1",
"transforms":"unwrap,route",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"op,table,db,snapshot",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement":"add9",
"decimal.handling.mode":"double",

"database.history.security.protocol":"SASL_PLAINTEXT",
"database.history.sasl.mechanism":"SCRAM-SHA-512",
"database.history.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin';",

"database.history.producer.security.protocol":"SASL_PLAINTEXT",
"database.history.producer.sasl.mechanism":"SCRAM-SHA-512",
"database.history.producer.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin';",

"database.history.consumer.security.protocol":"SASL_PLAINTEXT",
"database.history.consumer.sasl.mechanism":"SCRAM-SHA-512",
"database.history.consumer.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin';",

"producer.security.protocol":"SASL_PLAINTEXT",
"producer.sasl.mechanism":"SCRAM-SHA-512",
"producer.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin';",

"consumer.security.protocol":"SASL_PLAINTEXT",
"consumer.sasl.mechanism":"SCRAM-SHA-512",
"consumer.sasl.jaas.config":"org.apache.kafka.common.security.scram.ScramLoginModule required username='admin' password='admin';",


"signal.data.collection":"cdctool.debezium_signal"
}
}





Kafka Connect worker config

group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


bootstrap.servers=kafka-spark-hs.kafka-test:9093

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

producer.security.protocol=SASL_PLAINTEXT
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

plugin.path=/opt/bitnami/kafka/connect

Chris Cranford

unread,
Mar 1, 2022, 9:37:54 AM3/1/22
to debe...@googlegroups.com, 최동주
Please check your Kafka security setup. It would appear that the user you're authenticating to Kafka with does not have the right ACLs for the topic — note the stack trace shows the failure in the producer send callback, so the user likely lacks WRITE (and possibly CREATE) permission on the target topic, not just READ.
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/01a951ca-a440-48e2-9b79-012a8f6e8f3bn%40googlegroups.com.

Reply all
Reply to author
Forward
0 new messages