Pangool and CDH4.3 MRv1 Issue


Alexei Perelighin

Jul 31, 2013, 12:18:25 PM
to pangoo...@googlegroups.com
Hi,

I have used Pangool successfully with CDH3 and I am now looking into migrating to CDH4.3 MRv1. We will not be using MRv2/YARN, as Cloudera does not recommend it for production.
However, I am running into the following issue:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat.createOutputFormatIfNeeded(ProxyOutputFormat.java:91)
at com.datasalt.pangool.tuplemr.mapred.lib.output.ProxyOutputFormat.checkOutputSpecs(ProxyOutputFormat.java:81)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:985)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:946)

My pom.xml has the following dependencies:

    <dependency>
      <groupId>com.datasalt.pangool</groupId>
      <artifactId>pangool-core</artifactId>
      <version>0.60.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>2.0.0-mr1-cdh4.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.0.0-cdh4.3.0</version>
      <type>jar</type>
      <optional>false</optional>
    </dependency>
  </dependencies>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>


I have tried using <classifier>mr2</classifier>, but it did not work.
Have you encountered similar issues?

Thanks,
Alexei

Alexei Perelighin

Jul 31, 2013, 1:09:35 PM
to pangoo...@googlegroups.com
Sorry, I managed to solve the issue myself. It works with the following dependencies:

<dependency>
  <groupId>com.datasalt.pangool</groupId>
  <artifactId>pangool-core</artifactId>
  <version>0.60.3</version>
  <classifier>mr2</classifier>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-lang3</artifactId>
  <version>3.1</version>
</dependency>
<dependency>
  <groupId>commons-io</groupId>
  <artifactId>commons-io</artifactId>
  <version>2.4</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>2.0.0-mr1-cdh4.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.0.0-cdh4.3.0</version>
  <type>jar</type>
  <optional>false</optional>
</dependency>

Adding commons-io version 2.4 was the critical bit that made it work with <classifier>mr2</classifier>.

Pere Ferrera

Jul 31, 2013, 1:21:01 PM
to pangoo...@googlegroups.com
Hello Alexei,

Yes, CDH4 should work with the mr2 classifier; I just added a profile to the whole Pangool pom.xml to check that the tests pass.

I just reproduced the commons-io error you are referring to; it only happens if you declare Pangool before you declare Hadoop in your dependencies. So there are actually three ways of solving this :-)
- Declare the Pangool dependency after Hadoop.
- Declare Pangool before Hadoop, but:
 -- exclude commons-io from Pangool (see the snippet below), or
 -- add commons-io explicitly before Hadoop (what you just did).
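
For reference, the exclusion variant would look something like this (a sketch only, adjust it to your own pom):

<dependency>
  <groupId>com.datasalt.pangool</groupId>
  <artifactId>pangool-core</artifactId>
  <version>0.60.3</version>
  <classifier>mr2</classifier>
  <exclusions>
    <exclusion>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
    </exclusion>
  </exclusions>
</dependency>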

Hope everything will be fine then, keep us posted!

Cheers,

Iván de Prado

Jul 31, 2013, 1:31:27 PM
to pangoo...@googlegroups.com
Hi Alexei, 

Can we ask how your experience with Pangool has been? Are you happy with it? For what kind of project are you using it? We are just curious :-)

It would be nice to add a "Powered by Pangool" page in the future.

Regards!
Iván


--
Iván de Prado
CEO & Co-founder

Alexei Perelighin

Aug 1, 2013, 6:34:32 AM
to pangoo...@googlegroups.com
Hi Ivan,

Regarding Pangool usage, we are currently moving out of the proof-of-concept stage and into production for a couple of small modules, and it has been quite successful.
My task was to streamline and optimise the ingestion of raw logs into our Hive data warehouse. The original process had a lot of steps:
1. Raw log files sit in the UNIX file system.
2. Flume converts the raw data into structured data (CSV) and performs some decoration.
3. An Oozie workflow does extra data enrichment by joining with other data sources (unfortunately requiring a reduce-side join), deduplication, group-bys, rollups, etc. Some Oozie actions were PIG scripts, some were standard Java MapReduce jobs. Oozie provided the logic branching.

The problems with that process were:
1. Too many different tools and components, making it nearly impossible to automate end-to-end testing. The business logic was spread all over the place, and finding issues could be a pain, as over the years different developers had made different undocumented modifications to the configs. There was no bird's-eye view of the process.
2. For our purposes Flume was an overhead: slow, with some unexplained issues that required too many workarounds, so everybody wanted to exclude it from the deployment. There were unit tests for the separate decorators, but not for the whole data flow.
3. The Oozie workflow.xml files were becoming too large, with too many branches, and they were not unit tested.
4. There was no single IDE that could be used to jump smoothly from one piece of code to another.

There was a temptation to put everything into Apache PIG and write the load functions, UDFs, etc., but implementing branching logic with nested if ... else ... structures could be a pain, would still require Oozie, and once again would spread out the business logic, which we wanted to avoid.

Doing it all in pure Java MapReduce would solve a lot of those issues:
1. All logic is in one place and in one language.
2. There are no restrictions on branching logic within the mappers and reducers.
3. The Main class can do the workflow branching logic.
4. It is easy to unit test components and to automate end-to-end testing of the workflow.

The downside of the pure Java approach is that it can be quite painful to implement inner joins, multiple inputs, multiple outputs, passing parameters to the mappers from the Main method, and data schemas.
Pangool takes away that pain; in a way it introduces some PIG-ness into the Java (see the sketch after this list):
1. All data schemas are plain Java.
2. Multiple inputs are a breeze.
3. Multiple outputs are built in.
4. Joins are easy.
5. You are not restricted to a paradigm like with PIG or HIVE; it is always possible to drop down to the Hadoop API.
6. It is easy to use OOP patterns with Pangool and implement data flows.
7. Custom serialisation gives you the tools to work with hierarchical data structures, which are a pain in PIG and HIVE.
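
To give a flavour of points 1, 2 and 4, this is roughly what a reduce-side join looks like with Pangool's TupleMRBuilder. It is only a sketch written from memory, not our production code: the schemas and the LogMapper/UserMapper/JoinReducer classes are made-up illustrations (their implementations are not shown), and the exact package and method names should be double-checked against the Pangool docs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.datasalt.pangool.io.Fields;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.tuplemr.TupleMRBuilder;
import com.datasalt.pangool.tuplemr.mapred.lib.input.HadoopInputFormat;
import com.datasalt.pangool.tuplemr.mapred.lib.output.HadoopOutputFormat;

public class LogUserJoinSketch {

    public static void run(Configuration conf, Path logs, Path users, Path out) throws Exception {
        // Schemas are plain Java, living next to the code that uses them
        Schema logSchema = new Schema("log", Fields.parse("user_id:string, url:string"));
        Schema userSchema = new Schema("user", Fields.parse("user_id:string, country:string"));

        TupleMRBuilder mr = new TupleMRBuilder(conf, "log/user join");
        mr.addIntermediateSchema(logSchema);
        mr.addIntermediateSchema(userSchema);
        mr.setGroupByFields("user_id"); // the join key

        // Multiple inputs: one mapper per source, each emitting tuples of its own schema
        mr.addInput(logs, new HadoopInputFormat(TextInputFormat.class), new LogMapper());
        mr.addInput(users, new HadoopInputFormat(TextInputFormat.class), new UserMapper());

        // For each user_id the reducer receives tuples of both schemas and joins them
        mr.setTupleReducer(new JoinReducer());
        mr.setOutput(out, new HadoopOutputFormat(TextOutputFormat.class), Text.class, NullWritable.class);

        mr.createJob().waitForCompletion(true);
    }
}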

By implementing some classes to manage the temporary directories and to determine the success or failure of a MapReduce job, it was also possible to reduce the amount of logic put into Oozie and to make it unit testable.
I also had to write some custom input/output formats for unit testing. That would have been easier if some of the properties in the Pangool classes were not private, or if there were some protected properties.

Now, when we run "mvn package", the Pangool-based workflows are unit tested, and this can be done on any machine without requiring a Hadoop stack deployment.
Detecting issues with data became quite simple: just put the input data into the workflow's unit test and debug on your dev machine :D. If the input is too large, then having error codes that can be traced directly to the Java code, together with the tuple/string that caused the error, is a life saver.
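
Concretely, such a workflow test ends up looking roughly like the sketch below. The class and path names here are made up for illustration; the important bit is forcing the MRv1 local job runner and the local file system so that no Hadoop installation is needed:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;

public class IngestWorkflowTest {

    @Test
    public void runsEndToEndOnLocalFiles() throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "local"); // MRv1 local runner, no cluster needed
        conf.set("fs.default.name", "file:///"); // read and write the local file system

        // IngestWorkflow stands in for the Tool that drives the whole workflow (illustrative name)
        String[] args = { "2013-07-31", "src/test/resources/raw-logs", "target/test-output" };
        int exitCode = ToolRunner.run(conf, new IngestWorkflow(), args);

        assertEquals(0, exitCode);
        assertTrue(new File("target/test-output/_SUCCESS").exists());
    }
}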

Maybe later I will share some of the patterns I came up with, which simplified the implementation of new data feeds and the workflow management in Java.

Thanks,
Alexei

Alexei Perelighin

Aug 1, 2013, 6:46:13 AM
to pangoo...@googlegroups.com
Hi Pere,

That worked. Thanks!!!!

Cheers,
Alexei

Pere Ferrera

Aug 5, 2013, 5:22:27 AM
to pangoo...@googlegroups.com
Hello Alexei,

Very cool to read all this. I'm glad Pangool seems to have made sense for your use case. Regarding workflow logic, we have been thinking for a long time about a workflow management tool on top of Pangool. We have a prototype called "pangool-flow" that we have used internally at times but never released. It would be cool to know your ideas on the matter; lately we have been thinking about the possibility of adding a higher-level abstraction in Clojure for defining a job flow.

Cheers,

Alexei Perelighin

Aug 5, 2013, 7:06:03 AM
to pangoo...@googlegroups.com
Hi Pere,

"pangool-flow" would have been great to have. I am not particularly keen on having extra abstraction layers/languages if it can be avoided.
My implementation of the flow is quite simple:
1. MapReduce jobs communicate via files: the output of a previous job can become the input of the next ones. All these jobs run within a TASK, which has its own working directory in HDFS, usually somewhere under /temp/pangool/TASK_ID, and all outputs go into that directory (/temp/pangool/TASK_ID/job_1, /temp/pangool/TASK_ID/job_2).
2. To abstract the physical locations away from the flow-decision code, I link string labels to the input/output dirs; while the flow executes, all output and intermediate labels are linked to the /temp/pangool/TASK_ID/job_X directories, which are assigned by the API.
3. Decision making is done by checking that the output dir in /temp associated with a LABEL has a _SUCCESS file; after that it is a simple if {...} else {...} (see the sketch after the code example).
4. Some of the multiple outputs of the various MapReduce jobs need to be committed on success of the whole TASK; these are linked to the respective LABELS, and the API moves the respective output dirs from /temp/pangool/TASK_ID/... into permanent HDFS locations.
5. There are plenty of intermediate outputs in HDFS that can be removed after completion, usually by removing /temp/pangool/TASK_ID; this is done after the commit.

Example of code:

parameters.put("OUTPUT.SOME", "hdfs://output-success");
parameters.put("FAILURE.SOME", "hdfs://output-failure");
TaskDirectories task = TaskDirectories.create(fs, JOB_CLASS.class.getName(), parameters);
task.link("OUTPUT.SOME", "FAILURE.SOME", FailureSchema.outputFolderName);
// not part of parameters, so it lives only in /temp/pangool/TASK_ID
Path step_1 = task.getTmpOutputPathFor("OUTPUT.STEP_1");
String[] taskArgs = {date, inputPath, step_1.toString()};
ToolRunner.run(conf, this, taskArgs);
if (task.outputSuccess("OUTPUT.STEP_1")) {
    // ... next step

    Path some_output = task.getTmpOutputPathFor("OUTPUT.SOME");
    String[] finalArgs = {date, step_1.toString(), some_output.toString()};
    ToolRunner.run(conf, new FinalStep(), finalArgs);
    // ... if all is good

    task.offerToCommit("FAILURE.SOME");
    if (!task.offerToCommit("OUTPUT.SOME")) {
        loggerForMain.error("It was not possible to commit the OUTPUT.SOME");
        throw new Exception("It was not possible to commit the OUTPUT.SOME");
    }
    if (!task.commit()) {
        loggerForMain.error("Could not commit the results. See logs for details.");
        return 6;
    }
} else {
    // ... error handling
}
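
For completeness, the outputSuccess(...) check above is nothing magical; stripped of the label bookkeeping it boils down to something like the simplified sketch below (the real TaskDirectories class first resolves the LABEL to its /temp/pangool/TASK_ID/... directory):

// simplified sketch: checks for the _SUCCESS marker Hadoop writes on job success
private boolean dirSucceeded(FileSystem fs, Path outputDir) throws IOException {
    return fs.exists(new Path(outputDir, "_SUCCESS"));
}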
