Sample Word Count Problem using Rhipe


Shekhar

Apr 20, 2011, 5:58:37 AM
to Bangalore R Users - BRU
Hi,
Here we will write a simple application that counts the occurrences of
each word in a file using Rhipe.

Some basic things:

1. Input file(s) should be kept on HDFS (Hadoop Distributed File
System).
2. Output file(s) will be generated on HDFS.
3. The map and reduce functionality should be R expressions.
4. Key-value pairs coming from Hadoop to our map expression get mapped
to map.keys and map.values respectively.
5. After the map phase, the keys and values we emit through the Rhipe
function rhcollect() get mapped to reduce.key and reduce.values in the
reduce phase.
6. map.values and reduce.values will always be R lists.
7. You can specify Hadoop-related parameters such as
mapred.job.tracker='local', the number of reduce tasks to run, the
number of map tasks to run, etc. in the "mapred" list.
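
To make points 4-6 concrete, here is a small local simulation (plain R,
no Hadoop needed) of what the map expression sees for a text input with
one word per line; the stand-in for rhcollect() is purely illustrative:

# What Hadoop would hand to the map expression for a 3-line file:
map.values <- list("hi", "om", "hi")   # always an R list (point 6)
# rhcollect(key, value) emits a pair; here we just collect the pairs
# into a list so you can inspect them locally:
emitted <- lapply(unlist(map.values),
                  function(w) list(key = w, value = 1))
str(emitted)   # three (word, 1) pairs, as the real map would emit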


Advanced topics (can be ignored for now; a combined sketch follows this
list):

1. Using a combiner (doing local reduction), i.e. performing the reduce
operation locally on the node where the map task is running, before
finally sending the key/value pairs to the final reducer.
This helps the performance of a MapReduce job when running on a
cluster.

2. Using rhipe_map_buff_size: the default value is 10,000, which means
each invocation of the map expression receives up to 10,000 key-value
pairs in map.values. Sometimes processing that many at once is
time-consuming, so you can change it to a smaller value; for large
data sets a value around 15-20 is typical.

3. Using a partitioner: if you are generating more than one output
file and want the same keys to go to the same file, you can use a
partitioner to achieve this.

4. try/catch blocks: useful for debugging.

5. Using rhstatus: Hadoop will kill a task that reports no progress
for more than 600 seconds, i.e. 10 minutes. If you are running a
computationally intensive job, a task can easily take longer than
that. To avoid having the task killed, either call rhstatus
periodically or set the parameter mapred.task.timeout=0 in the mapred
list discussed above.

6. Using rhcounter: you can use this for profiling your application.
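
Here is a combined sketch of points 1, 2, 4, 5 and 6 on top of the
word count job developed below. rhcollect(), rhmr() and rhex() are
used as in the examples in this post; combiner=TRUE, rhstatus() and
rhcounter() vary across Rhipe versions, so treat their exact
signatures as assumptions and check your version's documentation:

library(Rhipe)

map <- expression({
  lapply(unlist(map.values), function(w) {
    tryCatch({                               # (4) try/catch for debugging
      rhstatus("processing a record")        # (5) report progress so long
                                             #     tasks are not killed
      rhcounter("word_count", "records", 1)  # (6) custom counter, visible
                                             #     in the job tracker UI
      rhcollect(w, 1)
    }, error = function(e) {
      rhcounter("word_count", "bad_records", 1)
    })
  })
})

reduce <- expression(
  pre    = { total <- 0 },
  reduce = { total <- sum(total, unlist(reduce.values)) },
  post   = { rhcollect(reduce.key, total) }
)

# (2) a smaller map buffer and (5) no task timeout, both via "mapred"
mapred <- list(rhipe_map_buff_size = 15, mapred.task.timeout = 0)

# (1) combiner=TRUE asks Rhipe to run the reduce expression as a local
# combiner before the shuffle (supported in recent Rhipe versions)
job <- rhmr(map = map, reduce = reduce, combiner = TRUE,
            inout = c("text", "sequence"),
            ifolder = "/sample", ofolder = "/output_advanced",
            mapred = mapred, jobname = "word_count_advanced")
rhex(job)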

Now let's get on with solving the word count problem.

Consider two files, each containing some words. Words may or may not
be unique. To keep the problem simple, we assume that each line
consists of only one word.
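
Since the input must live on HDFS (basic point 1), the files have to
be staged there first. A minimal sketch, assuming your Rhipe version
provides rhput() for copying a local file to HDFS (the hadoop command
line's -put works as well):

library(Rhipe)
rhinit()   # initialize Rhipe's connection to Hadoop (most versions need this)

# Write a small sample file locally, one word per line.
writeLines(c("hi", "hi", "om", "on", "are", "can", "can"), "sample.txt")

# Copy it into the HDFS input folder used by the job below.
rhput("sample.txt", "/sample/sample.txt")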

-----------------------
Approach
-----------------------

As we know, to run a job on a Hadoop cluster we need map and reduce
functionality. In R we provide an expression for each.

Our job is to count the words across both files, adding up the counts
whenever the same word appears more than once.

-----------------------------------------
Map Expression
----------------------------------------
As mentioned above, the key-value pairs that come to our map
expression from Hadoop arrive as an R list. Since map.values is a
list, we first unlist it to get a vector. Then we iterate through the
vector and, for every word, emit the word as the new key with 1 (one)
as the value. Don't worry about whether the incoming words repeat; the
reducer deals with that.

---------------------------------
Reduce Expression
-------------------------------------
In the map phase we blindly emitted each word as the key with the
value 1, so the reducer receives each distinct key in reduce.key and
all of its values, as a list, in reduce.values.
(Remember that after the map phase, shuffling and sorting take place
in Hadoop.)

So for each reduce.key we unlist its corresponding values and add them
up. Finally we emit the reduce.key and the total as the final
key-value pair, which is written to the output file.
------------------------------------------------------------------------------------------------------------
WordCount.R
--------------------------------------------------------------------------------------------------------------
library(Rhipe)

# Map: each map.values entry is one line, i.e. one word.
map <- expression({
  words <- unlist(map.values)
  lapply(words, function(i) rhcollect(i, 1))
})

# Reduce: sum the 1s emitted for each distinct word.
reduce <- expression(
  pre    = { total <- 0 },
  reduce = { total <- sum(total, unlist(reduce.values)) },
  post   = { rhcollect(reduce.key, total) }
)

mapred <- list(rhipe_map_buff_size = 20, mapred.job.tracker = 'local')

job_object <- rhmr(map = map, reduce = reduce,
                   inout = c("text", "sequence"),
                   ifolder = "/sample", ofolder = "/output_01",
                   mapred = mapred, jobname = "word_count")

rhex(job_object)


Some important things to note:

1. mapred.job.tracker='local': if you set this parameter, the job will
run on a single node. It is better to test in local mode first before
running on the cluster.

2. inout=c("text","sequence"): the first element specifies the type of
input file we are giving the job, and the second the type of file that
will be generated as output. There are three variations: you can
specify "text", "sequence", or "map".

3. Functionality of "rhmr": this function encapsulates the map and
reduce expressions along with the other Hadoop-related parameters and
forms a job object.

The "rhex" function takes this job object and submits the job to
Hadoop.
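
Because the output is a sequence file, it cannot be inspected with a
plain "hadoop fs -cat"; read it back into R instead. A minimal sketch,
assuming Rhipe's rhread() (argument names may differ across versions):

library(Rhipe)
rhinit()

# Read the job's output back as a list of key-value pairs.
result <- rhread("/output_01")
result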

The output of the above program, read back into R, looks as follows:
[[1]]
[[1]][[1]]
[1] "hi"

[[1]][[2]]
[1] 2


[[2]]
[[2]][[1]]
[1] "om"

[[2]][[2]]
[1] 1


[[3]]
[[3]][[1]]
[1] "on"

[[3]][[2]]
[1] 1


[[4]]
[[4]][[1]]
[1] "are"

[[4]][[2]]
[1] 1


[[5]]
[[5]][[1]]
[1] "can"

[[5]][[2]]
[1] 2


Hopefully you have now got an idea of how to run a job on Hadoop using
the Rhipe package.

Regards,
Som Shekhar

Shekhar

Apr 20, 2011, 8:50:24 AM
to Bangalore R Users - BRU
Hi All,

In the above post we took a very simple problem where there is only a
single word per line. Let's remove this restriction and take a more
realistic scenario where we have thousands of files. (Example: you can
use the Gutenberg data set. The command for downloading the Gutenberg
data is:
wget -w 2 -m "http://www.gutenberg.org/robot/harvest?filetypes[]=txt&langs[]=en"
Note the quotes around the URL so the shell does not interpret the
'&'. This will download all the texts that are in English.)

The problem in the above post is very easy, since we deal with a
single word per line. The values come to the map expression as an R
list, so we just unlist them to form a vector and iterate.
What do we do when there are many words in a single line? First we
should decide on a delimiter: space, ':', '@', etc. For plain text we
can usually take space as our delimiter, but for an XML file, say, we
would need to choose something else.

What next?
map.values is a list, so we need to do the following:
1. Unlist map.values.
2. Split each string.
3. Unlist again to form a single unified vector.

You might wonder why we unlist a second time when the first unlist
already gives a vector. To understand this, let's do a small activity.
For simplicity, assume we have the following lines in our file:

sachin ramesh tendulkar
Paul cohelo
ken follet
Jeffery Archer

As you know, these lines arrive at our map expression as a list. So
open the R console and first make an R list out of these strings.

> str1<-"sachin ramesh tendulkar"
> str2<-"Paul cohelo"
> str3<-"ken follet"
> str4<-"Jeffery Archer"
> myList<-list(str1,str2,str3,str4)# Formed a list of four strings
> myList
[[1]]
[1] "sachin ramesh tendulkar"

[[2]]
[1] "Paul cohelo"

[[3]]
[1] "ken follet"

[[4]]
[1] "Jeffery Archer"

> Doing_Unlist<-unlist(myList)# Doing unlist will form the vector
> Doing_Unlist
[1] "sachin ramesh tendulkar" "Paul cohelo"
[3] "ken follet" "Jeffery Archer"
> strsplit(Doing_Unlist, " ") # splitting on space; this again forms a list
[[1]]
[1] "sachin" "ramesh" "tendulkar"

[[2]]
[1] "Paul" "cohelo"

[[3]]
[1] "ken" "follet"

[[4]]
[1] "Jeffery" "Archer"

> unlist(strsplit(Doing_Unlist, " ")) # unlisting again gives a proper vector
[1] "sachin" "ramesh" "tendulkar" "Paul" "cohelo"
"ken"
[7] "follet" "Jeffery" "Archer"
>

That is the only logic involved. Now the R code is pretty
straightforward.

-----------------------------------------------
Generic_WordCount.R
------------------------------------------------
library(Rhipe)

# Map: split each line into words on single spaces, then emit (word, 1).
map <- expression({
  words_vector <- unlist(strsplit(unlist(map.values), " "))
  lapply(words_vector, function(i) rhcollect(i, 1))
})

# Reduce: sum the 1s emitted for each distinct word.
reduce <- expression(
  pre    = { total <- 0 },
  reduce = { total <- sum(total, unlist(reduce.values)) },
  post   = { rhcollect(reduce.key, total) }
)

mapred <- list(rhipe_map_buff_size = 20, mapred.job.tracker = 'local')

job_object <- rhmr(map = map, reduce = reduce,
                   inout = c("text", "sequence"),
                   ifolder = "/sample_1", ofolder = "/output_02",
                   mapred = mapred, jobname = "word_count")

rhex(job_object)

You can run the code and verify the results.

There are some flaws in this code:

(1) Since the delimiter is a single space, consider a sentence like
"time, is very precious, so we need to utilize this precious time
because time once gone will never come back."
What should the counts of "time" and "precious" be?
Answer: the count for "time" will not be 3 but 2, and the count for
"precious" will be 1, because there will be two distinct keys for
time:
Key1 --- "time"
Key2 --- "time," ----> notice the comma after the word. The same
happens for "precious".

(2) Moreover, since we split on a single space, any run of extra
spaces between words produces empty strings, which then get counted as
words. (Check this out.)
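
Both flaws can be fixed by splitting on runs of whitespace and
punctuation instead of a single space, and dropping empty tokens. A
minimal sketch of a revised map expression; the regular expression is
one reasonable choice, not the only one:

map <- expression({
  lines <- unlist(map.values)
  # Split on one or more whitespace or punctuation characters, so that
  # "time," and "time" collapse to the same key.
  words <- unlist(strsplit(lines, "[[:space:][:punct:]]+"))
  # Drop empty strings produced by leading delimiters or blank lines.
  words <- words[nchar(words) > 0]
  lapply(words, function(w) rhcollect(w, 1))
})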

I guess you are getting the idea of how we can exploit R's
capabilities on a distributed platform like Hadoop.

One can also use the various Hadoop-related parameters here, like the
combiner, the partitioner, and the number of map and reduce tasks.
Using them in pseudo-distributed mode won't have much effect on
performance, though.

In the coming posts we will try to solve real-world problems like data
mining, k-means clustering, generating the Mandelbrot set, Monte Carlo
simulation, etc. We will also use graphics in the k-means clustering
algorithm to see how the centers move in every iteration.
Enjoy !!!

Regards,
Som Shekhar
