custom input paths for mrjob

1,220 views
Skip to first unread message

Shivkumar Shivaji

unread,
May 3, 2011, 9:36:25 PM5/3/11
to mr...@googlegroups.com
I wanted to fill in the remote s3 inputs for certain jobs, e.g. for example I am adding a --month option which after post processing options will remotely point to say ['s3://log_machine1/2011-05/', 's3://log_machine2/2011-05/', ...]

How do I get mrjob to work this way? I overrode def load_options(self, args): in the derived mrjob class, though the inputs are right, i get an error when running on emr:

The following code in util.py fails:

# resolve globs
paths = glob.glob(path)
if not paths:
raise IOError(2, 'No such file or directory: %r' % path)
elif len(paths) > 1:
for path in paths:
for line in read_input(path, stdin=stdin):
yield line
return
else:
path = paths[0]

I get the 'no such file or directory' due to the glob resolver.

What is the right way to generate input paths for a job?

Shiv

Shivkumar Shivaji

unread,
May 3, 2011, 10:17:14 PM5/3/11
to mr...@googlegroups.com
Solved the issue by overriding def emr_job_runner_kwargs(self) instead. Commenting in case someone else is interested in generating input paths. Certainly makes automating job runs easier.

This is fine (and in fact excellent) for now. However, at some point, I would assume s3 testing can be possible from local jobs or local hadoop runs, in which case, one might need to override def_load_options to support custom input paths.

Regards, Shiv

Dave Marin

unread,
May 4, 2011, 2:28:33 PM5/4/11
to mr...@googlegroups.com
Cool. I'm a little confused by your use case; it seems like you should just be able to pass in the S3 paths from the command line. Feeding log files organized by date into MRJobs is pretty much Yelp's core use case.

Also, it's appropriate to override input_paths in MRJob.job_runner_kwargs(); it doesn't need to be EMR-specific.

Is your patch anywhere publicly visible (e.g. github)? If I could see it, I might be able to better understand what you're doing. :)

-Dave
--

Yelp is looking to hire great engineers! See http://www.yelp.com/careers.

Shivkumar Shivaji

unread,
May 4, 2011, 2:48:22 PM5/4/11
to mr...@googlegroups.com
Let me give an example. Lets say you want to parse over activity logs over all machines (and lets say "all" means 6) and the folder structure containing monthly activity log data is like so:


Lets say you have activity_log_analyzer_mr.py. You need to put in 
to analyze activity logs for April, 2011.

To analyze logs for May, 2011, the paths above needs to be changed again before they are invoked on the command line.

I would prefer this:
./activity_log_analyzer_mr.py --month 2011-04 --type activity_log -r emr
or even
./activity_log_analyzer_mr.py --current-month --type activity_log -r emr

Using the second approach, the input paths sent to EMR is still the same as the first, but it saves one level of scripting and guards against obvious bugs.

This way, we dont have to remember all paths when the command line invocation is made. What I have done is NOT a patch, but a change to our base class (extending MrJob) to support --month --start-date --end-date and --type. The code is not publicly available, but I can share it via email if people are still interested after reading the mundane explanation above :)

--start-date and --end-date take the search further within a month, e.g. to analyze a day or weeks worth of data.

Shiv

Dave Marin

unread,
May 4, 2011, 4:04:31 PM5/4/11
to mr...@googlegroups.com
Ah, okay, that makes a lot of sense. We have something similar, only it's a batch job that launches MRJobs using make_runner(). Yours sounds like a cleaner solution. :)

Sure, would love to see your code if you're willing to email it to me.

-Dave

Shivkumar Shivaji

unread,
May 4, 2011, 4:42:59 PM5/4/11
to mr...@googlegroups.com
Did not realize this approach might be interesting to others. I will clean up the code a little and email it within a day or so. I wrote some stuff quickly for production jobs but should think of more generalizable options. Also, I was not sure how to best unit/integration test the new option changes. Perhaps a test similar to tests/runner_test.py

Shiv

Dave Marin

unread,
May 4, 2011, 5:02:16 PM5/4/11
to mr...@googlegroups.com
Cool.

One thing you might have already thought of is allowing people to
specify the path to their log file as a string to pass to strftime()
(something like 's3://machine1/activitylogs/%Y-%m-%d/*').

