Mongo-Hadoop, MongoSplitter is generating far too many tasks because it doesn't take into account the query when splitting


Richard Tolley

unread,
Apr 25, 2013, 9:28:40 AM4/25/13
to mongod...@googlegroups.com
I've encountered a problem using the Mongo-Hadoop connector together with a query: the splits are generated based on the size of the entire collection rather than on the data returned by the query. In our case we use a query to generate daily statistics from a very large collection, so the data we actually process is only about 1% of what the maxChunkSize parameter implies (default 8MB). The only way I can cut the number of splits down to something reasonable, and so avoid extremely small tasks, is to set the splitSize to 1000MB. Is there another way to do this that doesn't involve such a crude approach?

This is a significant problem because of the overhead of creating tasks in Hadoop: when we have thousands of tasks, the overhead of task creation is far higher than the actual time spent processing. For example, an extremely simple job took 20 minutes with the 8MB split size and less than 2 minutes with a 1GB split size on Amazon Elastic MapReduce with identical resource allocation.

The part of the code in question is in MongoSplitter.java:93, here:

final DBObject cmd = BasicDBObjectBuilder.start("splitVector", ns)
        .add("keyPattern", splitKey)
        .add("force", false) // force:true seems to misbehave
        .add("maxChunkSize", splitSize)
        .get();

where I've set splitSize to 1000 instead of the default 8.
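To put some rough numbers on why the default hurts here: splitVector carves up the entire collection regardless of the query, so the number of splits depends only on collection size divided by maxChunkSize. A quick illustrative sketch — the 80GB collection size is my invention, not a figure from this thread:

```java
public class SplitCount {
    // Ceiling division: how many splits splitVector produces when it carves
    // the whole collection into chunks of at most maxChunkSizeMb.
    static long splits(long collectionSizeMb, long maxChunkSizeMb) {
        return (collectionSizeMb + maxChunkSizeMb - 1) / maxChunkSizeMb;
    }

    public static void main(String[] args) {
        long collectionMb = 80_000; // assumed 80GB collection, purely illustrative
        System.out.println(splits(collectionMb, 8));    // default 8MB chunks -> 10000 tasks
        System.out.println(splits(collectionMb, 1000)); // 1000MB workaround -> 80 tasks
    }
}
```

If the query only matches ~1% of the data, each of those 10,000 tasks would process on the order of 80KB, which is why per-task overhead ends up dominating the actual processing time.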

I can't find any documentation on the splitVector command, so I've no idea if it's possible to provide a query or if there's some other way of getting around this...

Thanks,
Richard

Johan Hedin

unread,
Apr 29, 2013, 3:50:13 PM4/29/13
to mongod...@googlegroups.com
Richard, it looks to me like your problem is related to these two bug reports:

https://jira.mongodb.org/browse/SERVER-9498
https://jira.mongodb.org/browse/SERVER-9365

Even though I don't know how the Hadoop connector works, the command you refer to in MongoSplitter probably ends up at a mongod instance, in the code that handles the splitVector command. And there seem to be issues with that command.

You might want to comment on your problem in these bugs.

Regards,
Johan



Alex at Ikanow

unread,
Apr 30, 2013, 1:24:50 PM4/30/13
to mongod...@googlegroups.com

I wrote about this in a blog post recently.

Basically, if you are getting significant reductions in data volume from querying, then the default Mongo-Hadoop connector performs extremely sub-optimally.

There is no particularly easy answer (I played very briefly with a few options and reported some simplistic benchmarks in the blog post).

For cases where the query reduction gets you down to only a few hundred thousand docs, I wrote an alternative splitter that uses skip/limit (it's open source and linked from the blog post), though obviously this is something of a "toy" case.
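As a rough sketch of how a skip/limit splitter might carve up a query's result set — this is my own illustration, not the actual open-source code mentioned above; SkipLimitSplitter, Split, and docsPerSplit are invented names:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipLimitSplitter {
    // One input split expressed as a skip/limit window over the query results.
    record Split(long skip, long limit) {}

    // Count the documents matching the query once, then carve the result set
    // into fixed-size windows; each Hadoop task would re-run the query with
    // its own skip/limit rather than a key range over the whole collection.
    static List<Split> split(long matchingDocs, long docsPerSplit) {
        List<Split> splits = new ArrayList<>();
        for (long skip = 0; skip < matchingDocs; skip += docsPerSplit) {
            splits.add(new Split(skip, Math.min(docsPerSplit, matchingDocs - skip)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // 250k matching docs at 100k docs per split -> 3 splits
        System.out.println(split(250_000, 100_000));
    }
}
```

The obvious drawback, and presumably why this only suits the "toy" case, is that MongoDB has to walk past all the skipped documents for every split, so the total server-side cost grows roughly quadratically with the number of splits.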

Having played some more since the blog post was written, I think the best architecture for a Mongo-Hadoop connector is completely different to what 10gen provides.

I would have a separate server (obviously this could be built into a mongos instead) co-located with each mongod (i.e. each shard; not worrying about replicas for the moment). The splitter then sends the query to each of these servers, which generates a single cursor per shard. The Hadoop tasks then connect to the server (i.e. many tasks connecting to a single cursor), which doles out data in blocks.

This would have a bunch of advantages:
 - you are guaranteed to distribute well and not query any more data than necessary
 - you only query once per shard, plus you don't have to create compound indexes including the shard key (this is explained further in the blog post) - one of the big bottlenecks for large jobs is that each query gets executed once per task, which can slam the mongod
 - you can move in (approximately) natural order through the dataset, which massively improves read performance
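A minimal sketch of the "many tasks, one cursor per shard" dispenser described above — entirely hypothetical; BlockDispenser and its API are invented, and a plain Iterator stands in for the per-shard MongoDB cursor:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BlockDispenser<T> {
    private final Iterator<T> cursor; // stands in for a single per-shard cursor
    private final int blockSize;

    BlockDispenser(Iterator<T> cursor, int blockSize) {
        this.cursor = cursor;
        this.blockSize = blockSize;
    }

    // Called concurrently by many tasks; each call drains one block from the
    // shared cursor, so the query runs only once per shard no matter how
    // many tasks consume it. An empty list signals exhaustion.
    synchronized List<T> nextBlock() {
        List<T> block = new ArrayList<>(blockSize);
        while (block.size() < blockSize && cursor.hasNext()) {
            block.add(cursor.next());
        }
        return block;
    }

    public static void main(String[] args) {
        BlockDispenser<Integer> d =
                new BlockDispenser<>(List.of(1, 2, 3, 4, 5).iterator(), 2);
        List<Integer> block;
        while (!(block = d.nextBlock()).isEmpty()) {
            System.out.println(block); // [1, 2] then [3, 4] then [5]
        }
    }
}
```

Because the cursor is drained in its natural order, consumers also get the approximately-natural-order read pattern mentioned above for free.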

I have it on my todo list to write this at some point, since I use Hadoop/MongoDB quite a lot and neither the existing implementation nor my current extension works for many of my use cases.

Alex