Kafka injection jobs is failing!

179 views
Skip to first unread message

Laxmikant Pandhare

unread,
Sep 22, 2023, 6:20:04 PM9/22/23
to Druid User
Hi,

I am trying to load very huge data and I am facing below error.

No task in the corresponding pending completion taskGroup[0] succeeded before completion timeout ela...

I increased completionTimeout to 3600S and taskDuration parameter is also 3600S.
Still my job is failing.

Anyone has any idea about the error.

John Kowtko

unread,
Sep 23, 2023, 11:43:49 AM9/23/23
to Druid User
Hi Laxmikan,

If you have tight retention rules and lax early/late message rejection thresholds, this could be due to trying to publish a segment that is outside of the retention window, and the task waits indefinitely for the handoff to complete.  

If you think this is the case, or if you otherwise would prefer the ask not get killed,  then you can change handoffConditionTimeout to a value less than completionTimeout so the task will not wait long enough to be killed by the Supervisor.

Let us know if that works for you.  If not, then please provide log info from the task that is getting killed.

Thanks.  John

Sergio Ferragut

unread,
Sep 26, 2023, 11:47:46 AM9/26/23
to druid...@googlegroups.com
How much data are you trying to load? 
What is the source before Kafka? 
If this is an initial load of history, I highly recommend SQL based ingestion for that and then streaming ingestion for new data.



--
You received this message because you are subscribed to the Google Groups "Druid User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-user/f53ad94a-b466-48b3-a9ed-15bb7e4330d0n%40googlegroups.com.

Laxmikant Pandhare

unread,
Sep 29, 2023, 5:45:07 PM9/29/23
to Druid User
I will check and update on this soon.

Laxmikant Pandhare

unread,
Oct 5, 2023, 4:20:23 PM10/5/23
to Druid User
HI John & Sergio - this issue still persists.

handoffConditionTimeout is Zero, completionTimeout is PT4500S, and taskDuration is PT3600S.

Data load is Huge. As of now, total data size is 1 TB Total no of rows are 45811746352.

We are directly reading raw data from kafka, process them using spark and write it back to another kafka topic. Druid injections jobs are reading from those topics.

John Kowtko

unread,
Oct 5, 2023, 10:48:02 PM10/5/23
to Druid User
Hi Laxmikan,  I still don't have an idea of what could be an issue here.  Can you share your ingestion spec, and also tell us how many segments are being created per hour?

Thanks.  John

Sergio Ferragut

unread,
Oct 9, 2023, 12:34:50 PM10/9/23
to druid...@googlegroups.com
Are you trying to do a load of history or is this the ongoing volume you would expect?
If it is an initial load, batch loading would be much more efficient and it can produce segments that are organized for query using secondary partitioning (clustering).
If it is an ongoing load, what is the expected throughput? How many tasks are you running now? What segment granularity are you using? Is there late arriving data or is it all very recent?
Streaming ingestion tasks align consumed rows to logical segments based on segment granularity. If the timestamps have a broad range, you will get a lot of small segments as it reaches the in memory limits. Managing a large number of small segments tends to slow things down and consumes more resources.
Handoff process requires
-  the overlord recognizes a task published a segment and writes it to the metadata DB.
- that the coordinator read all the new segments from the metadata and assign them to historicals
- the historicals upload them and announce the presence of the segment in its cache through either HTTP or Zookeeper based on (druid.serverview.type)
- the task gets the handoff signal from for the segment it published via http or zookeeper.

So if it is timing out waiting for this. Something along this path is stuck or does not have enough resources. 

Hope this helps,

Sergio

Laxmikant Pandhare

unread,
Oct 10, 2023, 12:23:32 PM10/10/23
to Druid User
The Kafka topic contains data for 4 days. The jobs got failed and I reset it to load 4 days of data. 

Please find below spec details. Segment Granularity is DAY. Data is very recent and there is no lag in data.