Also, an issue we've run into is if we want to pass a year of logs to
a job, it creates a command line that's too long for EMR. One solution
is to use * for the day when we want all logs for a month; the
calendar module might be helpful for determining when we can do this.
(Potentially we could use * for the month as well when we want all
days in a year, though in the case of most of our logs, that would be
a pricey, pricey job.)

-Dave

Shivkumar Shivaji

unread,
May 4, 2011, 5:12:25 PM5/4/11
to mr...@googlegroups.com
Unfortunately, my quick code at this stage does very minimal regex parsing. I can email you a tidied version of the code within a day but it does not have much smartness. Your solution seems ideal.

Perhaps, we might only need --start-date and --end-date as the options then and leave the rest to regex magic which seems like a good solution.

I might be able to come up with a better attempt after Tuesday.

Regarding your 2nd comment, it makes sense to use the calendar module with practical optimizations to ensure the list of files are not too large.

Shiv

Vladimir Montealegre

unread,
Jan 4, 2013, 2:00:13 PM1/4/13
to mr...@googlegroups.com
Have you guys found a definite solution for this? I could really use it now.
I need to process data provided by Localytics, which comes in tiny files in keys arranged in s3 with the pattern:

s3:/bucketname/year/month/day/somecryptickey7849012ur/name_of_the_key.log

When I feed my job's input as 
's3:/bucketname/year/month/day/somecryptickey7849012ur/' 
there is no problem the Job runs fine and processes all keys under the prefix. However, if I try use something to process only one day of data like:
's3:/bucketname/year/month/day/' 
There are problems, namely, it will complain saying that "year/month/day/somecryptickey7849012ur/" is not a file.

I could try to preprocess a list of inputs with all the paths that lead to files, but there are way too many of them which makes AWS complain (there is a ~200 input limit, correct?), and doing so is very impractical.

Do you have any advise for this?

Thanks a lot!

Shivkumar Shivaji

unread,
Jan 4, 2013, 2:16:37 PM1/4/13
to mr...@googlegroups.com
I have a a solution for the 10,024 character limit and also details and advice on how we do it. Will reply later today.

In short, we do expand paths and store a list of inputs. If it exceeds 10K characters, then we use a custom jar to handle paths. More details to follow.

Shiv

Jim Blomo

unread,
Jan 4, 2013, 6:15:05 PM1/4/13
to mr...@googlegroups.com
This is also how we handle long paths: a custom InputFormat that
expands the input to paths. It is included in
https://github.com/Yelp/oddjob (for a limited time, pre-built jar
available on https://github.com/Yelp/oddjob/downloads). I based it on
Shiv's ideas. Typically we use it with a wrapper which constructs the
manifest file, then starts the job with the manifest file as input.

Jim

Vladimir Montealegre

unread,
Jan 4, 2013, 8:19:51 PM1/4/13
to mr...@googlegroups.com
Thanks Jim and Shiv,
My knowledge on jar files is quite limited, do you have a pointer which can help me getting started with oddjob?
Thanks!
Vlad

Shivkumar Shivaji

unread,
Jan 4, 2013, 11:12:19 PM1/4/13
to mr...@googlegroups.com
On the first question,

I have a setup like "--source activity_log:2012-12-01_2012-12-31 --source trait_log:2012-12-15_2012-12-31" which auto-expands into all activity logs for December 2012 and trait logs for the last 2 weeks of December, 2012. I have extended the base class of mrjob to do a regex grep for the date range one desires for the appropriate types of logs on s3. Note that you can use globs to specify paths but you cant use recursive globs. For example, log-2012-12-11-01.gz can be referenced by log-2012-12-11*

I had sent this code to Dave Marin a while back. The stuff is not easy to generalize for everyone's use case. He created an issue out of it - https://github.com/Yelp/mrjob/issues/125 (see the discussion for more examples).

In short, you have to come up with a wrapper or some solution to make it easier across many job invocations.

On the custom input format when you have more than 10,024 characters:

I am glad that Jim replied and pointed out his jar location, and maintains documentation on oddjob! I used to calculate the length of all input paths and if less than 10,024 chars, I would use the normal api and if not use the custom input format. However, off late, I realized its easier to just use custom input formats for most/all jobs. It only adds about 5-10 mins to a job (maybe not even that much anymore).

The custom input format basically expects the following as content within a text file:

"
..
"

