class MRCollect(MRJob):
    def mapper(self, _, line):
        # normal mapper
    def reducer(self, key, values):
        # normal reducer

inputFiles = []  # this list of input files has hundreds of thousands of s3 paths

# attempt 1 - failed saying a list can't be hashed
mr = MRCollect(args=['-r', 'emr', inputFiles])

# attempt 2 - eventually fails with "socket.error: [Errno 32] Broken pipe", inspecting __input_paths shows a one-string list, i assume that's wrong, and should be a list of paths
mr = MRCollect(args=['-r', 'emr', ' '.join(inputFiles)])
with mr.make_runner() as runner:
    runner.run()

# attempt 3 - set the input paths manually. this seemed to work, but then fails with "mrjob.job.UsageError: make_runner() was called with --steps. This probably means you tried to use it from __main__, which doesn't work."
mr = MRCollect(args=['-r', 'emr'])
with mr.make_runner() as runner:
    runner._input_paths = [inputFiles[0], inputFiles[1]]
    runner.run()
I should also explain why I'm doing this. I was running my job like this:

    python collect.py -r emr s3://mybucket/someprefix/someotherprefix/2014/01/27/*/*

It took over 2 hours to process the wildcards (there are about 20000 files in there). Then the read from S3 would start, which took about an hour, and then the MapReduce job itself would start, which took about another hour.

However, if I used boto manually to find all the keys in s3://mybucket/someprefix/someotherprefix/2014/01/27/*/*, it would get the list in 2 minutes.
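For reference, the manual listing described above could look roughly like the sketch below. It assumes boto 2 (`boto.connect_s3`, `get_bucket`, `Bucket.list`) with credentials already configured; the helper names `to_s3_uris` and `list_s3_uris` are made up for illustration and are not part of boto or mrjob. Listing by prefix returns every key under that prefix, which is broader than the `*/*` wildcard if the tree is deeper than two levels.

```python
def to_s3_uris(bucket_name, key_names):
    """Turn key names into s3:// URIs, skipping folder placeholder keys."""
    return [
        "s3://%s/%s" % (bucket_name, name)
        for name in key_names
        if not name.endswith("/")  # keys ending in "/" are directory markers
    ]


def list_s3_uris(bucket_name, prefix):
    """List every key under a prefix. Hypothetical helper, assumes boto 2."""
    import boto  # imported lazily so the pure helper above works without boto

    bucket = boto.connect_s3().get_bucket(bucket_name)
    # Bucket.list() pages through the results and yields Key objects.
    return to_s3_uris(bucket_name, (key.name for key in bucket.list(prefix=prefix)))


if __name__ == "__main__":
    # Offline demonstration with made-up key names.
    print(to_s3_uris("mybucket", ["2014/01/27/00/a.log", "2014/01/27/00/"]))
```

With real credentials, something like `list_s3_uris("mybucket", "someprefix/someotherprefix/2014/01/27/")` would produce the list of paths to hand to the job.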
On Thursday, 6 February 2014 16:43:51 UTC-8, Jordan Hudson wrote:
I'm having trouble understanding the correct way to start a job and specify input files and other arguments programmatically rather than through command-line arguments.
Please help.
My MRJob class is quite simple:

class MRCollect(MRJob):
    def mapper(self, _, line):
        # normal mapper
    def reducer(self, key, values):
        # normal reducer

I've tried these:

inputFiles = []  # this list of input files has hundreds of thousands of s3 paths

# attempt 1 - failed saying a list can't be hashed
mr = MRCollect(args=['-r', 'emr', inputFiles])

# attempt 2 - eventually fails with "socket.error: [Errno 32] Broken pipe", inspecting __input_paths shows a one-string list, i assume that's wrong, and should be a list of paths
mr = MRCollect(args=['-r', 'emr', ' '.join(inputFiles)])
with mr.make_runner() as runner:
    runner.run()

# attempt 3 - set the input paths manually. this seemed to work, but then fails with "mrjob.job.UsageError: make_runner() was called with --steps. This probably means you tried to use it from __main__, which doesn't work."
mr = MRCollect(args=['-r', 'emr'])
with mr.make_runner() as runner:
    runner._input_paths = [inputFiles[0], inputFiles[1]]
    runner.run()
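A note on the attempts above: `args` is treated like a command line, so it is expected to be one flat list of strings. Attempt 1 nests a list inside it, and attempt 2 collapses every path into a single argument. The sketch below shows one way to build a flat list. `build_job_args` is a hypothetical helper, not part of mrjob, and whether hundreds of thousands of paths are practical to pass this way is untested here. The UsageError in attempt 3 usually suggests the driver code lives in the same file as the job class, so keeping `MRCollect` in its own module and importing it from the driver script is probably needed as well.

```python
def build_job_args(input_paths, runner="emr"):
    """Build a flat argv-style list: runner flags first, then one string per path."""
    if isinstance(input_paths, str):
        # A single string would be split into characters by the loop below.
        raise TypeError("input_paths must be a list of paths, not one string")
    return ["-r", runner] + [str(path) for path in input_paths]


if __name__ == "__main__":
    # Hypothetical paths, for illustration only.
    input_files = ["s3://mybucket/a/part-0", "s3://mybucket/a/part-1"]
    print(build_job_args(input_files))
    # Assumed usage, with MRCollect imported from its own module:
    #   mr = MRCollect(args=build_job_args(input_files))
    #   with mr.make_runner() as runner:
    #       runner.run()
```

Building the list with `+` keeps every path as its own argument, which is what the command-line form with separate paths would produce.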