How do I start a job programmatically with multiple input files?


Jordan Hudson

Feb 6, 2014, 7:43:51 PM
to mr...@googlegroups.com
I'm having trouble understanding the correct way to start a job and specify input files and other arguments programmatically, rather than through command-line arguments.

Please help.

My MRJob class is quite simple:

from mrjob.job import MRJob

class MRCollect(MRJob):

    def mapper(self, _, line):
        # normal mapper
        pass

    def reducer(self, key, values):
        # normal reducer
        pass

I've tried these:

inputFiles = []    # this list of input files has hundreds of thousands of s3 paths

# attempt 1 - failed saying a list can't be hashed
mr = MRCollect(args=['-r', 'emr', inputFiles])
 
 
# attempt 2 - eventually fails with "socket.error: [Errno 32] Broken pipe"; inspecting __input_paths shows a one-string list, which I assume is wrong and should be a list of paths
mr = MRCollect(args=['-r', 'emr', ' '.join(inputFiles)])

with mr.make_runner() as runner:
    runner.run()

# attempt 3 - set the input paths manually. This seemed to work, but then fails with "mrjob.job.UsageError: make_runner() was called with --steps. This probably means you tried to use it from __main__, which doesn't work."
mr = MRCollect(args=['-r', 'emr'])

with mr.make_runner() as runner:
    runner._input_paths = [inputFiles[0], inputFiles[1]]
    runner.run()
 
 

Jordan Hudson

Feb 6, 2014, 7:51:31 PM
to mr...@googlegroups.com
I should also explain why I'm doing this. I was running my job like this:

python collect.py -r emr s3://mybucket/someprefix/someotherprefix/2014/01/27/*/*

It was taking over 2 hours to process the wildcards (there are about 20,000 files in there). Then the read from S3 would start, which took about an hour, and then the MapReduce job itself would start, which took about another hour.

However, if I used boto manually to find all the keys in s3://mybucket/someprefix/someotherprefix/2014/01/27/*/*, it would get the list in 2 minutes.
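For anyone wanting to try the same approach, here is a minimal sketch of turning a key listing into explicit input paths. It assumes the key names have already been fetched (with boto 2, something like iterating over `bucket.list(prefix=...)` and reading each key's `name`). The function name `matching_s3_uris` and the sample key names are hypothetical, made up for illustration. Note that `fnmatch`'s `*` also matches `/`, so it is looser than a shell glob.

```python
from fnmatch import fnmatchcase


def matching_s3_uris(bucket_name, key_names, key_pattern):
    """Return s3:// URIs for the key names that match a shell-style pattern."""
    uris = []
    for name in key_names:
        # Skip "directory" placeholder keys, which are not real input files.
        if name.endswith('/'):
            continue
        if fnmatchcase(name, key_pattern):
            uris.append('s3://%s/%s' % (bucket_name, name))
    return uris


# Hypothetical key names standing in for a real bucket listing.
sample_keys = [
    'someprefix/someotherprefix/2014/01/27/00/part-0000',
    'someprefix/someotherprefix/2014/01/27/01/part-0000',
    'someprefix/someotherprefix/2014/01/28/00/part-0000',
]

input_files = matching_s3_uris(
    'mybucket', sample_keys, 'someprefix/someotherprefix/2014/01/27/*/*')
```

The resulting `input_files` list holds one full S3 path per file, so the job no longer has to expand the wildcard itself.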

Steve Johnson

Feb 6, 2014, 8:01:48 PM
to mr...@googlegroups.com
mr = MRCollect(args=['-r', 'emr'] + inputFiles)
 
args is a flat list of command-line arguments, so each input path has to be its own element. A nested list is not a valid command-line argument.
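A slightly fuller sketch of the same idea, for illustration only. The helper names `build_job_args` and `run_job` and the sample paths are hypothetical; the only mrjob calls used are the ones already shown in this thread (`args=`, `make_runner()`, `run()`). As far as I know, the UsageError about `__main__` in attempt 3 goes away when the job class lives in its own importable module rather than in the driver script, but treat that as an assumption to verify against the mrjob docs.

```python
def build_job_args(options, input_paths):
    """Combine runner options and input paths into one flat argument list."""
    args = list(options)
    for path in input_paths:
        # A nested list (attempt 1) is rejected here instead of deep inside mrjob.
        if not isinstance(path, str):
            raise TypeError('each input path must be a string, got %r' % (path,))
        args.append(path)
    return args


def run_job(job_class, options, input_paths):
    """Instantiate job_class with flat args and run it through its runner."""
    job = job_class(args=build_job_args(options, input_paths))
    with job.make_runner() as runner:
        runner.run()
    return job


# Hypothetical paths; in practice this would be the full list of S3 inputs.
input_files = ['s3://mybucket/a', 's3://mybucket/b']
args = build_job_args(['-r', 'emr'], input_files)
```

With the job class imported from its own module, the call would look like `run_job(MRCollect, ['-r', 'emr'], input_files)`. Joining the paths with spaces (attempt 2) produces a single argument, which is why the runner saw a one-string list.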
 
 

Attefeh Falah

Nov 8, 2017, 10:35:30 AM
to mrjob
Dear Steve,
I see you know a lot about mrjob. I am really confused about implementing the multiplication of two matrices as a two-layer MapReduce job with mrjob. Would you please, for God's sake, help me :((( My email address is attefe...@gmail.com
Thank you in advance.