It will then pass all those locations to a mapper. Your job input (instead of the s3 locations) to mrjob should just be a text file containing all inputs locations on s3.

You can see the documentation for oddjob at https://github.com/Yelp/oddjob. The format you want is oddjob.ManifestTextInputFormat (referenced on the page). You can reference his published jar directly for your job.

Shiv

Vladimir Montealegre

unread,
Jan 7, 2013, 10:41:05 AM1/7/13
to mr...@googlegroups.com
Thank you so much Shiv, your answer clarified a lot how to use oddjob.
I have one further question, indirectly related to the question I asked above. The size of the files in the bucket is tiny, and unfortunately it is provided by a third party and I can not modify it, (file size maybe in the order of ~2MB), will the process of feeding the input from the text file with odd job do something like an aggregator so the data can be streamed to the mappers in blocks?

Shivkumar Shivaji

unread,
Jan 7, 2013, 12:25:25 PM1/7/13
to mr...@googlegroups.com
To clarify, I actually don't use odd job but use an equivalent which I wrote a while back. Oddjob is Jim Blomo's utility that supports other functions as well.

You might be running into the famous small files problem with hadoop. If possible, LZO support by the same third party data provider would be great. If you cannot use LZO, you might have to combine many small files into larger ones (to match your block size - likely 128MB (maybe 256Mb)). Oddjob is not meant for this purpose. The ManifestTextInputFormat is meant for specifically to map the files in a way that Amazon's EMR input file size lengths will be bypassed. Oddjob will simply pass each file as a separate input split (which will not solve your problem). Will look/ask around if there is a best practice solution to recommend for your problem.

Shiv

Jim Blomo

unread,
Jan 7, 2013, 4:50:49 PM1/7/13
to mr...@googlegroups.com
On Fri, Jan 4, 2013 at 5:19 PM, Vladimir Montealegre
<luca...@gmail.com> wrote:
> My knowledge on jar files is quite limited, do you have a pointer which can
> help me getting started with oddjob?

Luckily, you don't have to do too much with the jar:

1. download the jar and make it accessible to the mrjob command
2. copy the jar to the master node
3. include it in the -lib-jars option
4. specify the class you want to use

#1 download the jar from https://github.com/Yelp/oddjob/downloads into
the directory with your mrjob

#2 can be handled by mrjob by specifying some extra bootstrap
commands: ` --bootstrap-file oddjob-*-standalone.jar --bootstrap-cmd
'cp oddjob-*-standalone.jar /home/hadoop/'` . This copies the file
onto the master node, then copies it to the hadoop home directory.