"ioConfig": {
  "topic": "xyz",
  "inputFormat": {
    "type": "json",
    "keepNullColumns": false,
    "assumeNewlineDelimited": false,
    "useJsonNodeReader": false
  },
  "replicas": 2,
  "taskCount": 2,
  "taskDuration": "PT3600S",
  "consumerProperties": {
    "security.protocol": "SASL_PLAINTEXT",
    "bootstrap.servers": "abc:1111,pqr:1111,lmn:1111",
    "lmk": "1111\",ghi:1111\",ghk:1111\""
  },
  "autoScalerConfig": null,
  "pollTimeout": 100,
  "startDelay": "PT5S",
  "period": "PT30S",
  "useEarliestOffset": true,
  "completionTimeout": "PT4500S",
  "lateMessageRejectionPeriod": null,
  "earlyMessageRejectionPeriod": null,
  "lateMessageRejectionStartDateTime": null,
  "configOverrides": null,
  "idleConfig": null,
  "stream": "xyz",
  "useEarliestSequenceNumber": true
},

Please find image for more details.

Screenshot 2023-10-05 at 7.55.49 PM.png

John Kowtko

unread,
Oct 11, 2023, 1:30:33 AM10/11/23
to Druid User
A few more questions:
1. Can you share your auto-compaction spec?
2. How many segments per day before they are compacted?
3. Does the ingestion spec include rollup=true?
3a. If so, then can you estimate the rollup reduction, e.g. "select count(*), sum("count") from datasource"?

Thanks.  John

Sergio Ferragut

unread,
Oct 11, 2023, 12:59:36 PM10/11/23
to druid...@googlegroups.com
It would be really useful to see one of the failed task logs. Can you share?
The error you started with is the overlord timing out the task, perhaps it just needs more time given the size of segments it is building.
What is the maxRowsPerSegment on the ingestion spec? I see some really large segments (17 million ) and some really small ones ( 1 ). The small ones may be a problem if there are too many of them.


Laxmikant Pandhare

unread,
Oct 12, 2023, 2:43:03 AM10/12/23
to Druid User
Hello, I updated below field and job failing stopped. Now, everything is running fine.

druid.indexer.runner.taskAssignmentTimeout

I changed this field from PT5M to PT15M. Will this impact the overall performance?


Sergio Ferragut

unread,
Oct 12, 2023, 12:59:08 PM10/12/23
to druid...@googlegroups.com
Interesting, that setting is giving the task execution system more leeway in terms of waiting for the task to start.
Probably indicative of a bottleneck somewhere or not enough resources. Thinking some MMs or the Overlord might be overworked or starved for CPU. 

Laxmikant Pandhare

unread,
Oct 17, 2023, 12:41:25 AM10/17/23
to Druid User
it worked for one day and again started failing with same error.

Ben Krug

unread,
Oct 17, 2023, 1:05:52 AM10/17/23
to druid...@googlegroups.com
I agree with Sergio, it would be helpful to see a task log, and also the overlord log.  Can you zip and attach those?  Or did you already and I missed them?

I hesitate to speculate without those, but some other possibilities include waiting for locks for a time chunk.  It does seem odd that you have at least one
segment with 1 row, but it says it's fully compacted.  Do you have autocompaction enabled?  If so, check out the options for configuring it to not conflict
with ingestion here.  Or, consider disabling it for now as a test.

But really, a task log and the overlord log would be very helpful.

Laxmikant Pandhare

unread,
Oct 19, 2023, 11:55:57 PM10/19/23
to Druid User
Hi All - 

It looks like after reduction of maxRowsPerSegment - the above error is not coming as of now. So, one question here is which trade-off needs to have highest preference over other.

I mean to say, there are two things we have to consider.

1- maxRowsPerSegment which is maximum 5 million by default
2 - Segment size limit 300 - 700 MB.

In my case, if I increase maxRowsPerSegment to 20 million then it is creating segment size of 300-700 MB and if, I reduce it to 5 million then it is creating 100-200 MB.

So, what do you suggest out of above two scenarios.

Thank You,
Laxmikant Pandhare

John Kowtko

