Apache Druid - Late Arrival Classification Column


Henrique Martins

Jun 4, 2025, 1:27:42 PM
to Druid User
Hello,

I am new to this group. I am trying to find a solution for a new use case and cannot find it anywhere.

I know Druid can deal with late arrivals, but I would like to know if there is a way to create a virtual column with a late-arrival classification.

If data is on time (its timestamp equals the Druid ingestion time, or is at most 1 minute behind it), it is classified as real time.
If it is between 1 minute and 30 minutes behind, it is late.
If it is more than 30 minutes behind, it is very late.
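
To make it concrete, the classification I have in mind would look roughly like this at query time (pseudo-SQL just to illustrate; ingest_time is a hypothetical column holding the time Druid received the row, which is exactly the value I don't know how to capture):

    CASE
      WHEN TIMESTAMPDIFF(MINUTE, __time, ingest_time) <= 1  THEN 'real time'
      WHEN TIMESTAMPDIFF(MINUTE, __time, ingest_time) <= 30 THEN 'late'
      ELSE 'very late'
    END AS lateness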

I saw that I cannot create multiple supervisors for the same datasource; otherwise I could have them working with lateMessageRejectionPeriod and earlyMessageRejectionPeriod.

Is it possible to do this in Druid? I am using Kafka ingestion. I have been studying the documentation for days and I am exhausted trying to find a solution.

Thanks in advance.
Br,
Henrique Martins.

John Kowtko

Jun 4, 2025, 1:45:47 PM
to Druid User
Hi Henrique,  

Check out the transformSpec ... if you have a "received date" field, then you can try writing an expression that subtracts the two dates, wraps it in a case statement to check the deltas, and outputs the desired value.
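
Something along these lines, as a rough and untested sketch (received_time is assumed to be a millisecond-precision field already present in your data; adjust the names and thresholds to your schema):

    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "lateness",
          "expression": "case_searched((received_time - \"__time\") <= 60000, 'real time', (received_time - \"__time\") <= 1800000, 'late', 'very late')"
        }
      ]
    }

case_searched evaluates the condition/result pairs in order and falls through to the last argument, so the two thresholds (1 minute and 30 minutes, in milliseconds) give you your three buckets.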

Let us know if something like that works for you.

Thanks.  John

Henrique Martins

Jun 5, 2025, 5:49:55 AM
to Druid User
Hello @John.
Thank you for answering my post. I am not sure I fully followed what you said, but the situation is this:
I need to compare the event timestamp against the current timestamp at ingestion time. That cannot work because it isn't idempotent: if you check the functions you can use during Kafka ingestion, you won't find now() or CURRENT_TIMESTAMP listed there, because they cannot guarantee idempotency during ingestion (i.e. a guarantee that everything Kafka sends is exactly what Druid stores), as discussed in this link:
https://github.com/apache/druid/issues/6513

I was trying to find a workaround. I thought about:
1. Creating a supervisor for each kind of data: real time, almost real time, and late.
   ISSUE: each datasource can have only one supervisor.
2. Creating a supervisor for the real-time data and then separate tasks that ingest the old data, but it seems you cannot do that from Kafka out of the box.

In a nutshell, I am searching for a good approach. For lack of options I am going to use the Kafka produce timestamp to do the job, but it's not really right.
Kafka's whole point is to keep the message and guarantee it won't be lost when a connection drops. That means if Druid loses its connection and the data is only sent later, the check will still use the produce timestamp, so the data will never look late, because the produce timestamp is close to the original event timestamp; the delay actually happened between Kafka and Druid. It's not a good approach.

Br,
Henrique Martins

John Kowtko

Jun 5, 2025, 6:28:06 AM
to Druid User
Hi Henrique,  yeah I know, that's what I said "if you have a received date" ... 

Are you able to use Kafka metadata, and the kafka.timestamp field?  (see screenshots)  It's not precisely CURRENT_TIMESTAMP, but it should be enough to allow you to determine lateness of data.
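
For reference, with the "kafka" input format the record timestamp (and other Kafka metadata) is exposed as a regular field, named kafka.timestamp by default, so the case_searched transform I sketched earlier can reference it instead of a "received date" field. A rough sketch of the relevant piece of the supervisor spec, assuming JSON payloads (inputFormat sits under ioConfig):

    "inputFormat": {
      "type": "kafka",
      "valueFormat": { "type": "json" },
      "timestampColumnName": "kafka.timestamp"
    }

In the transform expression you would then write something like (\"kafka.timestamp\" - \"__time\"), quoting the name because it contains a dot.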

Thanks.  John
(Attachments: Screenshot 2025-06-05 at 3.23.39 AM.png, Screenshot 2025-06-05 at 3.22.44 AM.png)

Henrique Martins

Jun 5, 2025, 7:53:19 AM
to Druid User
Hi John,

I can access kafka.timestamp, but isn't this date generated when Kafka receives the message from the producer? What if Kafka loses its connection with the consumer? Won't kafka.timestamp then be similar to the producer timestamp rather than the current ingestion timestamp on the consumer side?
When exactly is this date generated?

br,
Henrique Martins.

John Kowtko

Jun 5, 2025, 8:10:19 AM
to Druid User
Hi Henrique,

Generally we tune the Druid supervisor to keep Kafka lag at or near zero under steady-state operation, so Druid stays current on the Kafka stream and data sent to the producer can be ingested by Druid sub-second. Under that scenario, even if kafka.timestamp is added on the producer side, it should give you what you need to judge the "lateness" of the data timestamp.
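
One caveat worth checking on your cluster: whether kafka.timestamp carries the producer's create time or the broker's append time depends on the topic's message.timestamp.type setting. If you want it to reflect when the broker received the record rather than when the producer created it, the topic can be switched to LogAppendTime with the standard Kafka tooling (the topic name here is just an example):

    kafka-configs.sh --bootstrap-server localhost:9092 --alter \
      --entity-type topics --entity-name events \
      --add-config message.timestamp.type=LogAppendTime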

In my experience Kafka-based ingestion into Druid is extremely stable and can run indefinitely with zero or close-to-zero lag. So we treat lag events as "incidents", not something to be expected in normal operation unless your data feed into Kafka is very "bursty".
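
If you do want to detect those incidents, the supervisor status endpoint on the Overlord reports the current Kafka lag, so you can alert on it; for example (host and supervisor id are placeholders):

    curl http://OVERLORD_HOST:8081/druid/indexer/v1/supervisor/my-kafka-supervisor/status

and look at the lag figures (e.g. aggregateLag) in the returned payload.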

I think the best way to know is to try it out and sample the results, see if it matches your expectations.  

Thanks.  John

Henrique Martins

Jun 5, 2025, 8:35:04 AM
to Druid User
What if Druid loses its connection with Kafka for 6 hours? Is that timestamp going to reflect the 6 hours of latency once Druid gets the connection back?

Henrique Martins

Jun 5, 2025, 8:36:08 AM
to Druid User
My need is to classify late data, so I am counting on incidents like that as well.

John Kowtko

Jun 5, 2025, 8:54:34 AM
to druid...@googlegroups.com
Okay, that I don't have an answer to, other than the GitHub issue you referenced.

You could also try reading the emitted Kafka lag metric and ingesting it into a side table that can be joined to the main table, to help you incorporate lag information into your lateness calculation, but that is clearly not a clean solution to what you are trying to achieve.
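
Just to illustrate the shape of it (very much a sketch; events is your main datasource, and kafka_lag / lag_seconds are hypothetical names for the side table you would have to build from those metrics, bucketed by minute so it stays small enough to sit on the right side of a Druid join):

    SELECT e.__time, e.lateness, l.lag_seconds
    FROM events e
    LEFT JOIN kafka_lag l
      ON TIME_FLOOR(e.__time, 'PT1M') = l.__time

Keeping the right-hand side of the ON clause a plain column reference is what Druid's SQL joins expect.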

Thanks.  John


--
John Kowtko
Senior Customer Architect

Henrique Martins

Jun 17, 2025, 12:36:30 PM
to druid...@googlegroups.com
Hi John,

Thank you for trying, though. We are going to use kafka.timestamp for now.

Br,
Henrique Martins.