#3 again, use mrjob to specify extra hadoop arguments: `--hadoop-arg
-libjars --hadoop-arg oddjob-*-standalone.jar`. This tells hadoop to
include the oddjob jar in the job

#4 finally, tell mrjob what inputformat you'd like to use (in your
case, the oddjob.ManifestTextInputFormat class):
`--hadoop-input-format oddjob.ManifestTextInputFormat`

Your input file should be a newline separated *paths*, not data.
oddjob.ManifestTextInputFormat will expand these paths into files to
be used as actual input.

Let us know how it goes.

Steve Johnson

unread,
Jan 7, 2013, 5:18:24 PM1/7/13
to mr...@googlegroups.com
I think there was a Github issue at some point to make it easy to replace the Hadoop Streaming jar. Might be easier with the new internal step format. I don't know what the state of that issue is now.

This has been another completely unhelpful post from Steve.

Vladimir Montealegre

unread,
Jan 7, 2013, 5:33:02 PM1/7/13
to mr...@googlegroups.com

Thanks again guys you are awesome. 

@Shiv: Yes, the small file problem is indeed the case I have now, I'm afraid that accumulating the files into blocks before starting the job would defeat the purpose of using EMR and force me to sync files in HDFS with a local cluster.
@Jim: Thanks a lot for the very clear explanation!
@Steve: Knowing about the note you wrote can be actually very helpful, none of your posts are unhelpful.

This will take me a bit to try due to other tasks I have on my plate right now, but I will definitely give you feedback about it.

Shivkumar Shivaji

unread,
Jan 8, 2013, 3:14:40 AM1/8/13
to mr...@googlegroups.com
Asked around and found a solution to your problem.
If you have many small files that are forced on you, you would have to use a custom input format jar to combine them into one block size. You can use the CombineFileInputFormat (http://hadoop.apache.org/docs/r0.20.2/api/org/apache/hadoop/mapred/lib/CombineFileInputFormat.html) which can aggregate many small files into larger ones. This also works on non splittable gz files.

You would need to:
1. Use the CombineFileInputFormat. You might be able to use this input format without specifying a jar. I don't know if EMR by default has CombineFileInputFormat included in its class path. 
2. Not sure if Amazon's EMR is affected by issue https://issues.apache.org/jira/browse/MAPREDUCE-1806 (if so, you need to use a AMI with a newer fixed version of hadoop). It is possible that this issue won't happen at all on EMR due to different s3 handling.

Shiv

Vladimir Montealegre

unread,
Jan 8, 2013, 4:51:07 PM1/8/13
to mr...@googlegroups.com
Thank you so much Shiv,

Yes, CombineFileInputFormat seems to be the solution to this, it is mentioned also in the Hadoop book. I have a question on how to use it with MrJob.

CombineFileInputFormat is described as an abstract class which needs implementation, an example for the small file problem is described in:
So this gives me an idea of what to do, but when it comes to MrJob, I am wondering if I should save it as a MyCombineFileInputFormat.jar file and pass it as an argument to MRJobRunner 
e.g.:
with the_job.make_runner(hadoop_input_format = MyCombineFileInputFormat) as runner:
     .... do something

or should I set up something in .mrjob.conf with this parameter?

Thanks,
Vladimir

Shivkumar Shivaji

unread,
Jan 8, 2013, 5:25:44 PM1/8/13
to mr...@googlegroups.com
On the implementation detail, you can do it in several ways. The most common option is just to pass in "-inputformat" to the job as a parameter.

I have the below code for the manifest input format when the input size limit is greater than 10K as it needs to run on ALL jobs where the input size is large. Because it needed to run on all jobs, I put it as part of the emr_job_runner_kwargs. I am providing an example to give you an idea.

The jar itself should be passed in as a different option. The input format is the class that is being used. I pass the jar as --hadoop-streaming-jar=../java/ManifestStreaming/dist/cs_hadoop.jar. Hope that helps.

Shiv

def emr_job_runner_kwargs(self):
       
        if self.options.manifest:
            self.options.hadoop_input_format='com.crowdscience.hadoop.ManifestInputFormat'
            if not self.options.hadoop_streaming_jar:
                raise ValueError("The --manifest option requires a hadoop-streaming-jar value.")

        ...
...
        return super(BaseCSMrJob, self).emr_job_runner_kwargs()

Vladimir Montealegre

unread,
Jan 10, 2013, 6:44:36 PM1/10/13
to mr...@googlegroups.com
Thanks a lot Shiv,
The note on emr_job_runner_kwargs was extremely helpful. I have had a steep learning curve in jar files and javaland in general for this. I could finally figure out how to append the custom input format class to the hadoop-streaming jar and set the corresponding AMI and Hadoop versions.

Now, the job flow starts, configures the instances, starts the first step, and crashes. When I look at the stderr log, I get an error which suggests my custom input format class was compiled with a different version of JDK. Do you know what version of JDK EC2 instances use? I am trying to figure out what to do for this purpose, but haven't been able to figure out a solution, I know this is outside of the scope of MrJob but if you have experience dealing with this problem it could be very helpful.

The error I am getting is:

Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/hadoop/streaming/MyCombineFileInputFormat : Unsupported major.minor version 51.0
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
        at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:247)
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:820)
        at org.apache.hadoop.streaming.StreamUtil.goodClassOrNull(StreamUtil.java:56)
        at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:678)
        at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:117)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:32)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Shivkumar Shivaji

unread,
Jan 10, 2013, 6:54:04 PM1/10/13
to mr...@googlegroups.com
Quick comments.

My jar was compiled with JDK6 and worked fine. My EMR ami uses java 6. The exception trace shows that you used java 7.

It is easy to check what version the ami on EMR uses. Create a persistent job flow, ssh to the machine and type in the standard "java --version". Under http://packages.python.org/mrjob/runners-emr.html, look up the "Reusing Job Flows" section. Create a 1 instance job and do some version checking and test on a small input file with debugging turned on until you get desirable output from your job. I had to do this several times to get my custom input format to work.

Shiv
Reply all
Reply to author
Forward
0 new messages