kafka broker dispatcher: Failed to handle subscriber response


Paul Kaisharis

Nov 23, 2021, 10:10:35 PM
to Knative Users
Has anyone encountered the exception below? The Kafka broker dispatcher throws it when I send a CloudEvent to the broker. The event is sent to a DLQ service I have set up, which simply logs the headers and the event; however, the event is also delivered to the service configured on the trigger. The trigger service responds with a 200 OK and a simple JSON body (below), but for some reason the dispatcher does not like the response. I've searched the docs to see whether something more is required in the response to the broker, but I haven't found anything.

{
"message": "Message successfully processed!",
"success": true
}

"2021-11-24T02:59:22.803Z","@version":"1","message":"Failed to handle subscriber response topic=knative-broker-molecule-functions-dev-mfunctions-broker partition=1 offset=0","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl","thread_name":"vert.x-eventloop-thread-1","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalStateException: Unable to decode response: unknown encoding and non empty response\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.KafkaResponseHandler.handle(KafkaResponseHandler.java:90)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl.lambda$composeSenderAndSinkHandler$8(RecordDispatcherImpl.java:201)\n\tat io.vertx.core.impl.future.Composition.onSuccess(Composition.java:38)\n\tat io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)\n\tat io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)\n\tat io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)\n\tat io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)\n\tat io.vertx.core.Promise.complete(Promise.java:66)\n\tat io.vertx.circuitbreaker.impl.CircuitBreakerImpl.lambda$null$4(CircuitBreakerImpl.java:232)\n\tat io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100)\n\tat io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63)\n\tat io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: io.cloudevents.rw.CloudEventRWException: Could not parse. Unknown encoding. Invalid content type or spec version\n\tat io.cloudevents.rw.CloudEventRWException.newUnknownEncodingException(CloudEventRWException.java:201)\n\tat io.cloudevents.core.message.impl.MessageUtils.parseStructuredOrBinaryMessage(MessageUtils.java:80)\n\tat io.cloudevents.http.vertx.VertxMessageFactory.createReader(VertxMessageFactory.java:46)\n\tat io.cloudevents.http.vertx.VertxMessageFactory.createReader(VertxMessageFactory.java:89)\n\tat dev.knative.eventing.kafka.broker.dispatcher.impl.KafkaResponseHandler.handle(KafkaResponseHandler.java:74)\n\t... 18 common frames omitted\n","topic":"knative-broker-molecule-functions-dev-mfunctions-broker","partition":1,"offset":0}
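
The `Caused by` frame in the trace above comes from the CloudEvents SDK's `parseStructuredOrBinaryMessage`, which suggests the dispatcher tries to read any non-empty subscriber response as a reply CloudEvent and fails when the body is plain JSON without CloudEvent attributes. Under that reading, a handler has two safe options: return an empty body, or return a body together with the required CloudEvent attributes. A minimal Python sketch of both, assuming a Flask-style handler that may return a `(body, status, headers)` tuple; the function names, event type, and source are hypothetical:

```python
import json
import uuid


def empty_ack():
    """Acknowledge the event without a reply: empty body, 200 status."""
    return "", 200


def cloudevent_reply(data, event_type, source):
    """Build a binary-mode CloudEvent reply as (body, status, headers).

    In binary mode the required attributes (specversion, id, type, source)
    travel as ce-* HTTP headers and the payload is the plain body.
    """
    headers = {
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),   # unique per reply event
        "ce-type": event_type,        # hypothetical, e.g. "com.example.processed"
        "ce-source": source,          # hypothetical, e.g. "/event-processor"
        "Content-Type": "application/json",
    }
    return json.dumps(data), 200, headers
```

In a Flask view, either tuple can be returned directly, for example `return cloudevent_reply({"success": True}, "com.example.processed", "/event-processor")`. A reply event is normally routed back into the broker, so its `ce-type` should probably differ from whatever the trigger filters on; otherwise the service may receive its own replies.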

Paul Kaisharis

Nov 23, 2021, 10:41:17 PM
to Knative Users
Well, the answer was in the exception: "non empty response". I changed the response to simply...

return '', 200

and this exception is no longer occurring (this is a Python service, by the way). But now I'm getting the exception below, which there must be some configuration for. I'm connecting to a bootstrap server on Confluent Cloud, which may be why.

"ERROR","level_value":40000,"stack_trace":"io.vertx.circuitbreaker.TimeoutException: operation timeout\n","topic":"knative-broker-molecule-functions-dev-mfunctions-broker","partition":2,"offset":2}

Paul Kaisharis

Nov 24, 2021, 12:59:48 AM
to Knative Users
To close out my own thread: there is an experimental feature called delivery-timeout (https://knative.dev/docs/eventing/experimental-features/) that controls the timeout on event delivery, which matters for a long-running event processor like this one. I added delivery.timeout to the trigger.yaml and the timeout went away.
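
For anyone landing here later, a trigger with a delivery timeout might look roughly like the sketch below. The trigger, broker, and service names and the two-minute value are placeholders, not the ones from this thread. It assumes the `delivery-timeout` feature flag has been enabled as described on the experimental-features page linked above, and that the timeout is written as an ISO 8601 duration:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger              # placeholder
spec:
  broker: my-broker             # placeholder
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-processor     # placeholder for the long-running service
  delivery:
    timeout: PT2M               # assumed value: wait up to 2 minutes per delivery attempt
```

Check the field name and the flag against the docs for the installed Knative Eventing version, since experimental features can change between releases.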