I have set up a Kafka cluster on my linux machine secured using keycloak (OAUTHBEARER) Mechanism. I can use the Kafka Console Consumers and Producers to send and receive messages.
I have tried to connect to Kafka from my consumers and producers from wildfly (the code is deployed as module on the wildfly App server) . I have set up all the required configuration (Config Section below)
The SASL_JAAS_CONFIG has the details like (apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required LoginStringClaim_sub='kafka-client');
I am able to get authenticated with the broker , but in the client callback I am getting an Unsupported Callback error . I have 3 modules in wildfly (wildfly 19 , java11)
1) kafka producer consumer code module dependent on the
2) oauth jar (for logincallbackhandler is defined her to set the oauthbearer token ) dependent on the
3) kafka-client jar (2.8.0)]
I can see that the client call back is of type CLIENTCREDENTIAL INSTEAD OF OAuthBearerTokenCallback. The saslclient invoked from kafka code is getting set as AbstractSaslClient (Defined by an implicit wildfly module )
instead of OAuthBearerSaslClient.
The below snippet of code says that the wildfly AbstractSaslClient is used instead of the OauthBearerSaslClient packaged with the kafka client jars .I have done everything to exclude the org.wildfly.security.elytron-private module from my custom modules using exclude-sets in the dependencies (please see attached module.tx)
==================
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at
=========================================
I believe this is some issue the way wildfly loads the modules but I can see the same classloader for all the classed loaded
Can I get any pointers on this one ?
LOGS
rg.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused by javax.security.auth.callback.UnsupportedCallbackException]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused by javax.security.auth.callback.UnsupportedCallbackException]
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.k...@1.1.8.1//org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
at org.apache.k...@1.1.8.1//org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.k...@1.1.8.1//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at com.lgc.common.core//com.lgc.dsl.notifications.consumer.DataChangeNoticeKafkaConsumer.poll(DataChangeNoticeKafkaConsumer.java:388)
at com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.consumeNotification(DataChangeNotificationProducer.java:204)
at com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.retrieveAndProcessNotificationObject(DataChangeNotificationProducer.java:106)
at com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.run(DataChangeNotificationProducer.java:75)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: javax.security.auth.callback.UnsupportedCallbackException
at com.lgc.common.koauth//com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler.handle(AuthOBearerSaslClientCallbackHandler.java:91)
... 27 more
LOGS WHERE THE SSL HANDSHAKE IS SUCCESSFUL , THE CLIENT CALL BACK IS CLIENTCREDENTIAL INSTEAD OF OAuthBearerTokenCallback. The saslclient is getting set as AbstractSaslClient instead of OAuthBearerSaslClient
21-08-29 16:21:25,756 DEBUG [io.undertow.request] (management I/O-1) Upgrading request HttpServerExchange{ GET /}
2021-08-29 16:21:25,760 DEBUG [org.apache.kafka.common.network.SslTransportLayer] (OWNotificationProducer) [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=i-10-134-194-96/10.134.194.96:9093], selector=sun.nio.ch.EPollSelectorImpl@50326a63, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'i-10-134-194-96' peerPort 9093 peerPrincipal 'CN=i-10-134-194-96, OU=Foo, O=acme corp, L=Duckburg, ST=Duckburg, C=WD' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2021-08-29 16:21:25,765 DEBUG [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2021-08-29 16:21:25,766 DEBUG [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Set SASL client state to SEND_HANDSHAKE_REQUEST
2021-08-29 16:21:25,767 DEBUG [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | CommonKafkaProducer) [Producer clientId=CommonKafkaProducer] Give up sending metadata request since no node is available
2021-08-29 16:21:25,767 DEBUG [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2021-08-29 16:21:25,768 DEBUG [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Set SASL client state to INITIAL
2021-08-29 16:21:25,769 INFO [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] (OWNotificationProducer) The class loaders are as follows ************ Callbackclienthandler class com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler ModuleClassLoader for Module "com.lgc.common.koauth" from local module loader @6253c26 (finder: local module finder @49049a04 (roots: /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,778 INFO [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] (OWNotificationProducer) The class loaders are as follows ************ OAuthBearerTokenCallback class org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1 from local module loader @6253c26 (finder: local module finder @49049a04 (roots: /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,787 INFO [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] (OWNotificationProducer) The class loaders are as follows ************ OAuthBearerLoginModule class org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1 from local module loader @6253c26 (finder: local module finder @49049a04 (roots: /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,800 DEBUG [org.jboss.jca.core.connectionmanager.pool.validator.ConnectionValidator] (ConnectionValidator) Notifying pools, interval: 500
2021-08-29 16:21:25,800 DEBUG [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] (ConnectionValidator) Checking for connection within frequency
2021-08-29 16:21:25,796 INFO [com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] (OWNotificationProducer) ******************* The callbacls are org.wildfly.security.auth.callback.CredentialCallback@2b29cf23 ClassName class org.wildfly.security.auth.callback.CredentialCallbackModuleClassLoader for Module "org.wildfly.security.elytron-private" version 1.11.4.Final from local module loader @6253c26 (finder: local module finder @49049a04 (roots: /opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,803 DEBUG [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] (ConnectionValidator) Returning for connection within frequency
2021-08-29 16:21:25,803 DEBUG [org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] (ConnectionValidator) Checking for connection within frequency
2021-08-29 16:21:25,805 INFO [org.apache.kafka.common.network.Selector] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Failed authentication with i-10-134-194-96/10.134.194.96 (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused by javax.security.auth.callback.UnsupportedCallbackException]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.)
2021-08-29 16:21:25,809 DEBUG [org.apache.kafka.clients.NetworkClient] (OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, groupId=OpenWorksConsumer] Node -1 disconnected.
Config
2021-08-29 16:21:25,119 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (OWNotificationProducer) ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [i-10-134-194-96:9093]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-OpenWorksConsumer-2
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = OpenWorksConsumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 330000
max.poll.records = 100
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = class com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = class com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = OAUTHBEARER
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 60000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm =
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /opt/Landmark/new_certs/securityserver.keystore
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Regards,
Shankar