Data Retrieval Across Multiple Partitions


jacob franco

Sep 16, 2024, 10:59:16 AM
to rama-user
I have a system where we ingest "Series" objects from an external API, and for each Series we fetch the corresponding Matches that belong to it. Then, after the fetching and storing in Rama are completed, we make a request to get the matches for a given series ID.

From what I can tell, the fetching is done correctly.

// Log each match, then append it to the depot and block until the append is acked
System.out.println("Storing match: id=" + match.getId()
    + ", seriesId=" + match.getSeriesId() + ", order=" + match.getOrder());
matchDepot.appendAsync(match).join();

and we get this output in the console:
Received 1 matches for series 889018
Storing match: id=1326309, seriesId=889018, order=1

So I'm assuming everything up to this point is correct.
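For context, the depot itself is declared in our module setup along these lines (a sketch; the partitioner shown here is a guess, I'd have to double-check what we actually use):

setup.declareDepot("*matchDepot", Depot.random());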

So moving into my module, we have the following:

stream.pstate("$$matchIdToMatch", PState.mapSchema(Integer.class, Match.class));
stream.pstate("$$seriesIdToMatches",
PState.mapSchema(Integer.class, Match.class));

Here we define two PStates: one mapping a match ID to a Match object, and the other mapping a series ID to a Match object. Note that seriesId is a property of the Match object.

We then process the matches like so:

stream.source("*matchDepot").out("*match")
      .macro(Helpers.extractFields("*match", "*id", "*seriesId"))
      .each(Ops.PRINTLN, "Ingesting Match:", "*id", "SeriesId:", "*seriesId")
      .localTransform("$$matchIdToMatch",
                      Path.key("*id").termVal("*match"))
      .each(Ops.PRINTLN, "Match added to $$matchIdToMatch:", "*id", "->", "*match")
      .localTransform("$$seriesIdToMatches",
                      Path.key("*seriesId").termVal("*match"))
      .each(Ops.PRINTLN, "Match added to $$seriesIdToMatches:", "*seriesId", "*match");

This gives us the following output in the console:

Ingesting Match: 1326309 SeriesId: 889018 (looking good)
Match added to $$matchIdToMatch: 1326309 -> [A valid looking match object] (looks good)
Match added to $$seriesIdToMatches: 889018 [A valid looking match object] (looks good)

So it looks good up to this point, I believe.

When we make a request to get matches for a series ID, we invoke the following query:
topologies.query("getMatchesFromSeriesId", "*seriesId").out("*result")
          .each(Ops.PRINTLN, "Getting matches from series id:", "*seriesId")
          .hashPartition("*seriesId")
          .each(Ops.PRINTLN, "Series ID to Matches PState:", "$$seriesIdToMatches")
          .localSelect("$$seriesIdToMatches", Path.key("*seriesId")).out("*matches")
          .each(Ops.PRINTLN, "Matches found for series:", "*seriesId", "Matches:", "*matches")
          .ifTrue(new Expr(Ops.IS_NULL, "*matches"),
                  Block.each(() -> null).out("*result"),
                  Block.each(Ops.IDENTITY, "*matches").out("*result"))
          .each(Ops.PRINTLN, "Retrieved matches for series:", "*seriesId", "Result:", "*result")
          .originPartition();

This is giving us the following output in the console:
Getting matches from series id:  889018 (looks good)

and then we get:
Series ID to Matches PState: [the PState]

The PState contains a lot of Matches, but not the one with series ID 889018. It also tells me that we have 2 partitions. This is where I think the error lies, but I don't really understand partitions that well, or why this would be occurring.

As a result, we get the following output when the rest of the query executes:

Matches found for series: 889018 Matches: nil
Retrieved matches for series: 889018 Result: nil

This gives us an empty array when we query the endpoint to get the matches by series ID.
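For completeness, the endpoint invokes the query topology roughly like this (a sketch from memory; the module name and result type are placeholders, not our real ones):

// Obtain a client for the query topology and invoke it with the series ID
QueryTopologyClient<Match> getMatches =
    cluster.clusterQuery("MatchesModule", "getMatchesFromSeriesId");
Match result = getMatches.invoke(889018);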

So I'm thinking through all the possible problems:
1) The multiple partitions. Honestly I have only a faint idea of how partitioning works, so I don't know how to diagnose this.
2) My PState structure is incorrect. I think I need to make it a list of Matches rather than a single Match (see the sketch after this list), but that's probably a separate issue from the one at hand; since the series we're testing with has only one match, it should still work.
3) There might be an issue with the fetching, because we do it recursively. We fetch the Series objects from the external API in batches; for each batch, we append each series to the seriesDepot and then call the function that fetches the matches for each series. If there are still more series to fetch, we call the same function again. I'm not sure whether this would cause issues, but it seems worth mentioning.
4) There's a timing issue of some kind. I don't think so, though, since the fetching has already completed before we make the query.
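Regarding point 2, I believe the list-based version would look something like this (a sketch only; the afterElem append is my reading of the Path docs, not something we've tested):

stream.pstate("$$seriesIdToMatches",
              PState.mapSchema(Integer.class, PState.listSchema(Match.class)));

// Append each incoming match to the list for its series
.localTransform("$$seriesIdToMatches",
                Path.key("*seriesId").afterElem().termVal("*match"))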

Any insight is appreciated!

Nathan Marz

Sep 16, 2024, 1:13:58 PM
to rama...@googlegroups.com
This looks like an issue with how you're updating the PState in your ETL topology. You probably want .hashPartition("*seriesId") before your localTransform of $$seriesIdToMatches.

A critical part of programming Rama is keeping in mind where you're executing at any given point. In an ETL topology, the task you start on is dictated by the depot partitioner. You want your updates to correspond to how you're going to query that PState later on, which in this case is by hash partitioning.
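Concretely, something like this (a sketch showing just the relevant part of your chain):

.localTransform("$$matchIdToMatch", Path.key("*id").termVal("*match"))
// Relocate to the task that owns this seriesId, so the write lands
// where the query's hashPartition("*seriesId") will later look for it
.hashPartition("*seriesId")
.localTransform("$$seriesIdToMatches", Path.key("*seriesId").termVal("*match"));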


jacob franco

Sep 16, 2024, 4:26:06 PM
to rama-user
That was exactly it; it actually makes so much sense now. Thanks!