Re: Unexpected number of input records on large dataset

276 views
Skip to first unread message

Andrew Emil

unread,
Oct 2, 2012, 1:59:01 PM10/2/12
to mongod...@googlegroups.com
Hi Eric,

Could you please upload the actual map/reduce command that you passed to the database to start this whole process?  It is possible that something in the command could be causing this issue.  It would also be helpful to see the schema of the documents that are going into your map reduce.  

I don't know what could be causing this problem right now, hopefully with some more information we can figure it out.

Thanks,
Andrew

On Monday, October 1, 2012 3:59:58 PM UTC-7, Eric Lange wrote:
Hello,

I am new to this forum and have been looking around for some information, but can't seem to find anything relevant.  I am running a MR job on Amazon EMR using the latest Mongo-Hadoop connector and Hadoop 0.21something.

My mongo db is unsharded, and the input collection has 9.4 million records.  When I examine the output, it seems like all the splits are being created properly and all records should be accounted for.  However, the MapReduce output says that there were only ~295k input records.  I should be expecting 9.4m, yes?

When I look at my db:

> db.crawled.count()
9483528

But the MR output says:

map 100% reduce 100%
2012-10-01 00:30:40,167 INFO org.apache.hadoop.mapred.JobClient (main): Job complete: job_201209302024_0001
2012-10-01 00:30:40,204 INFO org.apache.hadoop.mapred.JobClient (main): Counters: 30
2012-10-01 00:30:40,204 INFO org.apache.hadoop.mapred.JobClient (main):   Job Counters 
2012-10-01 00:30:40,204 INFO org.apache.hadoop.mapred.JobClient (main):     Launched reduce tasks=459
2012-10-01 00:30:40,204 INFO org.apache.hadoop.mapred.JobClient (main):     SLOTS_MILLIS_MAPS=6186073223
2012-10-01 00:30:40,205 INFO org.apache.hadoop.mapred.JobClient (main):     Total time spent by all reduces waiting after reserving slots (ms)=0
2012-10-01 00:30:40,205 INFO org.apache.hadoop.mapred.JobClient (main):     Total time spent by all maps waiting after reserving slots (ms)=0
2012-10-01 00:30:40,205 INFO org.apache.hadoop.mapred.JobClient (main):     Rack-local map tasks=2992
2012-10-01 00:30:40,205 INFO org.apache.hadoop.mapred.JobClient (main):     Launched map tasks=2992
2012-10-01 00:30:40,205 INFO org.apache.hadoop.mapred.JobClient (main):     SLOTS_MILLIS_REDUCES=2459973706
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):     Failed map tasks=1
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):   File Input Format Counters 
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):     Bytes Read=0
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):   File Output Format Counters 
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):     Bytes Written=0
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):   FileSystemCounters
2012-10-01 00:30:40,206 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_READ=783482
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):     HDFS_BYTES_READ=720739
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):     FILE_BYTES_WRITTEN=105144496
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):   Map-Reduce Framework
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):     Map output materialized bytes=15042281
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):     Map input records=294990
2012-10-01 00:30:40,207 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce shuffle bytes=14983089
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     Spilled Records=13133
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     Map output bytes=1153768
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     Total committed heap usage (bytes)=1089867415552
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     CPU time spent (ms)=72729860
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     Map input bytes=0
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     SPLIT_RAW_BYTES=720739
2012-10-01 00:30:40,208 INFO org.apache.hadoop.mapred.JobClient (main):     Combine input records=11954
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input records=6381
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce input groups=5806
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Combine output records=10582
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Physical memory (bytes) snapshot=1345551327232
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Reduce output records=5806
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Virtual memory (bytes) snapshot=2689983688704
2012-10-01 00:30:40,209 INFO org.apache.hadoop.mapred.JobClient (main):     Map output records=7753


See bolded line above.  There are no other errors aside from the occasional failed task.  I am also definitely getting orders of magnitude fewer reduce output records than I expect.

