We have already seen how to install Rhipe and run some sample
programs. So now let's move on to a real practical application, where
we have a huge amount of raw data and want to do some data mining to
extract meaning out of it.
For this purpose I am using the Wikipedia traffic data set, and the
problem statement is:
(Problem) Find the top 10 sites of the English project out of one week
of data.
To appreciate the problem and the use of R here, we must first
understand that the data size we are dealing with is not a few
MBs (megabytes) or GBs (gigabytes); it can be of the order of
TBs (terabytes) or PBs (petabytes). Handling and computing on this
much data is not feasible on a single system. That is why we move to
a Hadoop cluster to solve this problem, where the framework provides a
distributed file system in which every node can access the data as if
it were on its own hard disk.
Dealing with unstructured data efficiently is another big problem. R,
with its rich set of statistical functions, gives us an efficient way
to deal with such data once it has been parsed. For example, we can
make use of data frames and matrices.
Now let's get on with solving our problem. Before that, let us see
what the Wikipedia data set looks like.
You can download the Wikipedia hourly traffic data set from
http://dammit.lt/wikistats/ .
Each file in the data set holds one hour of data for a particular
date, which is easily identifiable from the name of the file. The
attributes file name, date and time are separated by the "-" character
in the name of the file.
For example: the file pagecounts-20101222-080000 has the file name
"pagecounts", the date is 22 December 2010 (format is yyyymmdd) and
the time is 8 am (format is hhmmss). This file contains the Wikipedia
traffic between 8 am and 9 am.
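As a small illustration of the naming scheme, here is a sketch in plain
R (no Hadoop needed) that splits a file name into its three parts. The
helper name parse_pagecounts_name is my own and is not part of Rhipe or
the data set.

```r
# Hypothetical helper: split an hourly file name into prefix, date and hour.
parse_pagecounts_name <- function(fname) {
  parts <- strsplit(basename(fname), "-", fixed = TRUE)[[1]]
  list(
    prefix = parts[1],
    date   = as.Date(parts[2], format = "%Y%m%d"),  # yyyymmdd
    hour   = as.integer(substr(parts[3], 1, 2))     # first two digits of hhmmss
  )
}

info <- parse_pagecounts_name("pagecounts-20101222-080000")
print(info)
```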
Each record in the file consists of four fields separated by a space,
in the following order:
A. Project name      : The project, which identifies the language.
                       For example, 'en' represents English.
B. Page name         : The name of the page (site) viewed.
C. Page count/views  : Number of times the page has been viewed
                       in that hour.
D. Page size         : Size of the page being viewed.
A sample of the data looks like this:
zu IRashiya 1 13731
zu ISpaniya 1 13230
zu ISwidi 1 44820
zu IYurophu 1 45611
zu Ikhasi_Elikhulu 6 121138
zu Inja 2 47532
zu Inkalankala 1 5337
zu Isi-Lithuanian 1 32181
zu IsiKotilandi 1 10254
zu IsiRashiya 1 12767
zu Izilimi_zaseNingizimu_Afrika 1 21021
zu MediaWiki:Ipb_expiry_invalid/ 1 14441
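To get a feel for the record layout, the following sketch parses a few
of the sample lines above into a data frame in plain R. The column names
(project, page, views, size) are my own labels for the four fields.

```r
# Three records copied from the sample above.
lines <- c("zu IRashiya 1 13731",
           "zu Ikhasi_Elikhulu 6 121138",
           "zu Inja 2 47532")

# Split every line on a single space and stack the pieces into a matrix.
fields <- do.call("rbind", strsplit(lines, " ", fixed = TRUE))

# Column names are illustrative labels, not something defined by the data set.
records <- data.frame(
  project = fields[, 1],
  page    = fields[, 2],
  views   = as.numeric(fields[, 3]),
  size    = as.numeric(fields[, 4]),
  stringsAsFactors = FALSE
)
print(records)
```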
In the Wikipedia hourly traffic data set, a page name is unique within
a single file, but the same page name may appear in other hourly
files. So if a page name appears in more than one file, we need to add
up the page views for that page name (site).
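On a small scale the same idea can be tried in plain R. This is only a
sketch: the two data frames stand in for two hourly files and their
values are made up for illustration.

```r
# Toy stand-ins for two hourly files (made-up values, not real traffic).
hour1 <- data.frame(page = c("Main_Page", "Teena_Marie"),
                    views = c(100, 7), stringsAsFactors = FALSE)
hour2 <- data.frame(page = c("Main_Page", "Inja"),
                    views = c(50, 2), stringsAsFactors = FALSE)

# Stack the hours and add up the views of every page name.
both   <- rbind(hour1, hour2)
totals <- aggregate(views ~ page, data = both, FUN = sum)
print(totals)
```

A page that appears in both hours (Main_Page here) ends up with a single
row holding the sum, which is exactly what the reducer has to do on the
cluster.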
Hence a minimum of two map reduce jobs is required to solve the
problem. The first map reduce job extracts the sites that belong to
the "en" project and, if a site appears in more than one file, adds up
its page counts.
The second map reduce job takes the output of the first map reduce job
and sorts it in descending order of page views.
The details of both map reduce jobs are as follows:
----------------------------------------
First Map Reduce Job:
-----------------------------------------
In this job, the map function emits the page name as the key and the
page views as the value. In the reducer, the page views are added up
and the total is emitted as the value for the key, i.e. the page name.
The R script for the same is as follows:
library(Rhipe)
rhinit()

# Map: keep only the "en" records and emit (page name, page views)
map <- expression({
  # split each input line on spaces and stack the fields into a matrix
  y <- do.call("rbind", lapply(map.values, function(r) {
    strsplit(r, " ")[[1]]
  }))
  y <- y[y[, 1] == "en", , drop = FALSE]
  if (nrow(y) > 0) {
    for (i in 1:nrow(y)) {
      x <- y[i, ]
      key <- x[2]
      value <- as.numeric(x[3])
      rhcollect(key, value)
    }
  }
})

# Reduce: add up the page views of each page name
reduce <- expression(
  pre = { sums <- 0 },
  reduce = {
    sums <- sum(sums, unlist(reduce.values))
  },
  post = {
    rhcollect(reduce.key, sums)
  }
)

mapred <- list(rhipe_map_buff_size = 20, mapred.task.timeout = 0,
               mapred.reduce.tasks = 2)
ifolder <- "wikistat_sample"
ofolder <- "/out_multi_01"
z <- rhmr(map = map, reduce = reduce, inout = c("text", "sequence"),
          ifolder = ifolder, ofolder = ofolder, mapred = mapred,
          jobname = "DataMining")
rhex(z)
---------------------EXPLANATION-------------------------------------
As we know, the key-value pairs coming from Hadoop are received by the
map expression in the form of R lists (map.keys and map.values). So in
the map expression we are basically doing the following things:
(1) Splitting the strings in the list and forming a matrix.
(2) Extracting only those rows that belong to the English project.
(3) The matrix formed in the above step (step 2) consists only of
rows for the English project.
(4) Iterating through each of the rows and emitting the page name as
the key and the corresponding page count as the value.
In the reducer, since the same key can occur more than once (because
we are using multiple files), we add up the values for each key.
Note that we are running two reducers (mapred.reduce.tasks=2), so two
output files will be generated.
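If you want to check the logic without a cluster, the steps above can be
mimicked in plain R. This is only a sketch under my own assumptions:
map_step is a hypothetical function that returns the pairs instead of
calling rhcollect, split() stands in for the grouping that Hadoop does
between map and reduce, and the input lines are made-up values.

```r
# Toy input lines in the pagecounts format (made-up values).
hour1 <- c("en Main_Page 100 5000", "zu Inja 2 47532", "en Teena_Marie 7 900")
hour2 <- c("en Main_Page 50 5000", "en Inja 3 100")

# Map step: keep the "en" rows and return (page name, views) pairs.
map_step <- function(lines) {
  y <- do.call("rbind", strsplit(lines, " ", fixed = TRUE))
  y <- y[y[, 1] == "en", , drop = FALSE]
  lapply(seq_len(nrow(y)), function(i) {
    list(key = y[i, 2], value = as.numeric(y[i, 3]))
  })
}

# One map call per file, as if each file were a separate map task.
pairs <- c(map_step(hour1), map_step(hour2))

# Grouping step: collect all values that share a key.
keys    <- vapply(pairs, function(p) p$key, character(1))
values  <- vapply(pairs, function(p) p$value, numeric(1))
grouped <- split(values, keys)

# Reduce step: add up the values of each key.
totals <- vapply(grouped, sum, numeric(1))
print(totals)
```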
-------------------------------------------
SECOND MAP REDUCE JOB
------------------------------------------
This job sorts the page names in decreasing order of their page views
and aggregates the result in matrix form. In the reducer, the number
of page names to be written to the output file (i.e. top 10, top 20,
etc.) can be specified.
The R script for the same is as follows:
library(Rhipe)
rhinit()

# Map: bind each (page name, total views) pair into a matrix row and
# send everything to a single key
map <- expression({
  y <- do.call("rbind", lapply(seq_along(map.values), function(i)
    c(map.keys[[i]], map.values[[i]])))
  rhcollect("en", y)
})

# Reduce: keep the rows sorted by page views and emit only the top rows
reduce <- expression(
  pre = { collect <- NULL },
  reduce = {
    collect <- rbind(collect, do.call("rbind", reduce.values))
    collect <- collect[order(as.numeric(collect[, 2]), decreasing = TRUE), ,
                       drop = FALSE]
  },
  post = {
    # specify here for top 10 or 20 etc
    collect <- collect[1:min(nrow(collect), 10), , drop = FALSE]
    rhcollect(reduce.key, collect)
  }
)

mapred <- list(rhipe_map_buff_size = 10, mapred.task.timeout = 0)
# input is the output folder of the first job
ifolder <- "/out_multi_01"
ofolder <- "/out3_multi"
z <- rhmr(map = map, reduce = reduce, combiner = TRUE,
          inout = c("sequence", "sequence"), ifolder = ifolder,
          ofolder = ofolder, mapred = mapred, jobname = "DataSort")
rhex(z)
---------------------EXPLANATION-------------------------------------
Here we use the output files generated by the first map reduce job as
the input files to this job.
In the map expression we form a matrix out of map.keys and map.values
and send "en" as the key (its value is not important; a single key
just routes everything to one reducer) and the entire matrix as the
value.
In the reducer I order the matrix in descending order of page views
and emit only the top 10 rows.
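The ordering step can also be tried on its own in plain R. This is a
sketch with made-up rows; top_n is a hypothetical helper name. Note that
the matrix is of character type (page name and count are bound together),
so the counts must be converted with as.numeric before ordering,
otherwise "7" would sort above "150".

```r
# Hypothetical helper: order a (page, views) character matrix by views
# and keep at most n rows.
top_n <- function(m, n = 10) {
  m <- m[order(as.numeric(m[, 2]), decreasing = TRUE), , drop = FALSE]
  m[seq_len(min(nrow(m), n)), , drop = FALSE]
}

# Made-up rows in the same shape as the matrix built by the map expression.
collect <- rbind(c("Inja", "3"),
                 c("Main_Page", "150"),
                 c("Teena_Marie", "7"))

top2 <- top_n(collect, 2)
print(top2)
```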
----------------Output---------------------
The output for two Wikipedia files (pagecounts-20101222-080000 and
pagecounts-20101222-090000) will be:
> rhread("/out3_multi/part-r-00000")
RHIPE: Read 1 pair occupying 350 bytes, deserializing
[[1]]
[[1]][[1]]
[1] "en"
[[1]][[2]]
[,1] [,2]
[1,] "Special:Random" "301718"
[2,] "Main_Page" "289006"
[3,] "Special:Search" "269146"
[4,] "Special:Export/Robert_L._Bradley%2c_Jr" "144681"
[5,] "Special:Export/William_Kurtz_Wimsatt%2c_Jr" "137038"
[6,] "404_error" "40456"
[7,] "File:Hardy_boys_cover_09.jpg" "35015"
[8,] "Teena_Marie" "14818"
[9,] "Rogers%27_Rangers" "13439"
[10,] "w/index.php" "7031"
So as you can see, the Special:Random page has been hit the most in
these hours (8 am to 10 am).
You can also observe that a lot of people seem to be trying to find
out what a 404 error (page not found error) is.
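The result comes back as a character matrix with URL-encoded page names
(for example %27 for an apostrophe). If you want something easier to
work with, one possible clean-up in plain R is sketched below. The three
rows are copied from the output above, and the column names page and
views are my own.

```r
# Three rows copied from the job output above.
top <- rbind(c("Special:Random", "301718"),
             c("Teena_Marie", "14818"),
             c("Rogers%27_Rangers", "13439"))

# Decode the page names one by one and turn the counts into numbers.
result <- data.frame(
  page  = vapply(top[, 1], utils::URLdecode, character(1), USE.NAMES = FALSE),
  views = as.numeric(top[, 2]),
  stringsAsFactors = FALSE
)
print(result)
```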
Enjoy :-)
Regards,
Som Shekhar