QueryInterruptedException occurs when querying for large number of rows in druid cluster


Nitish P

Sep 1, 2021, 5:57:28 AM
to Druid User
Hi,

We run Druid 0.20.0 in production and are currently encountering an issue we do not understand.

Our current production cluster:
 - 2 historicals (8 cores, 64 GB RAM, 600 GB disk)
 - 1 middle manager (8 cores, 32 GB)
 - 2 brokers (16 cores, 64 GB)
 - 1 coordinator (8 cores, 32 GB)
 - 1 overlord (8 cores, 32 GB)
 - 1 router (4 cores, 16 GB)

We have set maxQueryTimeout to 15 minutes, as we do not want any query to run beyond this limit (we are still experimenting with this).

We are trying to find the current limits of our cluster, and as part of this we tested the query below, which is one of the queries we wish to execute via our Redash server.

We observed that as the number of rows increases, the query is automatically interrupted. After searching the Druid user group we found that this is related to the query timeout, and that by increasing the timeout we can execute the query, though of course with more resource consumption.

We tried that, but we still hit the limit, and we are not sure why.


Query tested (51,716,755 rows in total)
========================
select __time, << 8 columns more >>
from $datasource 
where __time BETWEEN TIMESTAMP '2021-08-24 00:00:00' and TIMESTAMP '2021-08-24 23:59:59'


Test-1 : historical heap of 6gb & directmemory=12gb.
========================
With default timeout :
- We executed the query through a Python script with no timeout specified.
- The query failed in under 3 minutes, returning about 3 million rows (less than the total row count).
- Interestingly, the broker reported an exception, but the Python client closed without any issue, printing that it had retrieved 3 million rows.
- Analysing the failure, we found the order of events as follows:
  1. historical-1 failed
  2. broker interrupted
  3. historical-2 interrupted
  4. client exited, reporting that it got 3 million rows

With explicit timeout = 15 minutes : 
- The query failed at the 5-minute mark.
- But this time the client threw an exception and did not print any collected row count.
- The exception was "('Connection broken: IncompleteRead(4902 bytes read, 6741 more expected)', IncompleteRead(4902 bytes read, 6741 more expected))"


We thought the issue might be related to the historicals' heap capacity, as we found that at one point historical-1 hit its 6 GB limit, so we increased the heap to 10 GB and tested again.


Test-2 : historical heap of 10gb & directmemory=12gb.
========================
With explicit timeout = 15 minutes : 
- The query failed again at the 5-minute mark with the same client-side exception: "('Connection broken: IncompleteRead(4902 bytes read, 6741 more expected)', IncompleteRead(4902 bytes read, 6741 more expected))"
- The heap consumption did reach around 10 GB, but it was being garbage collected with enough memory available to continue.
- Overall the behaviour was similar to the 6 GB heap test.


So we think this may not be heap-related but something else (I have attached the logs for both tests). Can you please provide any insight into what causes this?
Is there a restriction on the number of rows that can be returned by Druid?
Under what conditions does "Unable to send SQL response" occur on brokers?
Under what conditions does "org.apache.druid.server.QueryLifecycle - Exception while processing queryId [7caa78f5-3044-455e-bebd-80a86b223077] (com.fasterxml.jackson.databind.JsonMappingException: [no message for org.apache.druid.query.QueryInterruptedException])" occur on historical servers?

Regards,
Nitish
Test-1.logs
Test-2.logs

Nitish P

Sep 6, 2021, 12:18:50 AM
to Druid User
Any updates on this, please?

vijay narayanan

Sep 6, 2021, 12:27:17 AM
to druid...@googlegroups.com
I have seen that when querying from Python, the Python client typically cannot keep up with the broker, and this tends to make the broker run out of heap. I would try setting druid.broker.http.maxQueuedBytes so that back pressure can be exerted on the historicals and the whole pipeline moves at the pace of the slowest link in the chain (see https://support.imply.io/hc/en-us/articles/360034310953-Tuning-Druid-for-Large-Result-Sets for more parameters and details)
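For reference, a minimal broker runtime.properties fragment illustrating the back-pressure setting described above (the 25MiB value here is purely illustrative, not a recommendation; tune it for your own cluster):

```properties
# broker runtime.properties (sketch, values are assumptions)
# Caps the bytes queued per query between historicals and this broker.
# When the downstream client reads slowly, historicals are throttled
# instead of the broker buffering results without bound.
druid.broker.http.maxQueuedBytes=25MiB
```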

vijay



Nitish P

Sep 6, 2021, 2:52:22 AM
to Druid User
Hi Vijay,

Thanks for the document. We already have maxQueuedBytes set to 10 MiB, but we still face this issue.

In fact, I have listed below the values we have for each parameter mentioned in https://support.imply.io/hc/en-us/articles/360034310953-Tuning-Druid-for-Large-Result-Sets.

Interestingly, after reading the document I tried an experiment: I increased druid.router.http.readTimeout on the router from the existing PT5M to PT15M, and surprisingly the query ran for 8 min 40 s instead of the usual 5 min before failing with the same error. This time the client closed without an exception, having retrieved 8,136,866 rows vs 3,000,000 previously.

This is a big improvement, and it is why this is so confusing for me: I am really not sure how to debug this, nor am I able to understand what is causing the issue. I am starting to believe it is related to timeouts, but since there are so many timeout parameters across the processes, I wish to find the correlation between them and how I can debug this problem.

I have listed all server configurations below.

We are currently using Druid 0.20.0.

################################################################
Current values in our servers
################################################################

druid.server.http.defaultQueryTimeout
-------------------------------------
Historical : not set, so default is 300000 ms, i.e. 5 minutes
Broker : not set, so default is 300000 ms, i.e. 5 minutes
MiddleManager : not set, so default is 300000 ms, i.e. 5 minutes

druid.broker.http.readTimeout
-------------------------------------
Broker-1 : PT5M
Broker-2 : PT5M

druid.router.http.readTimeout
-------------------------------------
Router : PT5M

druid.broker.http.maxQueuedBytes
-------------------------------------
Broker-1: 10MiB
Broker-2: 10MiB

druid.processing.buffer.sizeBytes
-------------------------------------
Historical-1 : 1024MiB
Historical-2 : 1024MiB
Broker-1 : 500MiB
Broker-2 : 500MiB
MiddleManager-1 : druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100MiB
Router : not set


druid.query.groupBy.maxMergingDictionarySize
-------------------------------------
Historical-1 : not set, so default is 100000000 (100 MB)
Historical-2 : not set, so default is 100000000 (100 MB)
Broker-1 : not set, so default is 100000000 (100 MB)
Broker-2 : not set, so default is 100000000 (100 MB)
MiddleManager-1 : not set, so default is 100000000 (100 MB)
Router : 

druid.query.groupBy.maxOnDiskStorage
-------------------------------------
Historical-1 : 1000000000 (1 GB)
Historical-2 : 1000000000 (1 GB)
Broker-1 : not set, so default is 0 (disk spilling disabled)
Broker-2 : not set, so default is 0 (disk spilling disabled)
MiddleManager-1 : not set, so default is 0 (disk spilling disabled)
Router : 


#########################
Broker configuration
#########################
# HTTP server settings
druid.server.http.numThreads=50

# 15 minutes is the maximum timeout; we should not allow any query
# to hold up the Druid system
druid.server.http.maxQueryTimeout=900000
druid.server.http.maxSubqueryRows=1000000
druid.broker.balancer.type=connectionCount
druid.broker.http.numConnections=40
druid.broker.http.maxQueuedBytes=10MiB
druid.broker.http.readTimeout=PT5M
druid.broker.http.unusedConnectionTimeout=PT4M
druid.broker.http.numMaxThreads=40
druid.processing.buffer.sizeBytes=500MiB
druid.processing.numMergeBuffers=6
druid.processing.numThreads=1
druid.processing.tmpDir=/opt/apache-druid-0.20.0/var/druid/processing

druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor","org.apache.druid.java.util.metrics.JvmMonitor","org.apache.druid.java.util.metrics.JvmThreadsMonitor","org.apache.druid.java.util.metrics.JvmCpuMonitor","org.apache.druid.java.util.metrics.CpuAcctDeltaMonitor","org.apache.druid.client.cache.CacheMonitor","org.apache.druid.server.metrics.QueryCountStatsMonitor","org.apache.druid.server.emitter.HttpEmittingMonitor"]

#########################
Router configuration
#########################

druid.router.http.numConnections=50
druid.router.http.readTimeout=PT15M
druid.router.http.numMaxThreads=40
druid.server.http.numThreads=50
druid.router.defaultBrokerServiceName=druid/broker
druid.router.coordinatorServiceName=druid/coordinator
druid.router.managementProxy.enabled=true
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor","org.apache.druid.java.util.metrics.JvmCpuMonitor","org.apache.druid.java.util.metrics.CpuAcctDeltaMonitor","org.apache.druid.java.util.metrics.JvmMonitor","org.apache.druid.java.util.metrics.JvmThreadsMonitor","org.apache.druid.server.emitter.HttpEmittingMonitor"]

#########################
Historical configuration
#########################

druid.server.http.numThreads=50
druid.processing.buffer.sizeBytes=1024MiB
druid.processing.numMergeBuffers=2
druid.processing.numThreads=7
druid.processing.tmpDir=/opt/apache-druid-0.20.0/data/processing
druid.segmentCache.locations=[{"path":"/opt/apache-druid-0.20.0/data/segment-cache","maxSize":"540g"}]
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.cache.type=caffeine
druid.cache.sizeInBytes=512MiB
druid.query.groupBy.maxOnDiskStorage=1000000000
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.SysMonitor","org.apache.druid.java.util.metrics.JvmCpuMonitor","org.apache.druid.java.util.metrics.CpuAcctDeltaMonitor","org.apache.druid.java.util.metrics.JvmMonitor","org.apache.druid.java.util.metrics.JvmThreadsMonitor","org.apache.druid.client.cache.CacheMonitor","org.apache.druid.server.metrics.QueryCountStatsMonitor","org.apache.druid.server.metrics.HistoricalMetricsMonitor", "org.apache.druid.server.emitter.HttpEmittingMonitor"]

#########################
MiddleManager
#########################

druid.indexer.runner.startPort=8100
druid.indexer.runner.endPort=8140
druid.worker.capacity=8
druid.indexer.runner.javaOptsArray=["-server" ,"-Xms1200m" ,"-Xmx1200m" ,"-XX:MaxDirectMemorySize=1g","-Duser.timezone=UTC","-Dfile.encoding=UTF-8","-XX:+ExitOnOutOfMemoryError","-Djute.maxbuffer=1024000","-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager","-Dorg.jboss.logging.provider=slf4j","-Dnet.spy.log.LoggerImpl=net.spy.memcached.compat.log.SLF4JLogger","-Dlog4j.shutdownCallbackRegistry=org.apache.druid.common.config.Log4jShutdown","-Dlog4j.shutdownHookEnabled=true","-XX:+HeapDumpOnOutOfMemoryError","-XX:HeapDumpPath=/log/jvm/heapdump"]
druid.indexer.task.baseTaskDir=/opt/apache-druid-0.20.0/data/task
druid.server.http.numThreads=50
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100MiB
druid.indexer.fork.property.druid.processing.numThreads=1
druid.indexer.fork.property.druid.processing.tmpDir=/opt/apache-druid-0.20.0/data/tmp

#########################
Python code
#########################
import pandas as pd
from pydruid.db import connect
from datetime import datetime

st = datetime.now()
query = """select __time, MRIClientId, MRISessionId, EventType, RequestTimestamp, ResponseTimestamp, Phone, TIN, PNR from mriprodstream where __time BETWEEN TIMESTAMP '2021-08-24 00:00:00' and TIMESTAMP '2021-08-24 23:59:59'"""
conn = None
curs = None
try:
    # Connect via the router; the 900000 ms context timeout matches maxQueryTimeout
    conn = connect(host='mridruidquery', port=8888, path='/druid/v2/sql', scheme='http', context={"timeout": 900000})
    curs = conn.cursor()
    print("Curs connection done!!")
    df = pd.DataFrame(curs.execute(query))
    print("data frame ready")
    print(df.shape)
except Exception as e:
    print(e)
finally:
    # curs/conn may still be None if connect() failed
    if curs:
        curs.close()
    if conn:
        conn.close()
print("Execution taken = {tt}".format(tt=(datetime.now() - st)))


Regards,
Nitish

vijay narayanan

Sep 6, 2021, 3:14:02 AM
to druid...@googlegroups.com
Where is Druid running, and where is Python? There could be other HTTP timeouts in between (if you are using load balancers, for instance). It may be simpler to just use multiple queries (each extracting one hour, for instance)

vijay

Nitish P

Sep 6, 2021, 3:48:17 AM
to Druid User
Hi Vijay,

We are running the Python client on a 16-core, 32 GB machine (on-premises, in Bangalore, India), while the Druid processes run on separate machines in AWS Singapore, with each Druid process deployed on its own server.

We are not using any load balancer, so the request is sent directly to mridruidquery, which is the router process (the router acts as the load balancer here), and it routes to a broker and on to the data servers.

In fact we were already running this query extracting one day's worth of data, but as the number of ingested events increased we started facing this issue, so we tried to find out what is causing it (we will try the 1-hour extract approach).

Our reason for trying this out is that we wanted to understand the root cause of the issue. Our event count keeps increasing, and in the future we may well have 50 million rows in a single hour (currently that is one day's worth); if we then run 1-hour queries and hit the same issue, how would we go about addressing it?

Regards,
Nitish

vijay narayanan

Sep 6, 2021, 4:01:04 AM
to druid...@googlegroups.com
Druid does not do very well when you have one large query extracting millions of rows. If you break a query that extracts 50 million rows into 100 queries, each extracting 500k rows, and execute them in parallel, you will see that the overall run time of the concurrent queries is significantly smaller than that of one large query (the constraint would be the bandwidth between your Python client and Druid). So the approach would be to keep running queries over smaller time intervals as the data for each interval grows.
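A minimal sketch of that hourly-split approach, reusing the datasource from the earlier posts. Note that `fetch` here is a placeholder that only builds the per-hour SQL; in real use it would open a pydruid connection against the router and execute the query:

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

def hourly_intervals(start, end):
    """Split [start, end) into consecutive one-hour (lo, hi) pairs."""
    out = []
    lo = start
    while lo < end:
        hi = min(lo + timedelta(hours=1), end)
        out.append((lo, hi))
        lo = hi
    return out

QUERY = ("select __time, MRIClientId from mriprodstream "
         "where __time >= TIMESTAMP '{lo}' and __time < TIMESTAMP '{hi}'")

def fetch(interval):
    # Placeholder: in real use, connect via pydruid here and return rows.
    lo, hi = interval
    return QUERY.format(lo=lo.strftime('%Y-%m-%d %H:%M:%S'),
                        hi=hi.strftime('%Y-%m-%d %H:%M:%S'))

# One day split into 24 sub-queries, run 8 at a time.
intervals = hourly_intervals(datetime(2021, 8, 24), datetime(2021, 8, 25))
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(fetch, intervals))
```

Each sub-query stays well under the per-query timeout, and the slow client-side read is spread across several broker HTTP threads instead of one.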

vijay

Nitish P

Sep 6, 2021, 4:11:25 AM
to druid...@googlegroups.com
So can I conclude that there is a limitation on executing large queries on the Druid side, and that when such issues occur the solution is to break the query into smaller queries (executed sequentially or concurrently)?

Regarding the overall timeouts applied across the system, is there a way to visualize them?
For example: router readTimeout, broker maxQueryTimeout, defaultQueryTimeout. How do they correlate with each other, and how would increasing or decreasing one impact the others?

Thank you again for your help.


Nitish P

Sep 6, 2021, 5:08:01 AM
to druid...@googlegroups.com
Also, could you elaborate, if possible, on why this limitation occurs: "Druid does not do very well when you have one large query extracting millions of rows"? I am still not clear on this, as I am new to Druid. Thanks.

vijay narayanan

Sep 6, 2021, 7:44:10 AM
to druid...@googlegroups.com
Running multiple parallel queries will get you faster results for the same resources. When a select query without an order by executes, the broker streams the results. When you execute it as a single query, you can use only one HTTP thread on the broker to stream out the results. With multiple queries you can use multiple HTTP threads on the broker, which increases throughput. For aggregation queries, the historicals typically use one merge thread per query; with multiple queries you get multiple merge threads. So it basically boils down to parallelizing the work. Single large queries tend to create bottlenecks, which can be avoided by using multiple concurrent queries.

Nitish P

Sep 6, 2021, 8:46:40 AM
to druid...@googlegroups.com
Thanks for the explanation. I agree for streaming scan queries, but I wonder whether the same applies to aggregation queries. I think parallel execution may not be entirely possible for aggregation queries (maybe I am wrong); usually aggregation queries produce smaller output, but I still wonder how one would handle the case of a large output.

Regarding the issue in this case, I am still not clear about the root cause. I understand that it is caused by streaming large amounts of data, but I am still lost on what triggers the timeout and how the timeouts correlate.

Sorry for repeating myself and bothering you, but this has been confusing for me since the beginning.

Nitish P

Sep 8, 2021, 4:44:16 AM
to druid...@googlegroups.com
Any updates on this?
Thanks