I am using all the standard configuration options, and get a reasonable split output:

2012-09-30 20:25:40,669 INFO com.flicket.hadoop.Matcher (main): Setting input URI: mongodb://<REDACTED>.compute-1.amazonaws.com/flicket.crawled
2012-09-30 20:25:40,733 INFO com.flicket.hadoop.Matcher (main): Setting output URI: mongodb://<REDACTED>.compute-1.amazonaws.com/flicket.matched_20120930
2012-09-30 20:25:41,719 INFO org.apache.hadoop.mapred.JobClient (main): Default number of map tasks: null
2012-09-30 20:25:41,719 INFO org.apache.hadoop.mapred.JobClient (main): Setting default number of map tasks based on cluster size to : 6
2012-09-30 20:25:41,719 INFO org.apache.hadoop.mapred.JobClient (main): Default number of reduce tasks: 351
2012-09-30 20:25:41,919 INFO org.apache.hadoop.mapred.JobClient (main): Setting group to hadoop
2012-09-30 20:25:42,385 INFO com.mongodb.hadoop.util.MongoSplitter (main):  Calculate Splits Code ... Use Shards? false, Use Chunks? true; Collection Sharded? false
2012-09-30 20:25:42,385 INFO com.mongodb.hadoop.util.MongoSplitter (main): Creation of Input Splits is enabled.
2012-09-30 20:25:42,385 INFO com.mongodb.hadoop.util.MongoSplitter (main): Using Unsharded Split mode (Calculating multiple splits though)
2012-09-30 20:25:42,409 INFO com.mongodb.hadoop.util.MongoSplitter (main): Calculating unsharded input splits on namespace 'flicket.crawled' with Split Key '{ "_id" : 1}' and a split size of '8'mb per
2012-09-30 20:25:47,471 INFO com.mongodb.hadoop.util.MongoSplitter (main): Calculated 2478 splits.

It then goes on to list all of the splits which are perfectly acceptable.  I also looked in the individual syslogs for each task, and I can see all of the contents of my input collection (though kind of unreadable -- it seems they are partially binary?).

I am confused and can't figure out what's going on.  Any hints on what else I should look at?

Thanks,
Eric

Eric Lange

unread,
Oct 2, 2012, 5:14:52 PM10/2/12
to mongod...@googlegroups.com
Hi Andrew,

Thanks for the response.  I am not sure I totally understand what you mean by the "map/reduce command that you passed to the database".  I am passing no query.  I want to process every record in the collection.  I am not sure if this is relevant or not, but here is the elastic-mapreduce command I run to kick the process off.

/opt/aws/emr/elastic-mapreduce --create --plain-output --name Job_Matcher__20120929_234142 --ami-version=2.1.1 --hadoop-version=0.20.205 --jar s3n://flicket/emr/jars/flicket-hadoop-1.0.1.jar --step-name Run_Matcher --log-uri s3n://flicket/emr/logs --main-class com.flicket.hadoop.Matcher --access-id ******** --private-key ******** --arg -Dmapreduce.job.split.metainfo.maxsize=-1 --arg -Dmapred.max.map.failures.percent=50 --arg InputPath=mongodb://ec2-107-22-127-215.compute-1.amazonaws.com/flicket.crawled --arg OutputURI=mongodb://ec2-********.compute-1.amazonaws.com/flicket.matched_20120929 --arg InputURI=mongodb://********.compute-1.amazonaws.com/flicket.crawled --instance-group master --instance-type m1.large --instance-count 1 --availability-zone us-east-1b --instance-group core --instance-type m1.large --instance-count 1 --instance-group task --instance-type c1.xlarge --instance-count 100 --bid-price 0.08

Here are a few random lines from the syslog showing how the split is done, which seems completely rational (the ID is the URL of the content):

