Multiple output files?


Jordan Christensen

Feb 4, 2013, 10:09:55 PM
to mr...@googlegroups.com
I've recently learnt the joy of using custom input formats (as mentioned in the thread titled 'How to bundle hadoop-streaming.jar for inputformat?'): using CombineFileInputFormat has reduced the impact of running mrjob with lots of small input files generated by our logging system.

I was wondering if anyone has tried to use a custom output format with mrjob, specifically to write the output of a reduce task to different files. The goal is to partition the output into different directories, potentially for use with Hive partitions. 

In Java Hadoop this seems pretty straightforward. Has anyone done it with mrjob successfully?

Thanks!

JC

Jim Blomo

Feb 5, 2013, 5:18:34 PM
to mr...@googlegroups.com
On Mon, Feb 4, 2013 at 7:09 PM, Jordan Christensen <theb...@gmail.com> wrote:
> I was wondering if anyone has tried to use a custom output format with
> mrjob, specifically to write the output of a reduce task to different files.
> The goal is to partition the output into different directories, potentially
> for use with Hive partitions.

Hi Jordan, check out https://github.com/Yelp/oddjob : it is a
collection of custom output format classes designed with mrjob in
mind. It can split reducer output into different directories, with
several alternatives for choosing the subdirectory. I think it should
work with Hive partitions.

> In Java Hadoop this seems pretty straightforward. Has anyone done it with
> mrjob successfully?

We use oddjob in several batches we run daily, and it has been working
well for us so far! If you search for 'oddjob' in this list's
history, you should find a few more examples of potential use cases.

Jim

Hunter Blanks

Feb 5, 2013, 6:15:42 PM
to mr...@googlegroups.com
Jordan,

Yes! We used oddjob for just this purpose (populating a hive warehouse in S3) at Monetate.

One thing to note is that you should make sure that your final reduce step only writes to a fairly small number of output prefixes.

So, if one reducer currently outputs:

    iso8601=2013-01-03/account=13/
    iso8601=2013-01-03/account=14/
    iso8601=2013-01-03/account=15/
    iso8601=2013-01-03/account=16/
    iso8601=2013-01-03/account=17/
    ...
    iso8601=2013-01-03/account=1500/

you would want to make sure to do an additional reduce, sorting by iso8601 and account id, so that a single reducer writes to only a couple of files at once.
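
To make that concrete, here's a plain-Python sketch (field names are illustrative) of why sorting by (iso8601, account) helps: reducer input arrives in key order, so the output prefix changes monotonically and only one output file needs to be open at a time.

```python
# Illustrative sketch: records arriving at a reducer are sorted by the
# composite key, so output prefixes change monotonically and each file
# can be closed before the next one is opened.

records = [
    ("2013-01-03", 14, "b"),
    ("2013-01-03", 13, "a"),
    ("2013-01-04", 13, "c"),
    ("2013-01-03", 14, "d"),
]

# Hadoop's shuffle sorts by the composite key; simulate that here.
records.sort(key=lambda r: (r[0], r[1]))

prefixes = []
for iso8601, account, _value in records:
    prefix = "iso8601=%s/account=%d/" % (iso8601, account)
    if not prefixes or prefixes[-1] != prefix:
        prefixes.append(prefix)  # a new output file would be opened here

# Each prefix appears exactly once in order, so a reducer touching N
# partitions opens N files sequentially, never all at once.
```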

-HJB



--
You received this message because you are subscribed to the Google Groups "mrjob" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mrjob+un...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.



Michael @ Bitly

Apr 10, 2013, 4:09:53 PM
to mr...@googlegroups.com
Can you guys offer any guidance on an error I'm getting using oddjob with -r hadoop? I've installed the jar file on my hadoop hosts, but I get this error on the last step of a 4-step job, presumably when it tries to load the oddjob jar:

HADOOP: Exception in thread "main" java.lang.ExceptionInInitializerError
HADOOP:         at clojure.core__init.__init0(Unknown Source)
HADOOP:         at clojure.core__init.<clinit>(Unknown Source)
HADOOP:         at java.lang.Class.forName0(Native Method)
HADOOP:         at java.lang.Class.forName(Class.java:266)
HADOOP:         at clojure.lang.RT.loadClassForName(RT.java:2030)
HADOOP:         at clojure.lang.RT.load(RT.java:417)
HADOOP:         at clojure.lang.RT.load(RT.java:398)
HADOOP:         at clojure.lang.RT.doInit(RT.java:434)
HADOOP:         at clojure.lang.RT.<clinit>(RT.java:316)
HADOOP:         at clojure.lang.Namespace.<init>(Namespace.java:34)
HADOOP:         at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
HADOOP:         at clojure.lang.Var.internPrivate(Var.java:149)
HADOOP:         at oddjob.MultipleJSONOutputFormat.<clinit>(Unknown Source)
HADOOP:         at java.lang.Class.forName0(Native Method)
HADOOP:         at java.lang.Class.forName(Class.java:266)
HADOOP:         at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1510)
HADOOP:         at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1475)
HADOOP:         at org.apache.hadoop.streaming.StreamUtil.goodClassOrNull(StreamUtil.java:56)
HADOOP:         at org.apache.hadoop.streaming.StreamJob.setJobConf(StreamJob.java:806)
HADOOP:         at org.apache.hadoop.streaming.StreamJob.run(StreamJob.java:122)
HADOOP:         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
HADOOP:         at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
HADOOP:         at org.apache.hadoop.streaming.HadoopStreaming.main(HadoopStreaming.java:50)
HADOOP:         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
HADOOP:         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
HADOOP:         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
HADOOP:         at java.lang.reflect.Method.invoke(Method.java:616)
HADOOP:         at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
HADOOP: Caused by: java.lang.IllegalStateException: Attempting to call unbound fn: #'clojure.core/refer
HADOOP:         at clojure.lang.Var$Unbound.throwArity(Var.java:43)
HADOOP:         at clojure.lang.AFn.invoke(AFn.java:39)
HADOOP:         at clojure.lang.Var.invoke(Var.java:401)
HADOOP:         at clojure.lang.RT.doInit(RT.java:447)
HADOOP:         at clojure.lang.RT.<clinit>(RT.java:316)
HADOOP:         ... 28 more

This same code works on EMR with the bootstrap commands Jim has documented here and elsewhere (including on the github page).

Thanks,
Michael

Jim Blomo

Apr 10, 2013, 6:18:44 PM
to mr...@googlegroups.com
Hmm, I'm wondering if this is due to the version differences in Hadoop
or Java. Can you compare the versions of EMR and Hadoop?

Michael @ Bitly

Apr 11, 2013, 2:59:11 AM
to mr...@googlegroups.com
Hadoop:
hadoop version: 2.0.0
java version: 1.6.0_24

EMR:
hadoop version: 1.0.3
ami version: 2.3.3
java version: 1.6.0_31 (not completely sure of this one)


We were speculating that Clojure may be installed by default on the AMIs and that perhaps we need to manually install it on our Hadoop cluster...? That was probably going to be tomorrow morning's first test.

Jim Blomo

Apr 11, 2013, 1:36:10 PM
to mr...@googlegroups.com

Clojure should not be a requirement on the nodes: it should all be wrapped up in the jar. In fact, it is possible that another version of Clojure being loaded could cause this error. Are you including any other jars that might have Clojure libs in them?

The Hadoop version may be using a customized class loader that is confusing the Clojure init sequence. Another thing to test would be compiling oddjob with the newest version of Clojure.


Michael @ Bitly

Apr 15, 2013, 1:31:04 AM
to mr...@googlegroups.com
Aside from the config entry for hadoop_streaming_jar: /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.3.jar, oddjob is the only jar I'm loading.

Jim Blomo

Apr 15, 2013, 9:11:06 PM
to mr...@googlegroups.com
OK, I've bumped the dependencies and created a new uberjar. I've sent
it to you off-list. Can you give it a try?

Jim Blomo

Apr 16, 2013, 2:25:47 PM
to Michael Richman, mr...@googlegroups.com
Adding group back in.

Oh *2*.0. For some reason I was just thinking 1.0, which is just a
minor change from the 0.20 series. 2.0 is the whole new YARN setup,
right? Yes, this makes it much more likely that the version change is
the source of the problem.

The error you're seeing seems to be correlated with the way classes
are loaded. 2.0 could very well have changed that process
significantly. What I'd recommend is going through the 2.0 docs and
seeing if/how the InputFormat infrastructure has changed. Either way,
you'll want to update the development dependencies to reflect the new
version. I'll be happy to code review changes, but unfortunately, I
don't have time to do this upgrade myself.

On Tue, Apr 16, 2013 at 10:37 AM, Michael Richman <m...@bitly.com> wrote:
> One of my colleagues highlighted the fact that we're using Hadoop version
> 2.0.0 while your README says "This version of oddjob is designed to be used
> as a -libjar argument with hadoop 0.20."
>
> We talked about Hadoop version in our general versions mail, but I figured
> I'd raise it again since it came up internally. Do we think the issue could
> lie in there somewhere?
>
>
> On Tue, Apr 16, 2013 at 8:47 AM, Michael Richman <m...@bitly.com> wrote:
>>
>> Hey Jim,
>>
>> Thanks for this! Unfortunately, I get the same error. The full run command
>> and output is below, in case it gives you any more ideas.
>>
>> % python test_unify_one_love.py -o hdfs:///user/mrwoof/test-2013-01 -r
>> hadoop
>> hdfs:///user/mrwoof/keyword_clicks.stream06_ec2.2013-02-01_00.10.log.gz
>> --hadoop-arg -libjars --hadoop-arg /home/mrwoof/oddjob-1.0.1-standalone.jar
>> --no-output --jobconf mapred.reduce.tasks=120
>> --input_file=input_multi_sample.json --month=01 --jobconf
>> mapred.output.compress=true --jobconf
>> mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
>> --jobconf mapred.job.name=audience-analysis-test-2013-01
>> using configs in /etc/mrjob.conf
>> creating tmp directory
>> /tmp/test_unify_one_love.mrwoof.20130416.144420.430216
>> Copying non-input files into
>> hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/
>> Using Hadoop version 2.0.0
>> HADOOP: Exception in thread "main" java.lang.ExceptionInInitializerError
>> HADOOP: at clojure.core__init.__init0(Unknown Source)
>> HADOOP: at clojure.core__init.<clinit>(Unknown Source)
>> HADOOP: at java.lang.Class.forName0(Native Method)
>> HADOOP: at java.lang.Class.forName(Class.java:266)
>> HADOOP: at clojure.lang.RT.loadClassForName(RT.java:2098)
>> HADOOP: at clojure.lang.RT.load(RT.java:430)
>> HADOOP: at clojure.lang.RT.load(RT.java:411)
>> HADOOP: at clojure.lang.RT.doInit(RT.java:447)
>> HADOOP: at clojure.lang.RT.<clinit>(RT.java:329)
>> HADOOP: at clojure.lang.Namespace.<init>(Namespace.java:34)
>> HADOOP: at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
>> HADOOP: at clojure.lang.Var.internPrivate(Var.java:163)
>> HADOOP: at clojure.lang.Var.invoke(Var.java:415)
>> HADOOP: at clojure.lang.RT.doInit(RT.java:460)
>> HADOOP: at clojure.lang.RT.<clinit>(RT.java:329)
>> HADOOP: ... 28 more
>> Job failed with return code 1: ['/usr/bin/hadoop', 'jar',
>> '/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.3.jar',
>> '-files',
>> 'hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/test_unify_one_love.py#test_unify_one_love.py,hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/input_multi_sample.json#input_multi_sample.json',
>> '-archives',
>> 'hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/mrjob.tar.gz#mrjob.tar.gz',
>> '-libjars', '/home/mrwoof/oddjob-1.0.1-standalone.jar', '-D',
>> 'mapred.job.name=audience-analysis-test-2013-01', '-D',
>> 'mapred.output.compress=true', '-D',
>> 'mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec',
>> '-D', 'mapred.reduce.tasks=120', '-cmdenv', 'PYTHONPATH=mrjob.tar.gz',
>> '-outputformat', 'oddjob.MultipleJSONOutputFormat', '-input',
>> 'hdfs:///user/mrwoof/keyword_clicks.stream06_ec2.2013-02-01_00.10.log.gz',
>> '-output', 'hdfs:///user/mrwoof/test-2013-01', '-mapper',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --mapper
>> --input_file input_multi_sample.json --month 01', '-combiner',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --combiner
>> --input_file input_multi_sample.json --month 01', '-reducer',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --reducer
>> --input_file input_multi_sample.json --month 01']
>> Scanning logs for probable cause of failure
>> Traceback (most recent call last):
>> File "test_unify_one_love.py", line 445, in <module>
>> HadoopAudienceAnalysis.run()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/job.py", line 545,
>> in run
>> mr_job.execute()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/job.py", line 561,
>> in execute
>> self.run_job()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/job.py", line 631,
>> in run_job
>> runner.run()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/runner.py", line
>> 490, in run
>> self._run()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/hadoop.py", line
>> 246, in _run
>> self._run_job_in_hadoop()
>> File "/bitly/local/lib/python2.5/site-packages/mrjob/hadoop.py", line
>> 449, in _run_job_in_hadoop
>> raise Exception(msg)
>> Exception: Job failed with return code 1: ['/usr/bin/hadoop', 'jar',
>> '/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.3.jar',
>> '-files',
>> 'hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/test_unify_one_love.py#test_unify_one_love.py,hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/input_multi_sample.json#input_multi_sample.json',
>> '-archives',
>> 'hdfs:///user/mrwoof/tmp/mrjob/test_unify_one_love.mrwoof.20130416.144420.430216/files/mrjob.tar.gz#mrjob.tar.gz',
>> '-libjars', '/home/mrwoof/oddjob-1.0.1-standalone.jar', '-D',
>> 'mapred.job.name=audience-analysis-test-2013-01', '-D',
>> 'mapred.output.compress=true', '-D',
>> 'mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec',
>> '-D', 'mapred.reduce.tasks=120', '-cmdenv', 'PYTHONPATH=mrjob.tar.gz',
>> '-outputformat', 'oddjob.MultipleJSONOutputFormat', '-input',
>> 'hdfs:///user/mrwoof/keyword_clicks.stream06_ec2.2013-02-01_00.10.log.gz',
>> '-output', 'hdfs:///user/mrwoof/test-2013-01', '-mapper',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --mapper
>> --input_file input_multi_sample.json --month 01', '-combiner',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --combiner
>> --input_file input_multi_sample.json --month 01', '-reducer',
>> '/bitly/local/bin/python test_unify_one_love.py --step-num=0 --reducer
>> --input_file input_multi_sample.json --month 01']
>>
>>
>> % cat /etc/mrjob.conf
>> runners:
>> hadoop:
>> hadoop_bin: /usr/bin/hadoop
>> hadoop_home: /usr/lib/hadoop
>> hadoop_streaming_jar:
>> /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.3.jar
>> python_bin: /bitly/local/bin/python
>>
>>
>>
>> On Mon, Apr 15, 2013 at 7:09 PM, Jim Blomo <jbl...@yelp.com> wrote:
>>>
>>> Hi, bumped the dependencies and created this jar. Please give it a try.
>>
>>
>>
>>
>> --
>> _____________________
>> michael richman
>> sys arch
>> m...@bitly.com
>

Michael @ Bitly

Apr 16, 2013, 5:56:10 PM
to mr...@googlegroups.com, Michael Richman
Never having done any Clojure development, I haven't the foggiest idea where to start here. Not to mention the unclear Hadoop versioning (2.0.0 seems to map to 0.23). Any pointers on approach? Also, there are mentions in the Hadoop docs of MultipleTextOutputFormat being deprecated in favor of MultipleOutputs.

Jim Blomo

Apr 16, 2013, 9:05:54 PM
to mr...@googlegroups.com, Michael Richman
Sure, there are a lot of moving parts, but the oddjob code itself is
hopefully straightforward.

1. Understand Input/Output formats; I wrote an intro at
http://www.infoq.com/articles/HadoopOutputFormat
2. Understand how oddjob tweaks the output formats:
https://github.com/Yelp/oddjob/blob/master/src/oddjob/MultipleTextOutputFormatByKey.clj
is the Clojure translation of the first example in the article
3. Understand how Hadoop changed the streaming capabilities (e.g. is
-libjars even still used?)
4. Understand how Hadoop changed the Input/Output Format classes.
Maybe they are deprecated, but do they at least still work?
5. Download the oddjob code and check out the clojure-1.5.1 branch (for
some reason this branch is not showing up in the UI, but it should be
available via git)
6. Use lein version 2 to build (lein uberjar) and test (lein test)
7. Update the :dev :dependencies to use the Hadoop 2.0 jar (this may
require some work with maven)
8. Modify the code to work with the changes in #3 and #4
9. Modify tests
10. Submit a pull request

Michael @ Bitly

Apr 17, 2013, 12:27:11 PM
to mr...@googlegroups.com, Michael Richman
OK, I had already done the vast majority of that work in my troubleshooting.

I've built it with the actual hadoop-streaming JAR that we're using as a local dependency, but I'm still getting the same failure.

LEIN_VERSION="2.1.3"

hadoopcron02:~/oddjob-master % cat project.clj
(defproject oddjob "1.0.0-test"
  :description "Hadoop utilities for MrJob that must run in the JVM"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [org.clojure/data.json "0.1.2"]
                 [local/hadoop-streaming "2.0.0-mr1-cdh4.1.3"]
                 [org.clojure/data.csv "0.1.0"]]
  :dev-dependencies [[org.apache.hadoop/hadoop-streaming "0.20.2"]]
  :repositories {"project" "file:repo"}
  :aot :all)
hadoopcron02:~/oddjob-master % ~/lein uberjar
Created /home/mr/oddjob-master/target/oddjob-1.0.0-test.jar
Including oddjob-1.0.0-test.jar
Including data.csv-0.1.0.jar
Including hadoop-streaming-2.0.0-mr1-cdh4.1.3.jar
Including data.json-0.1.2.jar
Including clojure-1.5.1.jar
Created /home/mr/oddjob-master/target/oddjob-1.0.0-test-standalone.jar

With that I still get the same error.

-libjars is still supported.

MultipleTextOutputFormat still works. I was able to set that with HADOOP_OUTPUT_FORMAT = "org.apache.hadoop.mapred.lib.MultipleTextOutputFormat"

Suggestions?

Jim Blomo

Apr 22, 2013, 8:09:13 PM
to mr...@googlegroups.com, Michael Richman
Hi Michael, too bad that didn't work :/

Going off the information in
https://groups.google.com/forum/?fromgroups=#!topic/clojure/F3ERon6Fye0,
I think there are two options:

1. Modify Hadoop to try setting the class loader as prescribed, to
correct for the presumed inconsistency. (If that works, I'd post to
the Clojure list too, bumping the priority of fixing this
"confusion".) I think you'll want to modify JobConf.getOutputFormat,
which is where the class gets initialized.
2. Rewrite the class you need in Java. I originally wrote these
classes in Clojure for concision and to easily use the lein build
tool, but the logic of the actual classes is straightforward. You
can duplicate it in Java, build a jar, and use that instead of
oddjob.

Let us know how it turns out!