Kafka data reindexing - delayed data

AR

Sep 18, 2023, 12:50:06 PM
to Druid User
Hi,
We have a few real-time indexer jobs processing data from Kafka. Because the data from Kafka is not partitioned on the dimension that is used for querying, we are planning to add a process that periodically reindexes the data in the Druid table and repartitions it on the partition column. The reindexing job will process segments older than 3 hours and write to the same table as the real-time indexer.

We have noticed a few instances where the data on Kafka is delayed: the timestamp on the record is a few hours old. These records would need to be written as new segments to time chunks that may have already been reindexed by the reindexer process. Will there be any issue in such cases?

I am unable to find clear documentation on what would happen in such cases, even though repartitioning streaming data is suggested as one of the use cases for reindexing.

Regards,
AR

John Kowtko

Sep 18, 2023, 1:50:44 PM
to Druid User
Hi AR,

Data appended (e.g. appendToExisting:true) to an existing time chunk that has already been range or hash partitioned should simply add more dynamically partitioned segments to that time chunk. The original range/hash segments together are called (I think) the "core partition". These act as a set: they must all be available for the time chunk to be considered available.

The additional segments may or may not be available. That matters less, because it is recognized that data is being ingested constantly.

After you ingest new (dynamic) segments into an already-compacted time chunk, you can re-compact (or re-index) that time chunk to consolidate the segments. If the result is range or hash partitioned, you will be creating a new "core partition" that completely supersedes the old segments in that time chunk.

Auto-compaction will do all of this automatically.

If you have any questions on this let us know.

Thanks.  John

Sergio Ferragut

Sep 18, 2023, 1:51:01 PM
to druid...@googlegroups.com
Yes, you can. New segments for the compacted interval are appended. Since they are dynamically partitioned, they will be read whenever the time frame of the segment is involved in a query. Further compaction will incorporate the new segments into your partitioning strategy.

AR

Sep 19, 2023, 12:39:57 AM
to Druid User
Thank you John, Sergio.
I guess the reindexer job will have to omit intervals that contain a realtime segment when the job runs. I am planning on using the segment metadata query to identify such segments and exclude them from reindexing. Or is this unnecessary?
If, by chance, the reindexer job tries to index a time interval that contains a realtime segment (edge case), will the reindexing task fail?

Thanks,
AR.

AR

Sep 19, 2023, 11:47:40 AM
to Druid User
A couple more questions:
What is the best way to identify whether a new non-partitioned segment has been added to a time chunk? I was thinking of querying the "sys.segments" table to check whether any of the segments are of type "numbered" in the "shardSpec". But when I try to parse the "shardSpec" using the "parse_json" function, it throws a runtime exception.

What is the best way to identify whether a time chunk needs to be compacted? We cannot enable auto-compaction due to resource limitations, so we plan to set up a manual compaction job to do this. I was thinking of using the "shardSpec" again to determine this.

Regards,
AR.

Sergio Ferragut

Sep 19, 2023, 12:07:54 PM
to druid...@googlegroups.com
The real-time ingestion job takes precedence; the reindexing task will be cancelled after failing to obtain the lock.
If you use auto-compaction for this, it has a parameter called "skipOffsetFromLatest" to avoid this, where you specify the most recent period to skip. It is usually set to 2x the segment granularity of the streaming ingestion.
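
For illustration only, here is a rough sketch of what such an auto-compaction config might look like when built in Python. The datasource name "events", the dimension "customer_id", and the target row count are made-up placeholders, and the field names reflect my understanding of the auto-compaction config, so please check them against the docs for your Druid version before using anything like this.

```python
import json


def iso_hours(hours: int) -> str:
    """Format a whole number of hours as an ISO 8601 period, e.g. PT2H."""
    return f"PT{hours}H"


def build_autocompaction_config(datasource, segment_granularity_hours,
                                partition_dimension, target_rows=5_000_000):
    """Build an auto-compaction config dict (a sketch, not a verified spec).

    skipOffsetFromLatest is set to 2x the streaming segment granularity,
    following the rule of thumb mentioned above.
    """
    return {
        "dataSource": datasource,
        "skipOffsetFromLatest": iso_hours(2 * segment_granularity_hours),
        "tuningConfig": {
            "partitionsSpec": {
                "type": "range",
                "partitionDimensions": [partition_dimension],
                "targetRowsPerSegment": target_rows,  # placeholder value
            }
        },
    }


# Hypothetical datasource and dimension names, hourly segment granularity.
config = build_autocompaction_config("events", 1, "customer_id")
print(json.dumps(config, indent=2))
```

With hourly segments this skips the latest two hours; a larger offset (such as the 3 hours mentioned earlier in the thread) would just be a different period string.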

Sergio

Druid User

Sep 19, 2023, 12:30:22 PM
to Druid User
Thanks, Sergio.
What happens if the realtime task receives a message while the reindexing job for that time chunk is running? Will the realtime task fail in this case?
We are setting an offset of 3 hours (segment granularity is "hour"), but the data is sometimes delayed by more than 3 hours, so we need a way to identify the segments that need to be reindexed.

I tried the query below from the console, but it fails.
Query: select PARSE_JSON(shard_spec) from sys.segments limit 5
Error: Unknown exception
cannot translate call PARSE_JSON($t14)
java.lang.RuntimeException

It works on literal JSON strings.
Query: select PARSE_JSON('{"k1":"v1"}')
Works and returns a Complex JSON object

Doesn't "PARSE_JSON" work on VARCHAR columns in the table?

Regards,
AR.

John Kowtko

Sep 20, 2023, 2:52:54 PM
to Druid User
PARSE_JSON() is a Druid-specific SQL function. The SYS schema (which is a virtual, in-memory schema) uses a different, "standard" SQL engine for querying, so only simple SQL string functions will work there.
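
One possible workaround, sketched below under some assumptions, is to select "shard_spec" as a plain string and parse it on the client side instead of in SQL. The sketch assumes the rows have already been fetched from sys.segments (for example through the SQL API) as dicts with "start", "end" and "shard_spec" keys, that "shard_spec" is a JSON string with a "type" field, and that appended segments show up as "numbered" while partitioned ones show up as "range", "single" or "hashed". The datasource name "events" and the sample rows are made up.

```python
import json
from collections import defaultdict

# Query to run against the SYS schema; only plain columns, no PARSE_JSON.
# 'events' is a placeholder datasource name.
SEGMENTS_SQL = (
    'SELECT "start", "end", "shard_spec" FROM sys.segments '
    "WHERE datasource = 'events' AND is_overshadowed = 0"
)


def chunks_needing_compaction(rows, partitioned_types=("range", "single", "hashed")):
    """Return (start, end) time chunks that hold any non-partitioned segment.

    rows: iterable of dicts with 'start', 'end' and 'shard_spec' (JSON string).
    """
    types_by_chunk = defaultdict(set)
    for row in rows:
        spec = json.loads(row["shard_spec"])  # parse client-side
        types_by_chunk[(row["start"], row["end"])].add(spec.get("type"))
    allowed = set(partitioned_types)
    return sorted(chunk for chunk, types in types_by_chunk.items()
                  if not types <= allowed)


# Made-up sample rows: the 10:00 chunk has a late "numbered" segment appended.
rows = [
    {"start": "2023-09-18T10:00:00.000Z", "end": "2023-09-18T11:00:00.000Z",
     "shard_spec": '{"type": "range", "partitionNum": 0}'},
    {"start": "2023-09-18T10:00:00.000Z", "end": "2023-09-18T11:00:00.000Z",
     "shard_spec": '{"type": "numbered", "partitionNum": 1, "partitions": 0}'},
    {"start": "2023-09-18T11:00:00.000Z", "end": "2023-09-18T12:00:00.000Z",
     "shard_spec": '{"type": "range", "partitionNum": 0}'},
]
result = chunks_needing_compaction(rows)
print(result)
```

The same grouping could feed a manual compaction job: any time chunk returned here contains at least one segment that is not part of the range/hash partitioned set.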

AR

Sep 21, 2023, 3:45:41 AM
to Druid User
Hi John,
I understand now that the Druid-specific functions cannot be applied to the SYS schema. You clarified that in another post of mine. Thank you. :)

Can you please confirm what happens if the realtime indexer receives a record that corresponds to a time chunk which is being reindexed by the reindexer job? Will it continue to work fine, since the data will continue to exist only in the indexer memory, or will the realtime indexer task fail when it tries to write the segment to the time chunk?

Regards,
AR.

John Kowtko

Sep 21, 2023, 6:20:28 PM
to Druid User
It should revoke the lock on the reindexer job, and the reindexer job will abort. There is a priority property somewhere: real-time ingestion tasks have the highest priority at around 75, batch ingestion is around 50, and compaction is around 25, something like that.
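
As a rough sketch of how a manual compaction job might build its task for one hourly time chunk, with the priority set explicitly in the task context: the datasource name, the dimension and the row target below are placeholders, the priority numbers are just the approximate ones quoted above, and the task layout reflects my understanding of the compaction task spec, so verify it against the docs for your Druid version.

```python
import json
from datetime import datetime, timedelta, timezone

# Approximate priorities as quoted in this thread, not verified values.
APPROX_PRIORITY = {"realtime": 75, "batch": 50, "compaction": 25}


def hour_interval(start: datetime) -> str:
    """Return an ISO 8601 interval covering one hour from `start` (UTC)."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    end = start + timedelta(hours=1)
    return f"{start.strftime(fmt)}/{end.strftime(fmt)}"


def build_compaction_task(datasource, interval, partition_dimension,
                          priority=APPROX_PRIORITY["compaction"]):
    """Build a compaction task payload (a sketch under the assumptions above)."""
    return {
        "type": "compact",
        "dataSource": datasource,
        "ioConfig": {
            "type": "compact",
            "inputSpec": {"type": "interval", "interval": interval},
        },
        "tuningConfig": {
            "type": "index_parallel",
            "forceGuaranteedRollup": True,
            "partitionsSpec": {
                "type": "range",
                "partitionDimensions": [partition_dimension],
                "targetRowsPerSegment": 5_000_000,  # placeholder value
            },
        },
        # Lower than real-time ingestion, so the streaming task wins the lock.
        "context": {"priority": priority},
    }


interval = hour_interval(datetime(2023, 9, 18, 10, tzinfo=timezone.utc))
task = build_compaction_task("events", interval, "customer_id")
print(json.dumps(task, indent=2))
```

Keeping the compaction priority below the streaming priority matches the behavior described above: if late data arrives for the chunk, the compaction task is the one that loses its lock and can simply be retried later.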