unread,
Oct 20, 2023, 7:45:45 AM10/20/23
to druid...@googlegroups.com
I would always opt for fewer, larger segments if possible:

Pros:
 * fewer segments = fewer copies of column dictionaries = less overall space usage for the datasource
 * fewer segments when highly fragmented can improve query performance, and in general avoid issues related to cluster/segment maintenance

Cons:
 * Too few segments may reduce the ability to parallelize queries
 * Too large segments could cause query performance issue (e.g. segment scan times) and/or task issues (as you have noted)

I'm sure there are other things to consider as well, these are just a few I can think of right now.

Thanks.  John



--
John Kowtko
Technical Success Manager

Laxmikant Pandhare

unread,
Oct 23, 2023, 4:34:27 PM10/23/23
to Druid User
That sounds good. Need to manage both fewer segment and size between 300-700 MB.

As of now, jobs is working fine.

Thank You,
Laxmikant

Nathan Li

unread,
Nov 9, 2023, 4:28:07 PM11/9/23
to Druid User
i was also able to resolve my failing kafka ingestion tasks using the following changes based on the recommendations in this thread:

-> increase the instance's mem & cpu of the master/query node (i happen to have these 2 on the same instance).
-> increase  maxRowsPerSegment in the kafka streaming spec (to 20mil as well)
-> increase druid.indexer.runner.taskAssignmentTimeout from 5 to 15 (not sure if this had any affect bc the docs suggest that this config is only applicable for overlord remote mode)
-> MM -> some runtime.properties config changes for the peons
-> MM -> increased the maxdirectmemorysize for the javaoptarray

src data ingestion total size: 2tb~
 
still new to druid but hopefully could help someone else as well. 

Laxmikant Pandhare

unread,
Jan 11, 2024, 10:46:49 PM1/11/24
to Druid User
HI All

All injections jobs are working fine. But few larger datasources creating smaller segments like 150-200 MB even for 10 million rows per segments.

Can I increase number of rows per segment number to 20 million and make sure that segment size will be between 300-700 MB. Will this cause any Query performance issues in Druid ?

John Kowtko

unread,
Jan 12, 2024, 9:34:21 AM1/12/24
to Druid User
Hi Laxmikant,

In general you can keep increasing segment size as long as your segment build/publish times don't get so long that they start interferring with normal supervisor operation, and that the resulting historical segments still have reasonable scan times.

 * Increasing segment size also increases the number of intermediate persist files (mini-segments) that will be created prior to building/publishing a segment, if that number grows into the hundreds there could be some overhead there.  Number of persists could be reduced by increasing maxRowsInMemory, however if you do that you can slow down real-time (peon) segment scan times.   You have to find the balance of all of these.

 * You need to make sure your taskDuration is large enough to allow the task to build one or more full size segments.  Since the last segment of a task during it's lifecycle is usually a partial one, if you use a longer taskDuration that will generally allow the task to create more full-sized segments in relation to the partial one it always creates at the end.

 * If you are receiving late arrival data, then increasing max segment size will also help reduce fragmentation for the "secondary" segments (those that are holding the late arrival time interval data) that also are force-created whenever the primary segment hits it's size limit and gets created.  

I have seen clusters with 50m row segments but still under 1GB in size (narrow rows) ... and I have seen clusters with 1-2GB segment size but with smaller number of rows (wider rows) ... there are no "hard limits" here that will break something, it's just a matter of how the resources are being used and loaded, and pushing to the point where they start to interfere with each other's operations.  

Every workload has different characteristics so no simple "cookbook" here to help you, mostly just gaining a good understanding of how everything works so you can tune best for your workload.

So yes, I suggest try with 20m rows.  Then try 30m, and 40m, if you want ... just note the segment build times and the resulting historical segment scan times to see where the sweet spot for your workload is :) 

Thanks.  John

Laxmikant Pandhare

unread,
Jan 13, 2024, 1:11:29 AM1/13/24
to Druid User
Thank you, John for the detailed explanation.
Reply all
Reply to author
Forward
0 new messages