Row-Level updates needed


Kaustubh

unread,
Nov 16, 2023, 3:23:01 AM11/16/23
to Druid User
Hi All,

I'm planning to add fault events to Druid. Each event has an ID, a status, and a staticContext.
The events are triggered for different statuses, with only the status field changing; the big staticContext payload remains unchanged. If I store all the events, that would take up more space for redundant data. Ideally a row-level update of the "status" field would have done the job.

Payload:
{
  "timestamp": 1699439818847,
  "id": "Device_1",
  "status": "OPEN",
  "staticContext": {
    // details about the fault
    // remains unchanged
  }
}

So I'm trying to figure out the best way to handle this scenario such that:
* minimum space is needed to store the data (millions of events are triggered daily)
* queries are fast

Please let me know of any possible solutions I can use here. Thanks




John Kowtko

unread,
Nov 16, 2023, 9:17:18 AM11/16/23
to Druid User
Hi Kaus,

Druid can update data in place only in a batch manner ... but it can be done with some level of efficiency.

I wrote a small blog on this earlier ... please read through it (content below) and let me know if you have any questions.  I also have a short video that I could send you if you are interested in seeing more.

Thanks.  John


[Four screenshots of the blog post describing the batch update method]


Kaustubh

unread,
Nov 20, 2023, 10:04:35 AM11/20/23
to Druid User
@John I tried the solution you mentioned and it worked.

It would be helpful if you could clarify two things:
1. Would using this solution have any adverse effects (memory, CPU, performance) if millions of updates are needed every day?
2. I tried the same approach with streaming data and, as you mentioned, it didn't work ... any leads on why this doesn't work for streaming data?

Thanks

John Kowtko

unread,
Nov 20, 2023, 2:04:58 PM11/20/23
to Druid User
Hi Kaus,

Q1. 

Each update job will create new segments that overlay on top of the old segments.  If the new segments completely cover any older segment, then that older segment is considered "fully overshadowed" and will be automatically dropped from the cluster.

Otherwise, if any portion of the old segment is still not overshadowed, that segment must remain active to serve up the unshadowed portion of its data.  So over time segments can pile up and the segment count increases, which can start to create a burden on queries and on cluster maintenance activities.

This is where compaction comes in ... compaction will merge the layered sets of segments together into a single set of segments, so you are back to the original number of segments.  So as long as you have compaction running then you should be okay here.

Now, as segments are marked as unused, they will still hang around in the system until they are permanently removed.  This is where kill tasks come in ... they remove unused segments from the system, which prevents record buildup in the metadata DB as well as in deep storage.

So just make sure you have compaction and kill tasks going and there should be no issues regardless of the volume.
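For reference, a minimal auto-compaction spec (submitted via the Coordinator's compaction config API) might look something like this; the datasource name and values here are assumptions for illustration, not a recommendation:

```json
{
  "dataSource": "fault_events",
  "skipOffsetFromLatest": "P1D",
  "tuningConfig": {
    "partitionsSpec": {
      "type": "dynamic",
      "maxRowsPerSegment": 5000000
    }
  }
}
```

Kill tasks can similarly be enabled cluster-wide via the Coordinator's `druid.coordinator.kill.on` setting, so unused segments get cleaned up automatically.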

Q2.

Streaming data is append-only ... it cannot be used for updates.  When people want to handle updates arriving via streaming data, there are a couple of other options:

 a) ingest the data as new record values, then at query time use the Latest() function to pull only the latest values for that event.  This leaves all versions of the data in the cluster, which you could view as a pro or a con.  You could eventually remove the old records through reindex batch jobs, even using the batch update method in some cases.

b) if the updates are ingested in a separate stream, ingest them into a side table, then issue a reindex batch job that merges the update data back into the main table using some form of join.
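To make both options concrete, here is a rough sketch in Druid SQL; the datasource names `fault_events` and `fault_updates` and the partitioning granularity are assumptions for illustration:

```sql
-- (a) Query-time: pull the latest status per device with the LATEST
--     aggregator, which takes the value from the most recent row
--     (by __time) in each group.
SELECT id, LATEST(status) AS current_status
FROM fault_events
GROUP BY id;

-- (b) Reindex-time: merge a side table of updates back into the main
--     table using SQL-based (MSQ) batch ingestion and a join.
REPLACE INTO fault_events OVERWRITE ALL
SELECT
  e.__time,
  e.id,
  COALESCE(u.status, e.status) AS status,
  e.staticContext
FROM fault_events e
LEFT JOIN fault_updates u ON e.id = u.id
PARTITIONED BY DAY;
```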


===

Let me know if you have any additional questions on this.

Thanks.  John

Kaustubh

unread,
Nov 26, 2023, 2:16:09 PM11/26/23
to Druid User
Hi John,

Based on suggestion "a", I am running a stream ingestion into a datasource "Bucket-Combined" holding both event types (Open and Closed). The plan is to reindex Bucket-Combined into "Bucket-Closed-Only", copying over all the Closed events, and then run a separate reindex task to delete rows with status 'Closed' from Bucket-Combined. That lets me keep track of open and closed events in different datasources.
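As a sketch, the two reindex steps could look like this in SQL-based (MSQ) ingestion; the datasource names, the 'CLOSED' status value, and the partitioning granularity are assumptions taken from my setup:

```sql
-- Step 1: copy closed events into the closed-only datasource
REPLACE INTO "Bucket-Closed-Only" OVERWRITE ALL
SELECT *
FROM "Bucket-Combined"
WHERE status = 'CLOSED'
PARTITIONED BY DAY;

-- Step 2: rewrite the combined datasource keeping only non-closed rows
REPLACE INTO "Bucket-Combined" OVERWRITE ALL
SELECT *
FROM "Bucket-Combined"
WHERE status <> 'CLOSED'
PARTITIONED BY DAY;
```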

I faced some locking issues with this approach.

Q1. Can stream ingestion be configured to use "range" partitioning instead of the default dynamic partitioning (ingestion load is not a concern)? The Druid UI does not show that option, but I was able to run it by manually editing the ingestion spec.
Q2. Is there a config in the ingestion spec to force-persist segments older than, say, 1 hour? I suspect the lock issues are caused when a segment is still being appended to by the MiddleManager while a reindex task tries to edit it.
Q3. Is there a config I can apply to reindex tasks so they reindex only the segments present on Historicals, and not the MiddleManager segments, to avoid locking issues?
Q4. What are the trigger points at which the Druid MiddleManager decides to persist a segment? I know of maxRowsPerSegment and maxRowsInMemory ... are there any other trigger points?

Thanks

John Kowtko

unread,
Dec 6, 2023, 7:27:14 PM12/6/23
to Druid User
Q1:  Unfortunately not, for a few architectural reasons:
         (1) multiple ingestion tasks can be running and feeding into the same datasource; they have no idea what each other's data is, so they are unable to come up with range splits with which to partition the data. If they each came up with their own ranges, you would have multiple range sets, which could become unwieldy.
         (2) streaming ingestion only sees data as it comes in; it has no idea what data is coming next ... so, similar to the multiple-task issue, even an individual task has no idea how it would need to range-split the data.
         (3) there is probably also a third reason, in that repartitioning data is CPU-intensive and would slow down the overall ingestion process, something we don't want to do for potentially minimal benefit (given the two limitations noted above).

Q2: Yes, taskDuration and intermediateHandoffPeriod.    taskDuration forces a cycling of the JVMs, intermediateHandoffPeriod just forces segments to be built at that interval.
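For illustration, both settings live in the streaming supervisor's ioConfig; this fragment assumes a Kafka supervisor and the topic name and durations are placeholders:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "fault-events",
      "taskDuration": "PT1H",
      "intermediateHandoffPeriod": "PT15M"
    }
  }
}
```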

Q3: Yes actually -- there is a new "Concurrent Append and Replace" feature that has just come out that will allow you to do this: https://druid.apache.org/docs/latest/data-management/automatic-compaction#concurrent-append-and-replace

Q4: There are two levels of persist.  If by "persist" you mean publishing a segment to deep storage so it transitions from real-time to historical segment status, those parameters would be taskDuration, intermediateHandoffPeriod, maxRowsPerSegment, and maxTotalRows.

Let us know if you have followup questions on any of this.

Thanks.  John