Realtime Kafka Ingestion Slows to a Crawl When Number of Realtime Segments is High (5,000+)


Daniel Nash

May 2, 2025, 9:46:15 AM
to Druid User
I feel like our system is pretty well tuned for the near realtime data flow we usually have, which is data that might span the last day to the last week.  On average, for our highest volume datasource, which is handling a kafka topic with 8 partitions, we might have 30-100 hourly realtime segments we are writing to and one indexing task handling all the partitions, writing between 1,000-1,500 records/s.  I've seen that a single task can easily handle up to ~10,000 records/s without too much issue (we sometimes get bursty data coming in) so long as the number of realtime segments is relatively low (less than a few hundred).  For background, we maintain data for the last year for all our datasources.

We had a situation the other day where someone re-fed data for a single user in our system that spanned the entire year.  Because our segments are hourly, we ended up generating over 8,000 realtime segments.  The performance of the Kafka ingestion task dropped dramatically, causing lag to build up, and our autoscaling configuration spun up more indexing tasks to split up the Kafka topic partitions.  We eventually hit our max of 8 ingestion tasks, with each task showing only ~100 records/s throughput.  Since I know from past experience that a single task can easily handle ~10,000 records/s without issue, I could not figure out why the performance was so terrible.  Our Kafka lag just continued to grow due to the low throughput of the indexing tasks.  My only theory is that the high number of realtime segments was causing some kind of performance impact, maybe due to the coordination with ZooKeeper or something else.

While having such a large number of realtime segments is not ideal, I also have to contend with the fact that our system will sometimes receive data spanning large periods of time.  I'm trying to figure out what it was about the above situation that caused the performance degradation and what I can tweak to get around it.

Any thoughts appreciated.

~Dan

John Kowtko

May 2, 2025, 12:11:21 PM
to Druid User
Hi Dan,

Each real-time segment represents a different time interval that is actively being ingested into.  Each of these segments requires some infrastructure to be created within the ingestion task, including buffers to hold the initial data as it is fetched.

The best practice I use is to limit the number of real-time segments per task to maybe a few dozen.  Above that it can start to bog down the ingestion task unnecessarily.

So, how do you reduce the number of real-time segments in streaming ingestion?  I can think of a few strategies here:

 * Send in your backfill data in chronological order, loading only a few intervals of data at a time and waiting for the ingestion tasks to roll over, which will release the older intervals you have finished backfilling

 * Choose a larger segment granularity for the initial ingestion so there are fewer intervals overall to deal with

 * Do your backfills using batch ingestion instead of streaming.
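
For the batch route, a rough (untested) sketch of what a native batch backfill task might look like -- the datasource name, file location, and timestamp column below are placeholders, and you would tune the granularity, rollup, and partitioning to match your table:

{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "my-datasource",
      "timestampSpec": { "column": "timestamp", "format": "iso" },
      "dimensionsSpec": { "useSchemaDiscovery": true },
      "granularitySpec": {
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "/data/refeeds",
        "filter": "*.json"
      },
      "inputFormat": { "type": "json" },
      "appendToExisting": true
    },
    "tuningConfig": {
      "type": "index_parallel",
      "maxNumConcurrentSubTasks": 2
    }
  }
}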

Let us know if any of those tactics are feasible for your situation.

Thanks.  John

Daniel Nash

May 5, 2025, 10:00:38 AM
to druid...@googlegroups.com
John, thank you very much for some great ideas.  I don't think we'd be able to implement your first suggestion just due to the amount of data we'd have to buffer, but I think the last two might be possibilities.

> Do your backfills using batch ingestion instead of streaming.

Would I be correct in assuming that a large number of hourly segments created during batch ingestion would not have the same performance impact as what I saw during realtime ingestion?

Our initial data flows through a few microservices for characterization and enrichment before ending up in Druid.  I think the easiest way to accomplish the refeeds as batch ingestion might be to insert a final microservice that would filter these older messages out of the Kafka topic feeding our realtime Druid ingestion task and write those out to disk to be batch ingested.  Our refeeds go through that whole processing flow, so it wouldn't be easy to distinguish the refeeds from the more realtime data other than by date.  This idea seems the most promising and I'll continue thinking on that.

> Choose a larger segment granularity for the initial ingestion so there are fewer intervals overall to deal with.

We are currently using concurrent append and replace to prevent conflicts between ingestion and compaction.  While we ignore the past 7 days in our autocompaction configs, our data is only quasi-realtime, so we still sometimes see data arriving outside that window, and we were getting a lot of failed compaction tasks.  So, concurrent append and replace has been great for fixing that issue.  Under the "Known Limitations" section of the concurrent append and replace documentation, it explicitly warns against mixing segment granularities, and especially against compacting data into a finer granularity, which is what I think we'd probably want if we did the initial ingestion as DAY and compacted to HOUR granularity.
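
(For context, the relevant bits of our auto-compaction config look roughly like this -- simplified:)

{
  "dataSource": "data-test",
  "skipOffsetFromLatest": "P7D",
  "granularitySpec": {
    "segmentGranularity": "HOUR"
  },
  "taskContext": {
    "useConcurrentLocks": true
  }
}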

Honestly, I'm not sure we actually need the HOUR granularity we currently have.  Our segments are pretty much hitting the ~5 million row mark the documentation suggests even at HOUR granularity, so we don't need a coarser granularity to hit that.  However, most of our queries span a period of days and don't really need HOUR granularity.  I need to talk to the user base more to see if we can get away with DAY granularity instead.

~Dan





John Kowtko

May 5, 2025, 10:48:36 AM
to Druid User
Hi Dan,

Yeah, even if you divert the late-arriving data upstream into a separate place, you will eventually have to insert it into the main table, which will create a large number of segments based on the HOUR granularity.

You should not necessarily try to tune for one segment per time period.  If you tuned for, say, 24 segments per DAY interval, then you could take advantage of secondary range partitioning for query pruning, which can improve performance considerably.  That would be my preference.

Only if you have immense amounts of data generating a large number of segments for each hour, and you also have queries that are hour-specific, or you have high-QPS workloads that need range partitioning of your data right away, would I consider using HOUR granularity.

Thanks.  John

Daniel Nash

May 6, 2025, 10:57:33 AM
to druid...@googlegroups.com
Thanks for that general guidance on segment granularity, John.  I'm largely working with the decisions the previous administrators of this system made.  Your feedback has been helpful in questioning some of those decisions.  We only occasionally have more than 1-2 segments in a given hour interval.

I think I'm going to turn off concurrent append and replace so that I can change the granularity of the realtime tasks to DAY, and I will manually compact the older segments to change them from HOUR to DAY.  Would it be worth using secondary range partitioning to try to order the segments into hourly chunks, or will that occur naturally during compaction from HOUR to DAY?

I guess the only nice thing about the HOUR granularity is that auto-compaction currently doesn't have to work with that much data.  If I switch to DAY granularity, the compaction task will be working with ~12-24 GB of data, depending on how voluminous the day's data was.  I suppose if multiple sub-tasks are used for the compaction task, that would distribute the load a bit, though then secondary range partitioning on the timestamp would probably be needed since the data would be distributed between a few workers.  Is that correct?

On a related but slightly different topic, my group is looking to add a datasource that would pretty much always have data distributed across a broad range of time.  So, unlike the case we've been discussing, where an occasional refeed covers a large time period, data covering a large time period would pretty much be the norm.  I've been debating how best to tackle this scenario.  I can try to batch the data by specific time periods as you suggested previously to limit the number of segments I'm creating at any given time, but I'm pretty much going to need to be compacting across my entire dataset all the time to keep things optimal.  If you have any thoughts on dealing with data of this nature, I'd appreciate it.

~Dan


John Kowtko

May 7, 2025, 10:22:14 PM
to Druid User
Hi Dan,

You can change the granularity of the real-time tasks to DAY right now ... this is a "target" granularity, so when data arrives for an existing interval, the segment created will match the granularity of the existing segments in that interval.  It will only use the target granularity when data comes in for an empty interval.
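
In the supervisor spec that is just the granularitySpec inside dataSchema -- roughly something like this (other fields omitted):

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "DAY",
  "queryGranularity": "MINUTE",
  "rollup": true
}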

Compaction on dozens of GB shouldn't take long.  Yes, use maxNumConcurrentSubTasks to parallelize -- if you have the CPUs and task slots available.

I tried secondary partitioning using __time as the leading column; it basically generates numbered rather than range-partitioned segments.  If by "timestamp" you mean a secondary field with that name, then sure, that potentially would help performance IF your query has a filter on that field.

As for your last question, on the datasource with a broad range of time -- if this is going to be populated using batch ingestion tasks, then if the table isn't too big I suggest using a larger segment granularity such as MONTH or YEAR, and then setting up auto-compaction with offset PT0S so it will compact the current time interval in between batch jobs.  If the table is big, then you may have no choice but to keep the intervals smaller so the compaction jobs can run.  I wouldn't worry too much about fragmentation across many time periods ... Druid can handle pretty high QPS (in the hundreds) while still opening hundreds of segments per query.  But if you need QPS in the 1000s, then you should try to get everything range partitioned to minimize the number of segments being opened per query.
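
As a rough illustration (names made up, untested), the auto-compaction config for that kind of table might look something like this -- MONTH target granularity and a zero offset so the current interval gets compacted in between batch loads:

{
  "dataSource": "broad-history-test",
  "skipOffsetFromLatest": "PT0S",
  "granularitySpec": {
    "segmentGranularity": "MONTH"
  },
  "tuningConfig": {
    "partitionsSpec": {
      "type": "dynamic",
      "maxRowsPerSegment": 5000000
    }
  }
}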

Hopefully this helps.  

-- John

Daniel Nash

May 9, 2025, 10:19:00 AM
to druid...@googlegroups.com
Thanks, John.  Yeah, I've been trying secondary partitioning on the __time column using single_dim partitioning, but it keeps generating a single segment with all the rows even though I've set targetRowsPerSegment and maxRowsPerSegment (only one at a time); e.g., I've been targeting 5,000,000-row segments, but the one generated segment has 16,000,000 rows.  Ideally, I'd like it to sort the day's data in time order so that the number of segments scanned is limited if the user is searching between particular hours of the day.  Doing compaction with dynamic partitioning will limit my segment sizes, but doesn't fully sort the data; I can find older and newer data in all the compacted segments.  Last attempt I tried:

{
  "type": "compact",
  "dataSource": "data-test",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2025-05-06T00:00:00.000Z/2025-05-07T00:00:00.000Z"
    },
    "dropExisting": false
  },
  "segmentGranularity": "DAY",
  "granularitySpec": {
    "segmentGranularity": "DAY",
    "queryGranularity": "MINUTE",
    "rollup": true
  },
  "tuningConfig": {
    "type": "index_parallel",
    "partitionsSpec": {
      "type": "single_dim",
      "partitionDimension": "_time",
      "targetRowsPerSegment": 5000000
    },
    "forceGuaranteedRollup": true,
    "maxNumConcurrentSubTasks": 3
  },
  "context": {
    "useConcurrentLocks": true
  }
}

~Dan



John Kowtko

May 10, 2025, 9:39:01 AM
to Druid User
Hi Dan, 

 * Sharding keys I believe must be string fields ... so if you cast the time value to string as a secondary field, you should be able to use it.  But again, your queries will need to filter on it in order to take advantage of the segment pruning.

 * For segment size skew ... try maxRowsPerSegment instead of targetRowsPerSegment, to see if that properly limits the segment sizes.  However, if you have data skew in the __time value (i.e. 16m rows all with the same __time value), you may not be able to get even segment sizes unless you use range partitioning (single_dim is essentially obsolete) and add more columns to the range key to increase its cardinality for more even splits (rough sketch below).

 * You can also try using MSQ SQL ingestion (e.g. a REPLACE ... SELECT statement) ... as MSQ automatically adds a "boost" column to the end of the range key to raise its cardinality and allow for more even splits.
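
To make the first two concrete, here is a rough, untested sketch (the column names eventHour and userId are made up).  At ingestion time you could add a stringified hour column with a transform, and list it in dimensionsSpec so it is stored as a string dimension:

"transformSpec": {
  "transforms": [
    {
      "type": "expression",
      "name": "eventHour",
      "expression": "timestamp_format(__time, 'yyyy-MM-dd HH')"
    }
  ]
}

Then the compaction tuningConfig could range partition on that column plus a higher-cardinality dimension:

"tuningConfig": {
  "type": "index_parallel",
  "partitionsSpec": {
    "type": "range",
    "partitionDimensions": ["eventHour", "userId"],
    "targetRowsPerSegment": 5000000
  },
  "forceGuaranteedRollup": true,
  "maxNumConcurrentSubTasks": 3
}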

I will also go back to your use case and requirements here ... are the queries against DAY segments running noticeably slow?  Or is your QPS rate high enough that the number of segments opened per query is limiting overall performance?

Thanks.  John