[Confluent ksqlDB 7.1.1]: Pull query not responding, and sometimes responding very late


Anup Tiwari

May 27, 2022, 11:51:57 AM
to ksqldb-users
Hi Team,

We are observing very strange behaviour: ksqlDB is not responding to a simple pull query on any node.

Query: select * from table1 where uid = '11';

After a long wait, I can see the following lines in the logs:


May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,715] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,715] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,716] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,716] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,716] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,717] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,717] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,718] ERROR Error while handling chunk (io.confluent.ksql.rest.client.KsqlTarget:326)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTargetUtil.toRows(KsqlTargetUtil.java:59)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:200)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:324)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.parsetools.impl.RecordParserImpl.lambda$handler$0(RecordParserImpl.java:316)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:484)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.endResponse(Http1xClientConnection.java:503)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$000(Http1xClientConnection.java:242)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleResponseEnd(Http1xClientConnection.java:644)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:604)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at java.lang.Thread.run(Thread.java:748)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `io.confluent.ksql.rest.entity.StreamedRow`, problem: Exactly one parameter should be non-null. got: 0
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at [Source: (byte[])"{"@type":"generic_error","error_code":50000,"message":"Worker executor closed"}"; line: 1, column: 79]
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1907)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:587)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:610)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:293)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:288)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:202)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:520)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: ... 41 more
May 27 21:13:00 ip-*-*-*-* ksql-server-start: Caused by: java.lang.IllegalArgumentException: Exactly one parameter should be non-null. got: 0
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.entity.StreamedRow.checkUnion(StreamedRow.java:290)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.entity.StreamedRow.<init>(StreamedRow.java:215)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.introspect.AnnotatedConstructor.call(AnnotatedConstructor.java:124)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:291)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: ... 51 more
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,720] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,720] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,721] WARN Connection canceled, so returning (io.confluent.ksql.physical.pull.HARouting:376)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,721] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,721] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,722] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,722] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,722] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,722] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,723] ERROR Failed to queue all rows (io.confluent.ksql.physical.pull.HARouting:459)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,724] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,725] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,724] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,726] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,726] INFO Queue closed before results completed. Stopping execution. (io.confluent.ksql.physical.pull.PullPhysicalPlan:99)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:00,727] ERROR Error while handling chunk (io.confluent.ksql.rest.client.KsqlTarget:326)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: io.confluent.ksql.rest.client.KsqlRestClientException: Failed to deserialise object
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:53)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTargetUtil.toRows(KsqlTargetUtil.java:59)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTarget.lambda$postQueryRequest$3(KsqlTarget.java:200)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlTarget.lambda$null$11(KsqlTarget.java:324)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.parsetools.impl.RecordParserImpl.handleParsing(RecordParserImpl.java:214)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.parsetools.impl.RecordParserImpl.lambda$handler$0(RecordParserImpl.java:316)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:248)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$beginResponse$0(Http1xClientConnection.java:484)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.endResponse(Http1xClientConnection.java:503)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.access$000(Http1xClientConnection.java:242)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleResponseEnd(Http1xClientConnection.java:644)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleHttpMessage(Http1xClientConnection.java:604)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.http.impl.Http1xClientConnection.handleMessage(Http1xClientConnection.java:577)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:164)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at java.lang.Thread.run(Thread.java:748)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: Caused by: com.fasterxml.jackson.databind.exc.ValueInstantiationException: Cannot construct instance of `io.confluent.ksql.rest.entity.StreamedRow`, problem: Exactly one parameter should be non-null. got: 0
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at [Source: (byte[])"{"@type":"generic_error","error_code":50000,"message":"Worker executor closed"}"; line: 1, column: 79]
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.exc.ValueInstantiationException.from(ValueInstantiationException.java:47)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1907)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.wrapAsJsonMappingException(StdValueInstantiator.java:587)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.rewrapCtorProblem(StdValueInstantiator.java:610)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:293)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:288)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:202)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:520)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.client.KsqlClientUtil.deserialize(KsqlClientUtil.java:51)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: ... 41 more
May 27 21:13:00 ip-*-*-*-* ksql-server-start: Caused by: java.lang.IllegalArgumentException: Exactly one parameter should be non-null. got: 0
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.entity.StreamedRow.checkUnion(StreamedRow.java:290)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at io.confluent.ksql.rest.entity.StreamedRow.<init>(StreamedRow.java:215)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.introspect.AnnotatedConstructor.call(AnnotatedConstructor.java:124)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromObjectWith(StdValueInstantiator.java:291)
May 27 21:13:00 ip-*-*-*-* ksql-server-start: ... 51 more
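One possible reading of the root-cause lines above (an interpretation of the trace, not a confirmed diagnosis): the node forwarding the pull query received {"@type":"generic_error","error_code":50000,"message":"Worker executor closed"} from the other node, a top-level error object rather than a row, and could not map it onto StreamedRow, which requires exactly one of its fields to be set. The sketch below reproduces that "exactly one non-null" check in isolation. The class and the field names header, row, errorMessage and finalMessage are hypothetical stand-ins, not ksqlDB's real StreamedRow.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative only: a simplified "exactly one field set" check modelled on the
// message in the log. Field names are hypothetical, not ksqlDB's real class.
public class UnionCheckSketch {

    // Throws unless exactly one of the given values is non-null.
    static void checkUnion(Object... fields) {
        long nonNull = Arrays.stream(fields).filter(Objects::nonNull).count();
        if (nonNull != 1) {
            throw new IllegalArgumentException(
                "Exactly one parameter should be non-null. got: " + nonNull);
        }
    }

    // Treats top-level JSON keys as candidate row fields and validates them.
    // Returns the name of the single field that was set.
    static String fromJsonFields(Map<String, Object> json) {
        Object header = json.get("header");
        Object row = json.get("row");
        Object errorMessage = json.get("errorMessage");
        Object finalMessage = json.get("finalMessage");
        checkUnion(header, row, errorMessage, finalMessage);
        if (header != null) return "header";
        if (row != null) return "row";
        if (errorMessage != null) return "errorMessage";
        return "finalMessage";
    }

    public static void main(String[] args) {
        // Top-level keys of the body seen in the log:
        // {"@type":"generic_error","error_code":50000,"message":"Worker executor closed"}
        Map<String, Object> body = new HashMap<>();
        body.put("@type", "generic_error");
        body.put("error_code", 50000);
        body.put("message", "Worker executor closed");
        try {
            fromJsonFields(body);
        } catch (IllegalArgumentException e) {
            // None of the expected keys is present, so the count is 0.
            System.out.println(e.getMessage());
        }
    }
}
```

Run against that error body, the check fails with a count of 0, the same shape as the IllegalArgumentException in the trace; the underlying problem is then whatever caused the remote node to report "Worker executor closed".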

May 27 21:13:26 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:26,168] INFO Reporting thread saturation 9.136880033333333E-4 for _confluent-ksql-table1-b099b00e-c461-403c-b29b-7ef075223d1f-StreamThread-1 (io.confluent.ksql.utilization.PersistentQuerySaturationMetrics:196)
May 27 21:13:26 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:26,168] INFO Reporting query saturation 9.136880033333333E-4 for table1_79 (io.confluent.ksql.utilization.PersistentQuerySaturationMetrics:213)

May 27 21:13:26 ip-*-*-*-* ksql-server-start: [2022-05-27 21:13:26,172] INFO reporting node-level saturation 0.07371667960666667 (io.confluent.ksql.utilization.PersistentQuerySaturationMetrics:157)



Regards,
Anup Tiwari

Anup Tiwari

May 28, 2022, 6:28:07 AM
to ksqldb-users
Hi Team,

We saw garbage collection (GC) activity in the ksqlDB process, so we increased the memory and restarted the process, but there was no improvement. Even after increasing the memory, I can see the error below:

[2022-05-28 14:34:00,261] ERROR Exception occurred while writing to connection stream:  (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:141)
java.io.EOFException: ResponseOutputStream is closed
        at io.confluent.ksql.api.server.ResponseOutputStream.write(ResponseOutputStream.java:60)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter.write(PullQueryStreamWriter.java:134)
        at io.confluent.ksql.api.server.OldApiUtils.lambda$streamEndpointResponse$3(OldApiUtils.java:153)
        at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
[2022-05-28 14:34:00,298] INFO 10.*.*.* - - [Sat, 28 May 2022 09:04:00 GMT] "POST /query HTTP/1.1" 200 0 "-" "Apache-HttpClient/4.5.13 (Java/1.8.0_221)" 123 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 14:34:00,298] ERROR Exception occurred while writing to connection stream:  (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:141)
java.io.EOFException: ResponseOutputStream is closed
        at io.confluent.ksql.api.server.ResponseOutputStream.write(ResponseOutputStream.java:60)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
        at io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter.write(PullQueryStreamWriter.java:134)
        at io.confluent.ksql.api.server.OldApiUtils.lambda$streamEndpointResponse$3(OldApiUtils.java:153)
        at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Regards,
Anup Tiwari

Anup Tiwari

May 30, 2022, 10:06:45 AM
to ksqldb-users
Hi Team,

We did some debugging and will share the logs here, but first we want to explain how this issue started. We are now able to reproduce it every time.

Background:
We set up a new ksqlDB (0.23.1) cluster from the Confluent 7.1.1 tarball with 4 m5.xlarge nodes. We then started running around 8-10 stream/table queries that produce our final state table.

Once our table was ready and being populated in real time, we started load testing at 3k, 4k and 5k RPS. Initially it ran fine: at 3k RPS the latency stayed under 15 ms, and at 4k/5k RPS it stayed under 100 ms. All good so far.

After this, we ran a 3k RPS load for a planned 10-15 hours in our dev setup and observed that after 4 hours ksqlDB was no longer receiving any requests; we could see HTTP 4XX responses in our ALB logs.
We did some debugging but found no issues on the ALB side. When we checked the ksqlDB servers, we found that ksqlDB was not responding even to a single pull query for a single key; we tried this on each ksqlDB server one by one. All of this happened at night, so when we continued debugging later in the day, we found a ticket that is not directly related but gave us the idea of testing ksqlDB on a single node.

We removed all other nodes and ran our load test against a single node (Node1) at 3k RPS. Since Node1 was small, it could not sustain 3k RPS, so we reduced the load to 1k RPS, which the node handled properly, with some latency. We then started another node (Node2) but kept sending the load only to Node1, as before, and the DEBUG logs show that some requests were handled locally and some were routed to Node2, which is expected (see the "Normal Behaviour" section of the logs). Next, we stopped sending requests to Node1 and sent them to Node2 instead, and saw the same expected behaviour. So far, so good.
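The local-versus-routed split described above matches the KsLocator/HARouting DEBUG lines in the "Normal Behaviour" logs: for each key's partition the hosts are listed with the active host first and the standby after it, and the query runs locally if that first host is the receiving node, otherwise it is forwarded. The sketch below models only that decision; the class, method names and node addresses are hypothetical, and it deliberately omits the liveness/lag filtering and any retry on the next host that ksqlDB may perform.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the per-partition routing visible in the
// KsLocator/HARouting DEBUG lines. Not ksqlDB's implementation: liveness/lag
// filtering and retrying on the next host after a failure are omitted.
public class PullQueryRoutingSketch {

    // A candidate host for one partition, and whether it is the node handling the request.
    static final class Host {
        final String address;
        final boolean local;

        Host(String address, boolean local) {
            this.address = address;
            this.local = local;
        }
    }

    // Active host first, then standbys, like the "Filtered and ordered hosts" lines.
    static List<Host> orderHosts(Host active, List<Host> standbys) {
        List<Host> ordered = new ArrayList<>();
        ordered.add(active);
        ordered.addAll(standbys);
        return ordered;
    }

    // Decides what to do based on the first host in the ordered list.
    static String route(String key, List<Host> ordered) {
        Host first = ordered.get(0);
        return first.local
            ? "key " + key + " executed locally at " + first.address
            : "key " + key + " routed to " + first.address;
    }

    public static void main(String[] args) {
        // The request arrives at Node1; Node2 is the other cluster member.
        Host node1 = new Host("node1:8088", true);
        Host node2 = new Host("node2:8088", false);

        // Like partition 5 in the logs: the receiving node is active, so it runs locally.
        System.out.println(route("1700000", orderHosts(node1, Arrays.asList(node2))));
        // Like partition 8 in the logs: the other node is active, so the query is forwarded.
        System.out.println(route("40981084", orderHosts(node2, Arrays.asList(node1))));
    }
}
```

With two nodes, roughly the partitions for which the other node is active are forwarded, so once both nodes receive traffic, each one is also serving forwarded requests from its peer.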

We then started sending requests to Node1 again, and it responded perfectly. BUT as soon as we started sending requests to Node2 in parallel with Node1, the average latency increased gradually for 10-20 seconds, after which neither node was able to respond to queries, and we could see no further pull query requests arriving at either node. Once ksqlDB is in this hung state, it does not respond to any further pull queries, even a single-key query executed via the CLI.

We tried restarting the ksqlDB server on both nodes, but now, as soon as we start sending pull query requests to both nodes, they cannot respond to any requests, even though the logs show that the requests are reaching the ksqlDB servers (see the "Issue Started" section of the logs). We can reproduce this scenario every time after a restart.

Once ksqlDB goes into the hung state, we tested a pull query for a single key via the API to trace the single-key flow in the DEBUG logs, and we see the same issue (see the "Single User Testing" section of the logs). Please let us know what might be happening here and how we can debug it further and solve it.
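A single-key check like the one described above can be scripted against ksqlDB's /query REST endpoint, which the logs show receiving POST requests. The sketch below is a minimal Java 11+ version (java.net.http); the server URL http://localhost:8088 and the 10-second timeout are assumptions, and the query reuses the STATE_FINAL table and stateid column from the DEBUG logs. The explicit request timeout is a design choice so that a hung node surfaces as an HttpTimeoutException instead of blocking the client indefinitely.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Sketch of a single-key pull query against ksqlDB's /query REST endpoint.
// Assumptions: server URL and timeout value. Requires Java 11+ (java.net.http).
public class SingleKeyPullQuery {

    // Escapes backslashes and double quotes so the statement is a valid JSON string.
    static String jsonEscape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Request body: the statement plus an empty streamsProperties object.
    static String body(String ksql) {
        return "{\"ksql\":\"" + jsonEscape(ksql) + "\",\"streamsProperties\":{}}";
    }

    // Builds the POST with an explicit timeout so a hung node fails fast
    // (HttpTimeoutException) instead of blocking the caller indefinitely.
    static HttpRequest build(String serverUrl, String ksql) {
        return HttpRequest.newBuilder(URI.create(serverUrl + "/query"))
            .timeout(Duration.ofSeconds(10))
            .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
            .header("Accept", "application/vnd.ksql.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString(body(ksql)))
            .build();
    }

    public static void main(String[] args) throws Exception {
        // Pass the node URL as the first argument to test each node in turn.
        String server = args.length > 0 ? args[0] : "http://localhost:8088";
        HttpRequest request =
            build(server, "select * from STATE_FINAL where stateid = '1700000';");
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running it once per node URL while the cluster is hung shows whether each node times out or returns an error body.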

DEBUG logs:

-- =============================== Normal Behaviour Started ================================================

[2022-05-28 23:10:42,546] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:10:42,546] DEBUG Handling pull query for key ['1700000']-Optional.empty in partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,546] DEBUG Handling pull query for partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,546] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*7', port=8088} , standby hosts [HostInfo{host='*.*.*.*6', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,546] DEBUG Filtered and ordered hosts: [Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}, Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:10:42,546] DEBUG Query select * from STATE_FINAL where stateid = '1700000' ; executed locally at host http://*.*.*.*7:8088/ at timestamp 1653759642546. (io.confluent.ksql.physical.pull.HARouting:284)

[2022-05-28 23:10:42,546] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:10:42,547] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144)


[2022-05-28 23:10:42,548] DEBUG Handling pull query for key ['40981084']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,548] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,548] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,548] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:10:42,548] DEBUG Query select * from STATE_FINAL where stateid = '40981084' ; routed to host http://*.*.*.*6:8088/ at timestamp 1653759642548. (io.confluent.ksql.physical.pull.HARouting:306)



[2022-05-28 23:10:42,550] DEBUG Handling pull query for key ['84981287']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,550] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,550] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,550] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:10:42,550] DEBUG Query select * from STATE_FINAL where stateid = '84981287' ; routed to host http://*.*.*.*6:8088/ at timestamp 1653759642550. (io.confluent.ksql.physical.pull.HARouting:306)



[2022-05-28 23:10:42,554] DEBUG Handling pull query for key ['8373283']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,555] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,555] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,555] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:10:42,555] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:10:42,555] DEBUG Query select * from STATE_FINAL where stateid = '8373283' ; routed to host http://*.*.*.*6:8088/ at timestamp 1653759642555. (io.confluent.ksql.physical.pull.HARouting:306)


-- =============================== Normal Behaviour Ended ================================================
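Reading the DEBUG lines above, each lookup appears to follow the same pattern: the key is mapped to a partition, the hosts for that partition are listed with the active host first and the standby after it, and the query is either executed locally (when the first host is the local node) or routed over HTTP to the other node. The Python sketch below is a simplified model of that pattern as inferred from these logs, not ksqlDB's actual `KsLocator`/`HARouting` code; in particular it skips the liveness and lag filtering that the "Filtered and ordered hosts" message implies. The names `Node`, `order_hosts` and `routing_decision` are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class Node:
    host: str
    local: bool


def order_hosts(active: str, standbys: Sequence[str], local_host: str) -> List[Node]:
    """Hypothetical model: active host first, then standbys, tagged local or remote."""
    # Simplification: the real filtering by heartbeat liveness and lag is omitted.
    return [Node(host, host == local_host) for host in [active, *standbys]]


def routing_decision(nodes: List[Node]) -> str:
    """Mirror the two HARouting outcomes seen in the logs."""
    first = nodes[0]
    return "executed locally" if first.local else f"routed to {first.host}"


if __name__ == "__main__":
    local = "*.*.*.*7"
    # Partition 5 in the logs: the active host is this node.
    print(routing_decision(order_hosts("*.*.*.*7", ["*.*.*.*6"], local)))
    # Partition 8 in the logs: the active host is the other node.
    print(routing_decision(order_hosts("*.*.*.*6", ["*.*.*.*7"], local)))
```

Under this reading, every key whose partition is active on `*.*.*.*6` but which arrives at `*.*.*.*7` costs a remote call, so load sent to both nodes turns into cross-node traffic in both directions.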


-- =========================== Issue Started (when I started sending pull query API requests to the other node) ======================

[2022-05-28 23:10:42,555] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 936 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 102 (io.confluent.ksql.api.server.LoggingHandler:144)



[2022-05-28 23:10:42,555] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:10:42,556] DEBUG Handling pull query for key ['85005993']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,556] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,556] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,557] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)



[2022-05-28 23:10:42,557] INFO *.*.*.*45 - - [Sat, 28 May 2022 17:40:42 GMT] "POST /query HTTP/1.1" 200 797 "-" "Apache-HttpClient/4.5.12 (Java/1.8.0_221)" 101 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:10:42,558] DEBUG Handling pull query for key ['85035395']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:10:42,558] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:10:42,558] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:10:42,558] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)



-- ========= Single User Testing : curl -H "Content-Type: application/json" -X POST --data @body.json http://*.*.*.*7:8088/query =============

[2022-05-28 23:30:41,411] DEBUG Handling pull query for key ['1280']-Optional.empty in partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:30:41,411] DEBUG Handling pull query for partition 5 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:30:41,411] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*7', port=8088} , standby hosts [HostInfo{host='*.*.*.*6', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:30:41,411] DEBUG Filtered and ordered hosts: [Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}, Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:30:41,421] DEBUG Send heartbeat to host *.*.*.*6:8088 at 1653760841421 (io.confluent.ksql.rest.server.HeartbeatAgent:329)
[2022-05-28 23:30:41,485] DEBUG Receive heartbeat at: 1653760841485 from host: *.*.*.*6:8088  (io.confluent.ksql.rest.server.HeartbeatAgent:118)



[2022-05-28 23:34:58,132] DEBUG Host: *.*.*.*6:8088 has 0 missing heartbeats (io.confluent.ksql.rest.server.HeartbeatAgent:306)
[2022-05-28 23:34:58,185] DEBUG Receive heartbeat at: 1653761098185 from host: *.*.*.*6:8088  (io.confluent.ksql.rest.server.HeartbeatAgent:118)
[2022-05-28 23:34:58,186] INFO *.*.*.*6 - - [Sat, 28 May 2022 18:04:58 GMT] "POST /heartbeat HTTP/1.1" 200 13 "-" "-" 57 (io.confluent.ksql.api.server.LoggingHandler:144)
[2022-05-28 23:34:58,220] DEBUG Handling pull query for key ['84981287']-Optional.empty in partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:171)
[2022-05-28 23:34:58,221] DEBUG Handling pull query for partition 8 of state store Aggregate-Aggregate-Materialize. (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:127)
[2022-05-28 23:34:58,221] DEBUG Before filtering: Active host HostInfo{host='*.*.*.*6', port=8088} , standby hosts [HostInfo{host='*.*.*.*7', port=8088}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:347)
[2022-05-28 23:34:58,221] DEBUG Filtered and ordered hosts: [Node{local = false, location = http://*.*.*.*6:8088/, Host = *.*.*.*6:8088 was selected}, Node{local = true, location = http://*.*.*.*7:8088/, Host = *.*.*.*7:8088 was selected}] (io.confluent.ksql.execution.streams.materialization.ks.KsLocator:371)
[2022-05-28 23:34:58,221] DEBUG Send heartbeat to host *.*.*.*6:8088 at 1653761098221 (io.confluent.ksql.rest.server.HeartbeatAgent:329)
[2022-05-28 23:34:58,285] DEBUG Receive heartbeat at: 1653761098285 from host: *.*.*.*6:8088  (io.confluent.ksql.rest.server.HeartbeatAgent:118)
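In the excerpt as posted, key `1280` reaches "Filtered and ordered hosts" (local node first) at 23:30:41,411, but no "executed locally" line follows it, and the next "Handling pull query" line is at 23:34:58,220, about 256.8 seconds later. Because lines may have been elided from the excerpt, scanning the full, unedited log is one way to confirm whether the lookup really stalled. The Python sketch below is a hypothetical helper that assumes every relevant line starts with the `[yyyy-MM-dd HH:mm:ss,SSS]` prefix shown here, and reports long gaps between lines containing a chosen substring.

```python
import re
from datetime import datetime
from typing import Iterable, List, Optional, Tuple

# Matches the "[2022-05-28 23:30:41,411]" prefix used by the ksqlDB server log.
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]")


def find_gaps(lines: Iterable[str], threshold_s: float,
              contains: Optional[str] = None) -> List[Tuple[str, str, float]]:
    """Hypothetical helper: (previous_ts, next_ts, seconds) for each silence > threshold_s."""
    gaps = []
    previous = None  # (datetime, raw timestamp) of the last matching line
    for line in lines:
        if contains is not None and contains not in line:
            continue  # only consider lines of interest, e.g. pull-query lines
        match = TIMESTAMP.match(line)
        if match is None:
            continue  # skip blank lines, stack-trace lines, etc.
        raw = match.group(1)
        current = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S,%f")
        if previous is not None:
            delta = (current - previous[0]).total_seconds()
            if delta > threshold_s:
                gaps.append((previous[1], raw, delta))
        previous = (current, raw)
    return gaps


if __name__ == "__main__":
    sample = [
        "[2022-05-28 23:30:41,411] DEBUG Handling pull query for key ['1280'] ...",
        "[2022-05-28 23:30:41,421] DEBUG Send heartbeat to host *.*.*.*6:8088 ...",
        "",
        "[2022-05-28 23:34:58,220] DEBUG Handling pull query for key ['84981287'] ...",
    ]
    for start, end, seconds in find_gaps(sample, 60, contains="Handling pull query"):
        print(f"{start} -> {end}: {seconds:.3f} s")
```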


-- ============ ksqlDB Server.Properties File ==============

listeners=http://*.*.*7:8088
# For UDF
ksql.extension.dir=/opt/confluent/confluent-7.1.1/etc/ksqldb/ext

#------ Logging config -------
ksql.logging.processing.topic.auto.create=true
ksql.logging.processing.stream.auto.create=true
ksql.logging.processing.rows.include=true

#------ External service config -------
bootstrap.servers=*.*.*.1:9092,*.*.*.2:9092,*.*.*.3:9092
ksql.service.id=mTest

#-------- Additional Prod Settings -----------
ksql.streams.producer.retries=2147483647
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807
ksql.streams.producer.request.timeout.ms=300000
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.streams.replication.factor=3
ksql.sink.replicas=3
ksql.streams.state.dir=/data/ksql
ksql.streams.num.standby.replicas=2
ksql.query.pull.enable.standby.reads=true
ksql.heartbeat.enable=true
ksql.lag.reporting.enable=true
ksql.query.pull.metrics.enabled=true
ksql.udf.collect.metrics=true
ksql.streams.num.stream.threads=1
ksql.plugins.rocksdb.cache.size=4294967296
ksql.streams.commit.interval.ms=2000

#ksql.streams.metrics.recording.level=TRACE
ksql.streams.metrics.recording.level=DEBUG
compression.type=snappy
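The later replies point at `ksql.query.pull.thread.pool.size` and `ksql.query.pull.router.thread.pool.size`; neither appears in the properties file above, so unless they are set elsewhere, both run at their defaults. The Python sketch below is a hypothetical helper with deliberately simplified `.properties` parsing (no line continuations or escapes) that lists which of the pull-query-related keys discussed in this thread are set and which fall back to defaults.

```python
from typing import Dict, Iterable, List, Tuple

# Keys discussed in this thread; a key that is absent falls back to ksqlDB's default.
PULL_QUERY_KEYS = [
    "ksql.query.pull.thread.pool.size",
    "ksql.query.pull.router.thread.pool.size",
    "ksql.query.pull.enable.standby.reads",
    "ksql.heartbeat.enable",
    "ksql.lag.reporting.enable",
    "ksql.streams.num.standby.replicas",
]


def parse_properties(lines: Iterable[str]) -> Dict[str, str]:
    """Hypothetical minimal key=value parser; skips blanks and '#'/'!' comments."""
    props = {}
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped[0] in "#!":
            continue
        key, sep, value = stripped.partition("=")  # split on the first '=' only
        if sep:
            props[key.strip()] = value.strip()
    return props


def audit(props: Dict[str, str]) -> Tuple[Dict[str, str], List[str]]:
    """Split the keys of interest into (explicitly set, left at default)."""
    present = {k: props[k] for k in PULL_QUERY_KEYS if k in props}
    missing = [k for k in PULL_QUERY_KEYS if k not in props]
    return present, missing


if __name__ == "__main__":
    with open("ksql-server.properties") as f:  # hypothetical path
        present, missing = audit(parse_properties(f))
    print("set:", present)
    print("defaults:", missing)
```

The logs also list a single standby per partition even though `ksql.streams.num.standby.replicas=2`, which is consistent with a two-node cluster having only one other instance available to host a standby.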

Regards,
Anup Tiwari

Anup Tiwari

May 30, 2022, 10:20:55 AM
to ksqldb-users
Hi Team,

One more thing I observed: when I stop my load test and then, some time later, stop the ksqlDB process, I can see the logs below related to pull queries. It seems that queries which were queued behind older, stuck queries are being replayed, and some of the stuck queries are coming out with errors:

Logs :-

DEBUG Query select * from STATE_FINAL where STATEID = '323847' ; routed to host http://*.*.*6:8088/ at timestamp 1653919699663. (io.confluent.ksql.physical.pull.HARouting:306)
INFO *.*.*6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144)
WARN Interrupted while writing to connection stream (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:139)
INFO *.*.*6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144)
WARN Interrupted while writing to connection stream (io.confluent.ksql.rest.server.resources.streaming.PullQueryStreamWriter:139)


INFO *.*.*6 - - "POST /query HTTP/1.1" 200 0 "-" "-" 321 (io.confluent.ksql.api.server.LoggingHandler:144)

ERROR Failed to queue all rows (io.confluent.ksql.physical.pull.HARouting:459)
ERROR Failed to handle request 500 /query (io.confluent.ksql.api.server.FailureHandler:38)
java.lang.IllegalStateException: Worker executor closed
        at io.vertx.core.impl.WorkerExecutorImpl.executeBlocking(WorkerExecutorImpl.java:56)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeOnWorker(KsqlServerEndpoints.java:316)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeOldApiEndpointOnWorker(KsqlServerEndpoints.java:328)
        at io.confluent.ksql.rest.server.KsqlServerEndpoints.executeQueryRequest(KsqlServerEndpoints.java:193)
        at io.confluent.ksql.api.server.ServerVerticle.lambda$handleQueryRequest$5(ServerVerticle.java:252)
        at io.confluent.ksql.api.server.OldApiUtils.handleOldApiRequest(OldApiUtils.java:72)
        at io.confluent.ksql.api.server.ServerVerticle.handleQueryRequest(ServerVerticle.java:248)
        at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1038)
        at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:101)
        at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:132)
        at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:296)
        at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:276)
        at io.vertx.ext.web.handler.impl.BodyHandlerImpl.lambda$handle$0(BodyHandlerImpl.java:87)
        at io.vertx.core.http.impl.HttpServerRequestImpl.onEnd(HttpServerRequestImpl.java:525)
        at io.vertx.core.http.impl.HttpServerRequestImpl.handleEnd(HttpServerRequestImpl.java:511)
        at io.vertx.core.http.impl.Http1xServerConnection.handleEnd(Http1xServerConnection.java:176)
        at io.vertx.core.http.impl.Http1xServerConnection.handleContent(Http1xServerConnection.java:163)
        at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:140)
        at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)

Regards,
Anup Tiwari

Anup Tiwari

May 31, 2022, 3:21:40 PM
to ksqldb-users
Team,

After digging a bit more, it seems related to https://github.com/confluentinc/ksql/pull/8164

Also, raising ksql.query.pull.thread.pool.size / ksql.query.pull.router.thread.pool.size to somewhat higher values lets queries complete at lower RPS during the load test, but at higher RPS the queries still go into a hung state, so I am not sure how to tune these settings.
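One way to picture why a larger pool helps only up to a point is a toy model of two nodes that forward lookups to each other, as the routing lines in the logs show both nodes doing. This is a hypothesis, not a description of ksqlDB internals: assume each node serves requests from one bounded thread pool, and a request whose key is active on the other node holds its thread while it waits for that node. If every thread on both nodes is held by such a waiting request, the forwarded lookups sit in the queues behind them and nothing completes until the waits time out. In the Python sketch below (all names hypothetical), barriers make the outcome deterministic: 2 threads per node with 2 requests each stalls, 3 threads does not, and 3 threads with 3 requests each stalls again.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import List


def lookup(key: str) -> str:
    """Hypothetical stand-in for reading a key from the local state store."""
    return f"row-{key}"


def simulate(pool_size: int, requests_per_node: int = 2, wait_s: float = 1.0) -> List[str]:
    """Toy model: each node receives requests whose key is active on the *other* node.

    Assumes pool_size >= requests_per_node so every request can start.
    """
    node_a = ThreadPoolExecutor(max_workers=pool_size)
    node_b = ThreadPoolExecutor(max_workers=pool_size)
    parties = 2 * requests_per_node
    all_in_flight = threading.Barrier(parties)  # every request starts before any forwards
    all_decided = threading.Barrier(parties)    # threads stay held until all outcomes are known

    def handle(remote: ThreadPoolExecutor, key: str) -> str:
        all_in_flight.wait(timeout=5)
        forwarded = remote.submit(lookup, key)  # this thread blocks on the remote node
        try:
            outcome = forwarded.result(timeout=wait_s)
        except FutureTimeout:
            outcome = "TIMEOUT"
        all_decided.wait(timeout=5)
        return outcome

    futures = []
    for i in range(requests_per_node):
        futures.append(node_a.submit(handle, node_b, f"a{i}"))
        futures.append(node_b.submit(handle, node_a, f"b{i}"))
    results = [f.result() for f in futures]
    node_a.shutdown(wait=True)
    node_b.shutdown(wait=True)
    return results


if __name__ == "__main__":
    print(simulate(pool_size=2))                        # every request times out
    print(simulate(pool_size=3))                        # a spare thread serves the forwards
    print(simulate(pool_size=3, requests_per_node=3))   # more load, stalls again
```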


Regards,
Anup Tiwari

Anup Tiwari

Jun 1, 2022, 2:32:41 PM
to ksqldb-users
Team,

I can see a similar issue in ksqlDB 0.25.1.
I don't know what is happening or why it happens so frequently, because ideally, even with default settings, ksqlDB should be able to handle 5k RPS easily.
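A rough capacity check follows from Little's law: the average number of requests in flight equals the arrival rate multiplied by the average time each request spends in the system. Assuming the trailing number on the `LoggingHandler` access-log lines (101 to 102) is the response time in milliseconds, a 5k RPS target implies roughly 500 pull queries in flight at once across the cluster. If each of those holds a worker thread for its whole duration, the pools would need room for about that many concurrent requests. The Python helper below (hypothetical name) only performs this arithmetic.

```python
import math


def required_concurrency(requests_per_second: float, latency_ms: float) -> int:
    """Little's law, L = lambda * W, rounded up to whole in-flight requests."""
    return math.ceil(requests_per_second * latency_ms / 1000.0)


if __name__ == "__main__":
    # 5k RPS target from this thread; ~100 ms per request assumed from the access log.
    print(required_concurrency(5000, 100))
    print(required_concurrency(5000, 102))
```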

I have also raised it as a question here --> https://github.com/confluentinc/ksql/issues/9161

Regards,
Anup Tiwari
