Data Retrieval Across Multiple Partitions 2


jacob franco

Oct 7, 2024, 6:02:34 PM
to rama-user
Hey, I have a question similar to one I previously asked here: https://groups.google.com/g/rama-user/c/sCVcIAFrVRo

except that when I tried the same approach for this case, I'm not getting the results I expect.

So basically, we have a system where we ingest Series objects, and we assign them to two different PStates - one by id and one by start time. 
[screenshot: PState declarations]
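In case the screenshot doesn't come through, here's a rough sketch of the shape of our module. The Series class, the ExtractSeriesId extractor, and all the names here are simplified placeholders, not our actual code:

    import com.rpl.rama.*;
    import com.rpl.rama.module.StreamTopology;
    import com.rpl.rama.ops.RamaFunction1;

    public class SeriesModule implements RamaModule {
      // Simplified stand-in for our real Series object
      public static class Series {
        public String id;
        public long start; // start time as epoch millis
      }

      // Depot partitioner: routes each appended Series by its id
      public static class ExtractSeriesId implements RamaFunction1<Series, String> {
        @Override
        public String invoke(Series series) { return series.id; }
      }

      @Override
      public void define(Setup setup, Topologies topologies) {
        setup.declareDepot("*seriesDepot", Depot.hashBy(ExtractSeriesId.class));

        StreamTopology s = topologies.stream("seriesCore");
        s.pstate("$$seriesIdToSeries", PState.mapSchema(String.class, Series.class));
        s.pstate("$$startTimeToSeries", PState.mapSchema(Long.class, Series.class));

        s.source("*seriesDepot").out("*series")
         .each((Series series) -> series.id, "*series").out("*id")
         .each((Series series) -> series.start, "*series").out("*start")
         // index the same Series under both keys, on the same task
         .localTransform("$$seriesIdToSeries", Path.key("*id").termVal("*series"))
         .localTransform("$$startTimeToSeries", Path.key("*start").termVal("*series"));
      }
    }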
And here are the corresponding queries:
[screenshot: query topologies]
I've tried a few approaches, and the screenshots just show where I'm at now, where we aren't doing any hashing at all, both for readability and because it's the last thing I tried. However, everything I've tried yields a different set of Series objects, with varying degrees of overlap, and none returns the complete set of data.

My initial assumption was that in my PState declaration I needed to add .hashPartition("*start") in between the local transforms, and then do the same thing in the query before the localSelect so we're operating on the same partition. However, this did not work and I'm not sure why.

It's also entirely possible that my mistake lies outside of Rama, but I'm not so sure about that.  Any insight is appreciated!

Nathan Marz

Oct 7, 2024, 6:42:30 PM
to rama...@googlegroups.com
Is the depot partitioned by the series ID? Presuming that's the case, then $$seriesIdToSeries will be partitioned correctly, and each Series object will be indexed in the colocated partition of $$startTimeToSeries. 

Since getSeriesSchedule has no leading partitioner, it's selecting from a random partition. This is why it's giving random results. Since the Series are spread across all partitions of $$startTimeToSeries, your query topology should do an allPartition, do the localSelect on each partition, and then aggregate the results. 
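Sketched out, that query topology shape looks something like this (assuming $$startTimeToSeries maps start time to Series, and using Agg.set just as one way to combine the per-partition results):

    topologies.query("getSeriesSchedule", "*from", "*to").out("*result")
      .allPartition()    // fan the query out to every task
      .localSelect("$$startTimeToSeries",
                   Path.sortedMapRange("*from", "*to").mapVals()).out("*series")
      .originPartition() // bring each emitted Series back to the origin task
      .agg(Agg.set("*series")).out("*result");

Note that sortedMapRange is start-inclusive and end-exclusive, so adjust the bounds to whatever semantics you want.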

Finally, the getSeriesFromSeriesId query topology looks correct, but it's unnecessary. That query topology is just the foreign path Path.key("*seriesId"). Your conditional after the select doesn't look like it does anything.
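In other words, a client can do that lookup directly against the foreign PState, along these lines (module name and variable names assumed):

    // Client-side lookup, equivalent to the getSeriesFromSeriesId query topology
    RamaClusterManager manager = RamaClusterManager.open();
    PState seriesById =
      manager.clusterPState("com.example.SeriesModule", "$$seriesIdToSeries");
    String seriesId = "some-series-id"; // whatever id you're looking up
    Series series = seriesById.selectOne(Path.key(seriesId));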


jacob franco

Oct 7, 2024, 7:54:56 PM
to rama-user
1) Yes, the depot is partitioned by series ID
2) I fixed the conditional, thank you

I have some more questions though:
1) Why are there multiple partitions of $$startTimeToSeries?
2) Why can't I just do this? [screenshots: alternative query attempt]
3) So when I call .allPartition(), then my localSelect, and then something like compoundAgg on it, that's just going to run for all partitions? Am I understanding that correctly?
4) I guess I'm still not understanding conceptually how partitions are created. I have the series depot declared, hashed by series id, and the PStates also have partitions within them (this relates to my first question). What I don't get is how $$seriesIdToSeries ends up partitioned correctly just by hashing on id, while $$startTimeToSeries doesn't end up partitioned correctly by hashing on start. Is it because the depot is already hashed by id?
5) The docs note that allPartition should be used carefully.  Given that, is there a better way to try and accomplish what I'm attempting?

I really appreciate the help!

Nathan Marz

Oct 7, 2024, 9:24:58 PM
to rama...@googlegroups.com
I think you may be misunderstanding how PStates work. A module runs across many "tasks", and each task has one partition of every depot and PState declared for the module (unless it's a global depot/PState, in which case it only has a partition on task 0). When you do a localSelect on a PState, you're querying the partition of that PState that's on the task you're currently running on.

When you do "hashPartition", you're relocating the dataflow code to a task based on the hash of the argument modulo the number of tasks. This means the same arg always hash partitions to the same task, but different args are evenly distributed across all tasks due to the properties of hash functions.
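Conceptually the routing is just this (an illustration of the idea, not Rama's actual internal code):

    // Same arg always maps to the same task; different args spread evenly
    static int targetTask(Object arg, int numTasks) {
      return Math.floorMod(arg.hashCode(), numTasks);
    }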

So if you do something like .hashPartition("*start"), you're relocating the computation based on the value of "*start", which is basically going to spread out those values across all partitions in a pretty arbitrary way. Since your query wants to get all objects in between two times, either everything needs to be in the same PState partition (a global PState), or you need to query and aggregate across all partitions.
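If you go the global PState route, the declaration change is just marking the PState global so it has exactly one partition, on task 0 (a sketch, with the same assumed schema as before):

    s.pstate("$$startTimeToSeries", PState.mapSchema(Long.class, Series.class))
     .global();

A query against it would then relocate with .globalPartition() before the localSelect instead of doing allPartition.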

To answer your other questions more specifically:

3. allPartition will run the subsequent code on all tasks in parallel
5. Yes, allPartition should be used carefully. A query that reads across all partitions on every invocation has a scaling limit. Part of the design of modules is determining how you should be partitioning and indexing data to best support your queries.

Hope that helps.

jacob franco

Oct 7, 2024, 9:55:32 PM
to rama-user
Yeah, definitely misunderstanding, but your explanation does help. For my use case I may opt for a global PState, but I'll have to see. I'll for sure study the docs some more though. Thanks again.