2012-09-30 20:25:49,003 INFO com.mongodb.hadoop.mapred.input.MongoInputSplit (main): Creating a new MongoInputSplit for MongoURI 'mongodb://REDACTED.compute-1.amazonaws.com/flicket.crawled', query: '{ "$query" : { } , "$min" : { "_id" : "http://www.220.ro/desene-animate/Foamy-Si-Uraganul/UCLSo9zokS/"} , "$max" : { "_id" : "http://www.220.ro/documentare/Pleo-Robot-Sau-Animal-De-Casa/1nhjDTywjZ/"}}', fieldSpec: '{ }', sort: '{ }', limit: 0, skip: 0 .
2012-09-30 20:25:49,003 INFO com.mongodb.hadoop.mapred.input.MongoInputSplit (main): Creating a new MongoInputSplit for MongoURI 'mongodb://REDACTED.compute-1.amazonaws.com/flicket.crawled', query: '{ "$query" : { } , "$min" : { "_id" : "http://www.220.ro/documentare/Pleo-Robot-Sau-Animal-De-Casa/1nhjDTywjZ/"} , "$max" : { "_id" : "http://www.220.ro/faze-tari/Au-Ajuns-Faimosi-Pe-Internet-Blocati-In-Aeroport/mosMTHYCAu/"}}', fieldSpec: '{ }', sort: '{ }', limit: 0, skip: 0 .
2012-09-30 20:25:49,003 INFO com.mongodb.hadoop.mapred.input.MongoInputSplit (main): Creating a new MongoInputSplit for MongoURI 'mongodb://REDACTED.compute-1.amazonaws.com/flicket.crawled', query: '{ "$query" : { } , "$min" : { "_id" : "http://www.220.ro/faze-tari/Au-Ajuns-Faimosi-Pe-Internet-Blocati-In-Aeroport/mosMTHYCAu/"} , "$max" : { "_id" : "http://www.220.ro/videoclipuri/Florinl-Si-Ioana-Iubirea-Nu-Are-Lege/pNNJJlR543/"}}', fieldSpec: '{ }', sort: '{ }', limit: 0, skip: 0 .
2012-09-30 20:25:49,003 INFO com.mongodb.hadoop.mapred.input.MongoInputSplit (main): Creating a new MongoInputSplit for MongoURI 'mongodb://REDACTED.compute-1.amazonaws.com/flicket.crawled', query: '{ "$query" : { } , "$min" : { "_id" : "http://www.220.ro/videoclipuri/Florinl-Si-Ioana-Iubirea-Nu-Are-Lege/pNNJJlR543/"} , "$max" : { "_id" : "http://www.69stream.com/EN/scheda/7526/aiutami-figlio-mio.html"}}', fieldSpec: '{ }', sort: '{ }', limit: 0, skip: 0 .


As for the schema, these are video records from a crawl.  For instance, when this page is crawled, I generate the following document:

{"content_url":"http://www.amazon.com/Batman-Dark-Knight-Returns-Part/dp/B009GEAPYW","archive_date":1349211077783,"title":"Batman: The Dark Knight Returns Part 1","released":"2012","duration":"77","genre":["animation","action"],"actor":["peter weller","michael emerson"],"director":["jay oliva"],"keyword":["peter weller","michael emerson","david selby","michael mckean","ariel winter","wade williams","jay oliva","alan burnett","bob goodman","batman: the dark knight returns part 1"],"description":"It is ten years after an aging Batman has retired, and Gotham City has sunk deeper into decadence and lawlessness. Now, when his city needs him most, he returns in a blaze of glory."}

and store it in the collection "flicket.crawled".  This is, incidentally, another hadoop process that parses crawl archive text files on S3 and writes the output to MongoDB when it detects a video on the page.  To my knowledge, this worked without issue, generating the 9.4m records without a problem.  I have another collection called "flicket.master" with the exact same schema that is used as the canonical master DB of movies/shows (think IMDb).  My hadoop process does a search of the above record against the set of master records to try and find a match and then assigns a likelihood score.  The idea being that when you search for "Batman the Dark Knight Returns", you will find the most likely places to watch it online.

