ETL Telco XML data using Hadoop/Cascading


Julian Reyes

Nov 12, 2015, 2:33:36 PM
to cascading-user
Hello,

I am new to the Hadoop world and I have a project I would like to improve using Hadoop with Pig, Hive, or Cascading; I do not know which one is the better and faster fit.

The idea is to retrieve data from several sources, each of which gives us a compressed file (tar.gz) containing several XML files.
I would like to retrieve and unpack the data in parallel, then, using for example Pig and my own UDF, extract the information I am interested in from the XML files.
All outputs then need to be merged into one and loaded into an Oracle table, perhaps using Sqoop.

Each compressed file contains several XML files, and one of them (objects.xml) holds information that is currently used to open the other XML files, for example:

HHL0250.tar.gz :
                        - objects.xml -> contains data, plus a field (an ID of sorts) that can be used to open 250.xml
                        - 250.xml
                        ..
                        - 299.xml

HHL0300.tar.gz :
                        - objects.xml -> contains data, plus a field (an ID of sorts) that can be used to open 300.xml
                        - 300.xml
                        ..
                        - 349.xml

I wonder if it is possible to loop over objects.xml using Pig and load each referenced file (100.xml, etc.). I will also need to parse objects.xml and load it into another table.

This data grows on a daily basis, so the process needs to scale.

Currently this ETL is done with Groovy and shell scripts: we loop over the sources we have to connect to, extract each file, parse it, and load it into the database. On each iteration we need to load the previously loaded data into memory to keep some consistency and then append the next chunk of information from the next source.

I would also like to pull the data currently in the database out to a file and append the output of the ETL process to that file.

I think Hadoop could perhaps speed up the process: there are 4~5 sources, each containing more than 30 XML files of more than 50MB each. Overall it currently takes around 10 hours, and I would like to reduce it to 2~3 hours at most.

Ken Krugler

Nov 12, 2015, 6:43:53 PM
to cascadi...@googlegroups.com
Hi Julian,

Processing the XML data with Cascading, and then generating files that Sqoop can push to Oracle, is relatively straightforward.

The interesting part is how to handle the xxx.tar.gz format in Hadoop, since the tar format isn't directly supported.

I'd probably write one Flow that processes each of the main 4-5 files in parallel, and explodes/writes the XML files to HDFS - it could also output information from objects.xml needed for subsequent processing.

Then a second job processes each of the resulting 150+ XML files in parallel, or as parallel as your cluster supports.

-- Ken

PS - One way to handle the first Flow is to create an input file that contains the paths (in HDFS) to each of these 4-5 files. You could create a ListScheme (I've got one lying around somewhere), but a quick hack is to pad each line out to something like 1K (for example) with spaces. Then use the regular TextLine Scheme in Cascading, but set the Hadoop split size to 1K.
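
For what it's worth, here is a rough sketch of how that first Flow might be wired up - assuming Cascading 2.x on Hadoop 2, with illustrative paths and field names (the split-size property name depends on your Hadoop version). ExplodeTarGzFunction is the custom Function sketched a couple of paragraphs below:

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.hadoop.TextDelimited;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class ExplodeTarballsFlow {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Force ~1K input splits so each map task gets one (padded) line of the
        // path list; older clusters use "mapred.max.split.size" instead.
        props.setProperty("mapreduce.input.fileinputformat.split.maxsize", "1024");

        // Source: a text file listing the HDFS paths of the 4-5 tar.gz files,
        // one path per line, each line padded out to ~1K with spaces.
        Hfs pathList = new Hfs(new TextLine(new Fields("line")), "hdfs:///etl/tarball-paths.txt");

        // Sink: one (entryName, xml) record per XML file found inside the tarballs.
        Hfs exploded = new Hfs(new TextDelimited(new Fields("entryName", "xml"), "\t"),
            "hdfs:///etl/exploded", SinkMode.REPLACE);

        Pipe explode = new Each("explode", new Fields("line"), new ExplodeTarGzFunction(), Fields.RESULTS);

        Flow flow = new HadoopFlowConnector(props).connect(pathList, exploded, explode);
        flow.complete();
    }
}

The delimited sink is only for the sketch - XML with embedded newlines and tabs won't survive a line-based text sink, so in practice you'd more likely have the Function write each entry to its own HDFS file and just emit the paths.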

Each map task will get one line of the file for input. At this point you'd write a custom Cascading Function which would open an input stream on that file, run the stream through Java's GZIPInputStream and then Commons Compress's TarArchiveInputStream, and stream out the tar entries - i.e. the individual XML files.
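
A minimal sketch of such a Function - assuming Cascading 2.x, with Commons Compress and Commons IO on the classpath, and with the FileSystem/Configuration plumbing and error handling simplified:

import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class ExplodeTarGzFunction extends BaseOperation<Void> implements Function<Void> {

    public ExplodeTarGzFunction() {
        // one argument (the padded path line), declaring (entryName, xml) output fields
        super(1, new Fields("entryName", "xml"));
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall<Void> functionCall) {
        // each input line is the HDFS path of one tar.gz, padded with trailing spaces
        String path = functionCall.getArguments().getString(0).trim();

        try {
            FileSystem fs = FileSystem.get(new Configuration());
            InputStream raw = fs.open(new Path(path));
            TarArchiveInputStream tar = new TarArchiveInputStream(new GZIPInputStream(raw));

            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory())
                    continue;

                // reads stop at the end of the current entry, and at ~50MB per XML
                // file it's reasonable to hold one entry in memory at a time
                String xml = IOUtils.toString(tar, "UTF-8");
                functionCall.getOutputCollector().add(new Tuple(entry.getName(), xml));
            }

            tar.close();
        } catch (Exception e) {
            throw new RuntimeException("Failed to expand " + path, e);
        }
    }
}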

PPS - There's the TarFileSystem, but your requirement to get info from objects.xml in order to know what/how to process the other XML files inside the tarball precludes that, I believe.



--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Julian Reyes

Nov 13, 2015, 5:03:33 AM
to cascading-user

Hi Ken,

Thanks for your reply.

Currently we connect to external sources to get those xxx.tar.gz files. It is basically a shell script that connects to the external source, tars the XML files (objects.xml, 100.xml, ...) together with tar czf, then SCPs the tar.gz back to our system, where we extract it into a folder with tar xzf. I guess we could then copy that folder into HDFS and start a job as soon as we get the first chunk of XML files?

Sometimes there are connectivity issues when retrieving the tar.gz from the sources. We have a function that retries up to 3 times; if it still fails, it aborts and retries the whole process later, as it is crucial to have all files from all sources.

The XML files are not well structured and are quite big, so we need to parse them and extract the information we need to create different outputs. Is it possible to parse the XML files using Cascading? I have seen quite a lot of Java examples but none in Groovy; I do not know if it is supported?

Thanks.

Ken Krugler

Nov 13, 2015, 11:09:00 AM
to cascadi...@googlegroups.com
Hi Julian,


From: Julian Reyes

Sent: November 13, 2015 2:03:33am PST

To: cascading-user

Subject: Re: ETL Telco XML data using Hadoop/Cascading



> Currently we connect to external sources to get those xxx.tar.gz files. It is basically a shell script that connects to the external source, tars the XML files (objects.xml, 100.xml, ...) together with tar czf, then SCPs the tar.gz back to our system, where we extract it into a folder with tar xzf. I guess we could then copy that folder into HDFS and start a job as soon as we get the first chunk of XML files?

Yes, as long as you've got enough data with each file to justify the overhead of starting up a job. E.g. it doesn't make sense if you only have 10MB, it definitely does for 10GB; if you're somewhere in between, I'd say first run it as a batch once you've got all the files, then figure out whether to make it more complex.

But why tar then untar? Seems faster to copy individual files, and push those to HDFS as you get them.

> Sometimes there are connectivity issues when retrieving the tar.gz from the sources. We have a function that retries up to 3 times; if it still fails, it aborts and retries the whole process later, as it is crucial to have all files from all sources.

> The XML files are not well structured and are quite big, so we need to parse them and extract the information we need to create different outputs. Is it possible to parse the XML files using Cascading? I have seen quite a lot of Java examples but none in Groovy; I do not know if it is supported?

You can parse XML files, though you'd want to use a streaming parser (SAX, or a StAX pull parser) so it's incremental, versus reading the entire thing into memory to build a DOM.
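
For example, a minimal StAX sketch (javax.xml.stream is in the JDK) that streams through a document and pulls out the text of one element type without ever building a DOM - the element name here is purely illustrative:

import java.io.InputStream;

import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

public class StreamingXmlSketch {

    public static void parse(InputStream in) throws Exception {
        XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(in);

        String currentElement = null;
        while (reader.hasNext()) {
            int event = reader.next();
            if (event == XMLStreamConstants.START_ELEMENT) {
                currentElement = reader.getLocalName();
            } else if (event == XMLStreamConstants.CHARACTERS && "neId".equals(currentElement)) {
                // do something with the value, e.g. collect it into a record
                System.out.println("neId = " + reader.getText());
            } else if (event == XMLStreamConstants.END_ELEMENT) {
                currentElement = null;
            }
        }

        reader.close();
    }
}

Inside a Cascading Function you'd do the same thing over the incoming XML and emit a tuple per record instead of printing.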

Not sure why you mention Groovy, Cascading is written in Java. There's a lot of talk about this "Scalding" thing (Scala binding for Cascading) on the list, I hear it's cool :)

An odd issue for Hadoop is ensuring that when a file is split, it gets split on an appropriate XML entity boundary, so you don't get part of what you need in one map task and the rest of it in another (normally Hadoop splits on line breaks at the target block size position, which these days is typically 256MB).

There's an XMLInputFormat floating around which handles that for you; I've wrapped it before so it's usable in Cascading… maybe others know of a publicly released version that's ready to use.

-- Ken

Julian Reyes

Nov 14, 2015, 1:25:11 PM
to cascading-user
Hi, 

I know this is a group for Cascading users, but I was just trying to get started with Hadoop using Pig to see if I got it right; apologies if only Cascading questions are allowed.

I am getting problems while reading the XML; it seems I am doing something wrong...

My XML looks like the following (of course it's much bigger; I just included the first entries):

<cn:bulkCmConfigDataFile xmlns:cn="details-CONFIG" xmlns:xt="nrmBase" xmlns:en="CLL-NB">
  <cn:fileHeader fileFormatVersion="2.0.0" senderName="senderName" vendorName="vendorName"/>
  <cn:configData>
    <en:ManagementNode xmlns:en="CLL-NB">
      <en:neGroup>Group_1</en:neGroup>
      <en:neVersion>2.1.0</en:neVersion>
      <en:neId>100</en:neId>
      <en:neName>TK0005</en:neName>
      <en:neIp>192.168.0.2</en:neIp>
    </en:ManagementNode>
    <en:ManagementNode xmlns:en="CLL-NB">
      <en:neGroup>Group_1</en:neGroup>
      <en:neVersion>2.1.0</en:neVersion>
      <en:neId>101</en:neId>
      <en:neName>TK0002</en:neName>
      <en:neIp>192.168.0.3</en:neIp>
    </en:ManagementNode>
  </cn:configData>
  <cn:fileFooter dateTime="2013-12-20T03:40:15+00:00"/>
</cn:bulkCmConfigDataFile>

And the Pig script I am trying to use is the following:


set pig.splitCombination false;
set tez.grouping.min-size 5242880;
set tez.grouping.max-size 5242880;

register '/usr/lib/tez/tez-0.7.0/tez-tfile-parser-0.7.0.jar';

DEFINE getDetails(raw) RETURNS void {
        details = FOREACH raw GENERATE configData;
        distinctDetails = DISTINCT details;
        STORE distinctDetails INTO '$DETAILS' USING PigStorage(',');;
}


rmf $NODE_DETAILS
rawLogs = load '/user/hduser/test/test01/ManagementNode.xml' using org.apache.tez.tools.TFileLoader() as (configData:chararray, key:chararray, line:chararray);
raw = FOREACH rawLogs GENERATE ManagementNode,key,line;

getDetails(raw);
exec;

However, I am getting the following error:

ERROR 2998: Unhandled internal error. null

java.lang.StackOverflowError
        at org.apache.tez.tools.TFileLoader.hashCode(TFileLoader.java:148)
        at java.util.Arrays.hashCode(Arrays.java:3140)
...

Could it be because of the XML file? 

Thanks.

Ken Krugler

Nov 14, 2015, 2:02:23 PM
to cascadi...@googlegroups.com
Hi Julian,

You're right that this list is for Cascading - I'd suggest trying the Pig mailing list for your question below.

Regards,

-- Ken


JOHN MILLER

May 17, 2016, 9:03:14 AM
to cascading-user
Greetings, I am having a similar problem running "select distinct (fieldname) from table". Upon execution, the job just "hangs" there. Below is the output from my Hive query.

Please advise on how to resolve this problem (the same thing happens when I try running cascading-hive).

John M

hive> select count(*) from commoncrawl18;
Query ID = jmill383_20160517082328_d80b48c0-5382-4499-b574-ab6d0ab80fcd
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1463423258231_0003, Tracking URL = http://starchild.ltsnet.net:8088/proxy/application_1463423258231_0003/
Kill Command = /opt/hadoop/bin/hadoop job  -kill job_1463423258231_0003
Interrupting... Be patient, this might take some time.
Press Ctrl+C again to kill JVM
killing job with: job_1463423258231_0003
Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0
2016-05-17 08:48:08,833 Stage-1 map = 0%,  reduce = 0%
Ended Job = job_1463423258231_0003 with errors
Error during job, obtaining debugging information...
Job Tracking URL: http://starchild.ltsnet.net:8088/cluster/app/application_1463423258231_0003
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
hive>

Andre Kelpe

May 23, 2016, 5:40:51 AM
to cascading-user
You should ask on the Hive user mailing list for help. This group discusses Cascading-related topics:

http://hive.apache.org/mailing_lists.html

- André



--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com