What's the best way to process a large query result list in Java


Scott Murphy

Mar 2, 2012, 9:00:14 PM
to Google App Engine Pipeline API
Let's say, for instance, I want to send an email to every user who is
in California.

So I do the following query:

select all from User where state = 'CA'
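
In low-level datastore API terms that's roughly the following (just for illustration):

// uses com.google.appengine.api.datastore.*
Query q = new Query("User")
    .addFilter("state", Query.FilterOperator.EQUAL, "CA");
PreparedQuery pq = DatastoreServiceFactory.getDatastoreService().prepare(q);
for (Entity user : pq.asIterable()) {
  // send the email to this user
}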


As far as I know, I have the following two options:

1. Use the Mapper API to iterate through all my users and then just
send an email to the users that are in California.

This sucks because if I have 40,000 users and only 10 of them are in
California, I have to do 40,000 reads just to process 10 users.

or

2. Use the Task Queue API and iterate over a URL sequence.

This seems to be a pain because there is no generalized framework to
do this as a job. I need to properly handle random server 500 errors,
etc., and write code to keep track of each job to see whether it
finished successfully.


My preferred approach would be to feed the Mapper API a query as
input, but I don't think that is possible.
Do I have any other options?

Mark Nuttall-Smith

Mar 11, 2012, 7:55:07 AM
to app-engine-...@googlegroups.com
Hi Scott,

If it's feasible, you could maintain a set of State entities, each holding a list of the users in that state:

e.g.

State {
  @Id
  String code;            // state code, e.g. "CA"
  List<String> userKeys;  // keys of the users in this state
}

To get the CA users you just get the CA state entity, and do a batch get with all the keys.
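
With the low-level datastore API that could look roughly like this (just a sketch; it assumes userKeys holds KeyFactory.keyToString()-encoded User keys, which is my assumption, not a given):

import com.google.appengine.api.datastore.*;
import java.util.*;

public class CaliforniaUsers {
  public static Map<Key, Entity> loadCaUsers() throws EntityNotFoundException {
    DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
    // one get for the State entity keyed by its code
    Entity caState = ds.get(KeyFactory.createKey("State", "CA"));
    @SuppressWarnings("unchecked")
    List<String> userKeys = (List<String>) caState.getProperty("userKeys");
    List<Key> keys = new ArrayList<Key>();
    for (String encoded : userKeys) {
      keys.add(KeyFactory.stringToKey(encoded));
    }
    return ds.get(keys);  // one batch get for all CA users
  }
}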

Obviously the state entity would need updating transactionally when a new user was added or deleted.

Hope it helps,
Mark

Scott Murphy

Mar 11, 2012, 4:14:59 PM
to app-engine-...@googlegroups.com
Yeah, that would work... but that's a lot of bookkeeping. The problem is that I have numerous
cases with the same general type of problem. I would prefer to iterate over a query, and I was
just wondering whether there is any generalized way of doing this without having to treat each
situation differently.

Whether it's a datastore query, the blobstore, or a log service query, I just want a generalized
way to take in paginated input and treat it like a job that can maintain state and resume from
a particular point upon failure.

I don't know if there is anything in the Pipeline API that can help me with this, but running
mappers over entire data sets when you have indexes built just seems terribly inefficient 
to me.

Best,

Scott

Mike Aizatsky

Mar 13, 2012, 6:52:01 PM
to app-engine-...@googlegroups.com
Scott,

Yes, it's impossible right now to feed a query into the Mapper, mostly
because it's hard (maybe impossible) to shard query results. If you
are satisfied with having only one shard, I believe it is possible to
implement that in the code.

You'll have to copy and modify three classes:

- https://code.google.com/p/appengine-mapreduce/source/browse/trunk/java/src/com/google/appengine/tools/mapreduce/DatastoreRecordReader.java
  (modify the query in createIterator(); see the sketch below)
- https://code.google.com/p/appengine-mapreduce/source/browse/trunk/java/src/com/google/appengine/tools/mapreduce/DatastoreInputSplit.java
- https://code.google.com/p/appengine-mapreduce/source/browse/trunk/java/src/com/google/appengine/tools/mapreduce/DatastoreInputFormat.java
  (return one fixed split)
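
Roughly, the change in your copy of createIterator() is adding the extra filter to the kind query before it is prepared. A minimal sketch only; entityKind, startKey, datastoreService and BATCH_SIZE stand in for whatever fields the original class uses, and the property name comes from the example at the top of the thread:

// uses com.google.appengine.api.datastore.*
Query query = new Query(entityKind);
// the extra filter
query.addFilter("state", Query.FilterOperator.EQUAL, "CA");
// keep the original key-range restriction and ordering so the reader can still resume
if (startKey != null) {
  query.addFilter(Entity.KEY_RESERVED_PROPERTY,
      Query.FilterOperator.GREATER_THAN_OR_EQUAL, startKey);
}
query.addSort(Entity.KEY_RESERVED_PROPERTY, Query.SortDirection.ASCENDING);
QueryResultIterator<Entity> iterator = datastoreService.prepare(query)
    .asQueryResultIterator(FetchOptions.Builder.withChunkSize(BATCH_SIZE));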

It should be really straightforward. I can take a look at your code if
you upload it to gist/pastebin.
Let me know if you have figured this out.

--
Regards,
Mike

Ronoaldo José de Lana Pereira

Mar 13, 2012, 7:11:17 PM
to app-engine-...@googlegroups.com
Mike,

In your experience, does it make sense to use the "__scatter__" property in a special index to try sharding the query (even poor sharding)? I just created a test index that includes the reserved __scatter__ property, and it seems to return the right results when performing a query, but I got stuck on where I should make the change to use this query and perform the sharding...

Best Regards,

-Ronoaldo

Mike Aizatsky

Mar 13, 2012, 9:07:18 PM
to app-engine-...@googlegroups.com
Yes, it does. You should modify line 100 in
https://code.google.com/p/appengine-mapreduce/source/browse/trunk/java/src/com/google/appengine/tools/mapreduce/DatastoreInputFormat.java
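
For context, the split points there come from a keys-only query ordered by the reserved __scatter__ property; with your filter added, that query would look roughly like this (a sketch only, not tested; shardCount stands in for the configured shard count):

// uses com.google.appengine.api.datastore.* plus java.util.*
DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
Query q = new Query("User")
    .addFilter("state", Query.FilterOperator.EQUAL, "CA")  // the extra filter
    .addSort("__scatter__")                                 // reserved scatter property
    .setKeysOnly();
List<Key> splitPoints = new ArrayList<Key>();
for (Entity e : ds.prepare(q).asIterable(FetchOptions.Builder.withLimit(shardCount - 1))) {
  splitPoints.add(e.getKey());
}
Collections.sort(splitPoints);  // split boundaries must end up in key order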

The only caveat is that you need a special index which includes both
__scatter__ and other fields needed to run your special query. The
biggest issue here is that I think you are going to need two special
indices for it to run:

- necessary properties + __scatter__ (to generate splits)
- __key__ + necessary properties (to iterate over those)

But I might be wrong here. Never actually tried it.
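
If it helps, the first of those could be declared in WEB-INF/datastore-indexes.xml along these lines (untested, and using the "state" property from Scott's example):

<datastore-indexes autoGenerate="true">
  <datastore-index kind="User" ancestor="false">
    <property name="state" direction="asc"/>
    <property name="__scatter__" direction="asc"/>
  </datastore-index>
</datastore-indexes>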

--
Regards,
Mike

Ronoaldo José de Lana Pereira

Mar 14, 2012, 8:01:10 AM
to app-engine-...@googlegroups.com
Great! Thanks for your tips, I'll give it a try. The needed indexes are probably cheaper than iterating over the whole datastore every time we need to map only a portion of it.

Best Regards,

-Ronoaldo

Ronoaldo Pereira

Mar 24, 2012, 9:27:54 PM
to app-engine-...@googlegroups.com
Hi Mike,

I'm almost there! After a bit of hacking I have some code that allows testing the required indexes. I'll test it in production and report back when finished.

I uploaded a code review; do you have some time to look into it? http://codereview.appspot.com/5905049/ Just to make sure I'm on the right track... I still need to write a good end-to-end test, but it looks like I haven't broken anything yet.

Best Regards,

-Ronoaldo

Mike Aizatsky

Apr 2, 2012, 4:48:37 PM
to app-engine-...@googlegroups.com
Yes, I think this would work.

Amit Sangani

Sep 12, 2013, 1:28:05 PM
to app-engine-...@googlegroups.com
Hi Ronoaldo,

We have exactly the same requirement: to filter before feeding the Mapper. What's the simplest way to accomplish this?

thanks,
Amit



Ronoaldo Pereira

Sep 16, 2013, 10:39:38 AM
to app-engine-...@googlegroups.com
Hi Amit,

I posted the patch for the old version of Map/Reduce, so it may not work with the new (recommended!) version. Anyway, the main requirement for good sharding is to create the custom index as explained by Mike. After that, you can take a look at this implementation, where the sharded query is built, and try to extend that Job class. You can adapt my patch to override the method that creates the split points and add parameters to the query.

Another option is to use only the Pipelines API to run your processing. You can start iterating over the results or key ranges in a pipeline, and then pass the results to the child jobs of the pipeline. The rationale is that, if you are already querying a subset of your data, you may not need MapReduce itself to iterate over it: you can implement a generator job that splits the data into, say, 100 child jobs, each one processing 1,000 records, which gives you reasonable performance. Obviously, you can tweak the numbers or calculate them based on the expected result-set size.
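
A rough sketch of that option, assuming the Java Pipeline API's Job0/Job1 classes; EmailCaliforniaUsersJob, EmailBatchJob and the batch size are made-up names for illustration:

import com.google.appengine.api.datastore.*;
import com.google.appengine.tools.pipeline.*;
import java.util.ArrayList;
import java.util.List;

// Generator job: runs the filtered query once, splits the keys into fixed-size
// batches, and fans each batch out to a child job that does the actual work.
public class EmailCaliforniaUsersJob extends Job0<Void> {
  private static final int BATCH_SIZE = 1000;  // tune for the expected result-set size

  @Override
  public Value<Void> run() {
    List<Key> keys = fetchCaliforniaUserKeys();
    for (int i = 0; i < keys.size(); i += BATCH_SIZE) {
      ArrayList<Key> batch = new ArrayList<Key>(
          keys.subList(i, Math.min(i + BATCH_SIZE, keys.size())));
      futureCall(new EmailBatchJob(), immediate(batch));  // child job handles one batch
    }
    return null;  // no output value needed here
  }

  // keys-only query on state = 'CA', using the low-level datastore API
  private List<Key> fetchCaliforniaUserKeys() {
    DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
    Query q = new Query("User")
        .addFilter("state", Query.FilterOperator.EQUAL, "CA")
        .setKeysOnly();
    List<Key> keys = new ArrayList<Key>();
    for (Entity e : ds.prepare(q).asIterable()) {
      keys.add(e.getKey());
    }
    return keys;
  }
}

// Child job: processes one batch of user keys (ArrayList because job arguments
// must be Serializable).
class EmailBatchJob extends Job1<Void, ArrayList<Key>> {
  @Override
  public Value<Void> run(ArrayList<Key> userKeys) {
    // batch-get the users here and send the emails
    return null;
  }
}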

Let me know if you want to discuss more specific cases by sending me a message on G+ or posting here in the group.

Kind regards,