I have a system where we ingest "Series" objects from an external API, and for each Series we fetch the corresponding Matches that belong to it. Then, once the fetching and storing in Rama has completed, we make a request to get the matches for a given series ID.
From what I can tell, the fetching works correctly. We store each match like this:
System.out.println("Storing match: id=" + match.getId() + ", seriesId=" + match.getSeriesId() +", order=" + match.getOrder());
matchDepot.appendAsync(match).join();
and we get this output in the console:
Received 1 matches for series 889018
Storing match: id=1326309, seriesId=889018, order=1
So I'm assuming everything up to this point is correct.
Moving on to my module, we define the following:
stream.pstate("$$matchIdToMatch", PState.mapSchema(Integer.class, Match.class));
stream.pstate("$$seriesIdToMatches", PState.mapSchema(Integer.class, Match.class));
This defines two PStates: one mapping a match ID to a Match object, and the other mapping a series ID to a single Match object. Note that seriesId is a property of the Match object.
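To check my own understanding of the schema (this is my mental model, not something confirmed by the Rama docs): I'm treating PState.mapSchema(Integer.class, Match.class) as behaving like a plain Map<Integer, Match>, which would mean each series ID can only ever hold one Match at a time. The Match record below is a hypothetical stand-in for my real class:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real Match class, just for illustration.
record Match(int id, int seriesId, int order) {}

class PStateAnalogy {
    // My mental model of PState.mapSchema(Integer.class, Match.class):
    // one value per key, so a second match with the same seriesId
    // silently replaces the first.
    static Map<Integer, Match> seriesIdToMatches = new HashMap<>();

    static void store(Match m) {
        seriesIdToMatches.put(m.seriesId(), m); // overwrites any prior match
    }
}
```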
We then process the matches like so:
stream.source("*matchDepot").out("*match")
.macro(Helpers.extractFields("*match", "*id", "*seriesId"))
.each(Ops.PRINTLN, "Ingesting Match:", "*id", "SeriesId:", "*seriesId")
.localTransform("$$matchIdToMatch",
Path.key("*id").termVal("*match"))
.each(Ops.PRINTLN, "Match added to $$matchIdToMatch:", "*id", "->", "*match")
.localTransform("$$seriesIdToMatches",
Path.key("*seriesId").termVal("*match"))
.each(Ops.PRINTLN, "Match added to $$seriesIdToMatches:", "*seriesId", "*match");
Which gives us the output in our console:
Ingesting Match: 1326309 SeriesId: 889018 (looking good)
Match added to $$matchIdToMatch: 1326309 -> [A valid looking match object] (looks good)
Match added to $$seriesIdToMatches: 889018 [A valid looking match object] (looks good)
So it looks good up until this point I believe.
When we make a request to get matches for a series ID, we invoke the following query:
topologies.query("getMatchesFromSeriesId", "*seriesId").out("*result")
.each(Ops.PRINTLN, "Getting matches from series id: ", "*seriesId")
.hashPartition("*seriesId")
.each(Ops.PRINTLN, "Series ID to Matches PState: ", "$$seriesIdToMatches")
.localSelect("$$seriesIdToMatches", Path.key("*seriesId"))
.out("*matches")
.each(Ops.PRINTLN, "Matches found for series:", "*seriesId", "Matches:", "*matches")
.ifTrue(new Expr(Ops.IS_NULL, "*matches"),
Block.each(() -> null).out("*result"),
Block.each(Ops.IDENTITY, "*matches").out("*result"))
.each(Ops.PRINTLN, "Retrieved matches for series:", "*seriesId", "Result:", "*result")
.originPartition();
Which is giving us the following output in the console:
Getting matches from series id: 889018 (looks good)
and then we get:
Series ID to Matches PState: [the PState]
And the PState contains a lot of Matches, but not the one with series ID 889018. It also tells me that we have 2 partitions. This is where I think the error lies, but I don't really understand partitions well enough to know why this would happen.
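To illustrate what I suspect is going on with the partitions: I don't know Rama's actual task-routing function, but assuming something hash-based (roughly hashCode mod number of partitions), a record written on the partition chosen by one key and then read on the partition chosen by a different key will simply miss. A toy two-partition model in plain Java (the "depot routes by match id" part is my guess about my own setup, not a fact about Rama):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of two partitions, one map per partition, like per-task
// PState shards. Routing is assumed to be hash-based; Rama's real
// task-mapping function may differ.
class PartitionDemo {
    static final int NUM_PARTITIONS = 2;
    static List<Map<Integer, String>> partitions =
            List.of(new HashMap<>(), new HashMap<>());

    static int taskFor(int key) {
        return Math.floorMod(Integer.hashCode(key), NUM_PARTITIONS);
    }

    // If the ETL writes on whatever partition the depot routed the
    // record to (here: guessed to be keyed by match id)...
    static void storeByMatchId(int matchId, int seriesId, String match) {
        partitions.get(taskFor(matchId)).put(seriesId, match);
    }

    // ...but the query hashPartitions by seriesId, the lookup can land
    // on a partition that never saw the record.
    static String queryBySeriesId(int seriesId) {
        return partitions.get(taskFor(seriesId)).get(seriesId);
    }
}
```

With my actual IDs, 1326309 (odd) and 889018 (even) hash to different partitions under this toy scheme, which would explain exactly the nil I'm seeing.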
And so as a result we get the following output when the rest of the query executes:
Matches found for series: 889018 Matches: nil
Retrieved matches for series: 889018 Result: nil
Which gives us a blank array when we query the endpoint to get the matches by series ID.
So I'm thinking through the possible problems:
1) The multiple partitions. I have only a faint idea of how partitioning works, honestly, so I don't know how to diagnose this.
2) My PState structure is incorrect. I think I need to map each series ID to a list of Matches rather than a single Match, but that's probably separate from the issue at hand: since the series we're testing with has only one match, the current structure should still work (probably).
3) There might be an issue with the fetching, because we do it recursively. We fetch the Series objects from the external API in batches; for each batch we append each series to the seriesDepot, then call the function that fetches the matches for each series. If there are still more series to fetch, we call the same function again. I'm not sure whether this could cause issues, but it seems worth mentioning.
4) There's a timing issue of some kind, but I doubt it, since the fetching has already completed before we make the query.
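On point 2, to make concrete what I mean by "a list of Matches rather than just a Match": in plain-Java terms, the shape I think I eventually need is a map from series ID to a list of matches, so later matches append instead of overwriting (the Match record here is a hypothetical stand-in for my real class):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the real Match class.
record Match(int id, int seriesId, int order) {}

class GroupedBySeries {
    // seriesId -> all matches in that series, instead of seriesId -> Match
    static Map<Integer, List<Match>> seriesIdToMatches = new HashMap<>();

    static void store(Match m) {
        seriesIdToMatches
                .computeIfAbsent(m.seriesId(), k -> new ArrayList<>())
                .add(m); // appends rather than overwriting
    }
}
```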
Any insight is appreciated!