PStates => PStates and Depots => Depots

191 views
Skip to first unread message

Fjolne Yngling

unread,
Dec 5, 2023, 2:51:35 PM12/5/23
to rama-user
Hi,

I'm evaluating Rama for our company use cases (finance) and got a few related questions while making a small prototype (in Clojure):

1. What is the idiomatic way to create a PState which sources another PState? I'm trying to represent an orderbook as a PState and have another PState to store timeseries features (e.g. {DateTime Float}), which are calculated from the orderbook state every D minutes (let's assume D=5). This ETL should be able to reconstruct feature values from the past during the depots' history replay, so it can't depend on the wall clock as a trigger. I see 2 approaches:
a) subscribe to `updated-at` timestamp of the orderbook PState and trigger computation when t_prev < floor(t_cur, "5min") <= t_cur. Seems like this would require reactive capabilities of a query topology (e.g. diffs) and I'm not sure if it's possible to write to PStates from query topologies?
b) do a regular transform like `(local-transform> [(keypath (t/floor *orderbook-updated-at "5min")) (termval (calc-some-feature *orderbook))] $$some-feature)`, which would work if orderbook states were coming from a depot every 5min, but I'm not sure if I can subscribe to a PState? Or maybe I could write PState snapshots to a new depot periodically?

2. Let there be N almost identical streams of external events. We'd like to merge them into a single stream, deduplicating events with the same IDs. All the other entities in our system should only know about the final stream:
a) Do I understand correctly that partitioning is exclusively an optimization strategy and partitions should not be assigned semantic meaning: namely that we should create separate depots for each pre-merge stream of data, instead of using separate partitions of a single depot?
b) Could we write the final stream into a new dedicated depot, for other ETLs to source it, or is it required to abstract at the level of Dataflow functions/macros?

Thank you for your time.

Nathan Marz

unread,
Dec 5, 2023, 9:48:19 PM12/5/23
to rama...@googlegroups.com
Answers inline.

On Tue, Dec 5, 2023 at 9:51 AM Fjolne Yngling <fjolne....@gmail.com> wrote:
Hi,

I'm evaluating Rama for our company use cases (finance) and got a few related questions while making a small prototype (in Clojure):

1. What is the idiomatic way to create a PState which sources another PState? I'm trying to represent an orderbook as a PState and have another PState to store timeseries features (e.g. {DateTime Float}), which are calculated from the orderbook state every D minutes (let's assume D=5). This ETL should be able to reconstruct feature values from the past during the depots' history replay, so it can't depend on the wall clock as a trigger. I see 2 approaches:
a) subscribe to `updated-at` timestamp of the orderbook PState and trigger computation when t_prev < floor(t_cur, "5min") <= t_cur. Seems like this would require reactive capabilities of a query topology (e.g. diffs) and I'm not sure if it's possible to write to PStates from query topologies?
b) do a regular transform like `(local-transform> [(keypath (t/floor *orderbook-updated-at "5min")) (termval (calc-some-feature *orderbook))] $$some-feature)`, which would work if orderbook states were coming from a depot every 5min, but I'm not sure if I can subscribe to a PState? Or maybe I could write PState snapshots to a new depot periodically?

Without knowing the specifics of your use case, there are a variety of approaches. Topologies can either source from regular depots (data), tick depots (time), or a combination. Some ways you can make use of these primitives are:
  • Update the timeseries PState in the same ETL that updates the orderbook PState. 
  • Have the branch of the ETL that updates the orderbook also write to a special PState storing a set of entities that should be updated in the timeseries PState in the future. Use a tick depot to trigger consumption of this special PState to update the timeseries PState.
I suspect the second approach is what you're looking for. The "who to follow" feature of our Mastodon implementation uses that pattern for part of how it works:
  • Whenever a user passes the 10 or 100 follows threshold, that user ID is written to a special $$forceRecomputeUsers PState.
  • Every 30s a tick depot consumed by the same topology triggers that $$forceRecomputeUsers PState to be consumed, and the "who to follow" recommendations for those users to be recomputed from scratch.


2. Let there be N almost identical streams of external events. We'd like to merge them into a single stream, deduplicating events with the same IDs. All the other entities in our system should only know about the final stream:
a) Do I understand correctly that partitioning is exclusively an optimization strategy and partitions should not be assigned semantic meaning: namely that we should create separate depots for each pre-merge stream of data, instead of using separate partitions of a single depot?

Records that need to be processed in order need to be on the same partition of the same depot. That's the primary factor to consider with depot design. Using Mastodon as an example again, we keep "Follow" and "Unfollow" events on the same depot. Additionally, we set the depot partitioner to partition by the requester's user ID to ensure requests from the same user go to the same partition and are processed in order. 

If two types of data are independent, it's better for them to go onto separate depots so that topologies interested in only one of the types can consume a smaller stream.

Another important factor to consider with respect to partitioning a depot is balance, as you want to minimize skew between partitions. This can lead to some tasks being overloaded while other ones sit idle, which is a waste of resources.
 
b) Could we write the final stream into a new dedicated depot, for other ETLs to source it, or is it required to abstract at the level of Dataflow functions/macros?

It's often correct to publish to a new depot from a topology for other ETLs to consume, whether in the same module or other modules. A common use case for this is when IDs are generated in a topology, so the topology publishes a depot that adds the newly generated ID to those records. We did this in Mastodon with "*statusDepot" and "*statusWithIdDepot". 

Hope that helps.
 

Thank you for your time.

--
You received this message because you are subscribed to the Google Groups "rama-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rama-user+...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/rama-user/2b17b123-898b-49df-a164-1b469e5e2bf8n%40googlegroups.com.

Fjolne Yngling

unread,
Dec 6, 2023, 3:48:09 AM12/6/23
to rama-user
Thank you, that helps!

One thing I couldn't understand from your answer is whether the timeseries ETL using tick depot as a trigger will be able to calculate features retroactively (i.e. for the whole history of orderbook states), not just as they arrive?

Assume I'd like to batch load a year of raw data into the depot (e.g. individual orders), from which orderbook PState is calculated incrementally. I'd like the timeseries PState to be constructed along the orderbook construction, having entries for every D minutes starting from the first raw data event.

Streaming use case is relevant too, it's just that we need both: currently we have 2 subsystems, one for historical calculation and one for real-time. That would be great if Rama could make it into one.

Nathan Marz

unread,
Dec 6, 2023, 10:47:31 AM12/6/23
to rama...@googlegroups.com
That special PState could accumulate incoming data into windows (e.g. PState of entity -> time bucket -> list of data), and then your ETL triggers rollups of those windows once they're detected to be complete. These rollups can be triggered by tick, and they can triggered in the same branch accumulating those windows when it starts writing data to a new window. Depending on how ordered your data comes in, the specific logic you use to trigger a rollup can vary.

Fjolne Yngling

unread,
Dec 13, 2023, 4:47:22 PM12/13/23
to rama-user
Sorry for the late response. Your answers have been of great help while I was prototyping the solution, and Rama has been a pleasure to work with. I applied to private beta for further discussion.
Reply all
Reply to author
Forward
0 new messages