I know that records are getting dropped somehow, because I can see individual results in the crawl that when I manually match them against the master collection using the same matching code that the mapper uses, I can see strong match results.  But they are missing from the hadoop run.

My mapper takes the input document and performs a search against the master collection, rendering the master record with the highest "score" (my own secret sauce).  If the score is over a certain threshold, I then send it out to the combiner/reducer as a BSONWritable with the schema { _id: <canonical db id>, match: [ {content_url: <url of content>, score: <matching score>} ]} -- note it will always be an array with just one element.

The reducer then takes that output and combines all the matches for a canonical id into a single record: { _id: <canonical db id>, match: [ {content_url: <url of content #1>, score: <matching score #1>}, {content_url: <url of content #2>, score: <marching score #2>}, ... ]}

I hope this helps.  Please let me know if there is any additional information I can provide.

Thanks!
-Eric

Eric Lange

unread,
Oct 4, 2012, 11:07:16 AM10/4/12
to mongod...@googlegroups.com
Quick update:

Perhaps this is more of an AWS/Hadoop issue, but I am now seeing that all of my tasks are uncerimoniously ending after approximately 3 hours whether they have completed their slice or not.  In fact, the entire job takes the same amount of time (right around 4 hours) no matter how many instances I throw at it.  When doing a google search for anything relating to 3 hours, nothing comes up.  Also, regardless of how many instances I give it, mongo-hadoop slices my collection into 8mb chunks.  Is there a reason for this?  Do I need to manually cut it into smaller slices if I want to try to make the tasks shorter?

Thanks,
Eric

Eric Lange

unread,
Oct 5, 2012, 12:59:25 AM10/5/12
to mongod...@googlegroups.com
Another update:  I changed the split size from the default 8mb to 2mb (setting it to 1mb caused a Java Heap Space exception during startup).  By doing this, the runtime went from 4 hours to 12 and the number of records processed from 295k to 1.07m, still well short of the 9.4m total records.  But one thing is clear: there is some sort of time limit for running a task in the 3-4 hour range.  Is there some cursor timeout that could be happening?  Clearly, I need to optimize the mapper, but still this should work.  Again, any help is appreciated.

Maziyar S.P.

unread,
Dec 9, 2013, 7:53:17 AM12/9/13
to mongod...@googlegroups.com
Hi Eric,

I'm in the same boat as you are or used to be. Have you found anything to solve this problem? I have 7 to 9GB of BSON file that I'm trying to just count them on EMR cluster with 15 instances and instead of 13million it gives me 200K or so. What are the details of your architecture? AMI version and hadoop version? Mine is AMI 2.4.2 Hadoop 1.0.3

Thanks

Eric Lange

unread,
Dec 9, 2013, 11:04:03 AM12/9/13
to mongod...@googlegroups.com
Hi Maziyar,

Unfortunately, since this was over a year ago, I have forgotten most of the details.

I was using ami-07339a6e, the Common Crawl Quick-start AMI, with Hadoop 0.20.205.1 (functionally pretty close to 1.0.3).

IIRC, the problem was that the jobs had a watchdog timer that killed them after three hours.  I did not find a way to change this.  Instead, I optimized my mapper function in two ways: (1) instead of running 9.4m records to compare against 2m, I ran the 2m to compare against the 9.4m; and (2) I sniffed out unnecessarily slow parts of the mapper.

So in the end, it was just mapper optimization that solved the problem for me.

Hope this helps.

Cheers,
Eric


--
--
You received this message because you are subscribed to the Google
Groups "mongodb-user" group.
To post to this group, send email to mongod...@googlegroups.com
To unsubscribe from this group, send email to
mongodb-user...@googlegroups.com
See also the IRC channel -- freenode.net#mongodb
 
---
You received this message because you are subscribed to a topic in the Google Groups "mongodb-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/mongodb-user/i0hwErABUOc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to mongodb-user...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Reply all
Reply to author
Forward
0 new messages