Controlling what happens between map and reduce


Joaquin Torre

Dec 10, 2014, 2:49:24 PM
to rha...@googlegroups.com

Hello.
My problem is as follows: I have a MapReduce algorithm consisting of a map (which gives me the data frame to work with) and a reduce (multiplying rows, etc.). It runs (yay!), but whenever I change the number of map tasks (e.g., 30 instead of 20), it gives slightly different results. Running it again with the same parameters and the same number of map tasks, however, produces identical results. This is a huge problem for me.

I know the map function is not at fault. I tested this by making the reduce a no-op and doing the "reduce" step outside Hadoop, in my own R environment; with that setup, changing the number of mappers did not change the result. So the difference must arise in the reduce phase.

I believe this happens because my algorithm is a special case in which order matters, because of operations such as multiplication based on values and aggregation. I need all map tasks to finish before reducing starts, and the reduce to run no more than once for a given key.

I achieved the former condition by setting mapreduce.job.reduce.slowstart.completedmaps=1.00; however, I still see some variation when changing the number of map tasks. I also tried mapreduce.reduce.shuffle.parallelcopies=1 and mapreduce.reduce.speculative=false, to no avail.
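
For completeness, this is roughly how I am passing those settings (a sketch; my.input, my.map and my.reduce are placeholders for my real job):

library(rmr2)

mapreduce(
  input = my.input,
  map = my.map,
  reduce = my.reduce,
  backend.parameters = list(
    hadoop = list(
      # hold the reducers back until every map has finished
      D = "mapreduce.job.reduce.slowstart.completedmaps=1.00",
      D = "mapreduce.reduce.shuffle.parallelcopies=1",
      D = "mapreduce.reduce.speculative=false")))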

Details about my job:
the map output is a data frame with columns age (this is the key), amount, and several other variables I operate on. The reduce should take every age group (97 in total) and operate on the other variables. The map output is around 300k rows by 7 columns (variables); it has to be reduced to about 97 rows by 7 columns.


Thanks in advance and please tell me if you need more information, I'll happily provide it.

Antonio Piccolboni

Dec 10, 2014, 3:20:34 PM
to RHadoop Google Group
I guess you are not sharing code for some good reason, otherwise you would have done so, but if you can, that's important. Otherwise I need to understand your requirement on order. My first reaction is that you can't count on implicit order: not the order of rows in the input, not the order of map execution. I am going to share an example; if it captures your question, great. Otherwise, please explain why it doesn't, with the goal of formulating a small example that does:

from.dfs(
  mapreduce(
    to.dfs(mtcars), 
    map = function(k,v) keyval(v$cyl, v), 
    reduce = function(k,v) keyval(k, diff(v$mpg))))

Now, that diff call relies on the ordering of rows within each group. On a small example, where there is a single map call in a single map task, everything looks fine. Then you start increasing your data size to get to the real data set, and things start to go wrong, sometimes clearly and sometimes in a subtle way. In my example there are only 32 rows, but imagine you have a data set with hundreds of partitions: independent files that are read by independent processes (and it doesn't have to be one-to-one between processes and partitions, as you clearly show by manipulating the number of tasks). Each map call happens and writes out; the shuffle phase assembles all the data group by group; then the reducers are invoked. The order within each group can be controlled with a Hadoop feature called a secondary sort key, which is not available in rmr2 and not relevant to it. Even if it were, that's an explicit order in the data, not an implicit one. If you had an additional field "order", then you'd do something like:

from.dfs(
  mapreduce(
    to.dfs(mtcars), 
    map = function(k,v) keyval(v$cyl, v), 
    reduce = function(k,v) keyval(k, diff(v$mpg[order(v$order)]))))  # sort each group by the explicit order field first

An alternative explanation could be that we have some problem in group formation. We have had more than one bug of this type reported in the past -- and fixed. Please check that the number and size of the groups are what you expect and that they don't change when you modify the number of map tasks. Also, replicating the problem on the smallest possible data set is often helpful (though not always easy, and the set is not always small). I don't think you understand the slowstart feature the same way I do: my understanding is that it is purely a performance parameter with no effect on the semantics (the input-output relation) of your program, unless of course you are using side effects outside the mapreduce program. I would stop tampering with advanced Hadoop settings for now and focus on your program and rmr2 to troubleshoot the problem. Thanks
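
For instance, something like this (a sketch; your.input and the age column stand in for your actual data) should return the same group count and sizes no matter how many map tasks you use:

from.dfs(
  mapreduce(
    your.input,
    map = function(k,v) keyval(v$age, 1),
    reduce = function(k,v) keyval(k, length(v))))  # one row per key with its group size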






Joaquin Torre

Dec 10, 2014, 4:12:54 PM
to rha...@googlegroups.com
Hi Antonio, thank you very much for your reply.


I can't really share the data I used in the algorithm, for privacy reasons, but I'll try to come up with good example data.

What I meant by order is that I need things executed in a certain sequence: first the map tasks, then the reduce tasks. My reduce needs access to all of the data for a given key (age group), because it multiplies the values by the average over the whole group and then aggregates them, among other operations that include normalization. That's why I need exactly one reduce call per unique key: if rhadoop runs it on smaller chunks, the result is wrong. When I ran the algorithm with 20 maps, I saw that the reduce was already making progress, which I did not want; that's why I added the slowstart setting.

My map function parses records (lines of a single file), and every record contains data for every key. The reduce does the operations described above. Given the nature of my calculations, operating on smaller subsets and then combining the partial results does not work.
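
Roughly, my reduce has this shape (a sketch with invented names; the real operations are more involved):

reduce = function(k,v) {
  # the group-wide mean is what makes this non-decomposable: computing it
  # on partial chunks and then combining gives a different answer
  scaled <- v$amount * mean(v$amount)
  keyval(k, sum(scaled) / sum(v$amount))  # aggregate, then normalize
}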

Do you know how many times the reduce function is called for the same key? In my case I want it to be just one, but I think it's more than that.

I've done some logging with the following line in the reduce function: write.table(data.frame(values), file = "/tmp/mapr-debug.log", append = TRUE) (this writes to the local filesystem, not to Hadoop).
I ran cat /tmp/mapr-debug.log on every node (around 6). I expected only one node to have the file, because only one reduce instance should have been called, but I found around four in total, each holding a piece of the data that didn't repeat elsewhere. Are different reducers getting different inputs with the same key (or different instances of the same reducer)?
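
Would logging to standard error be more reliable than hunting down local files? Something like this, assuming I understand rmr2's rmr.str helper correctly (placeholder body):

reduce = function(k,v) {
  rmr.str(k)          # str() of the key goes to stderr, i.e. the task logs
  rmr.str(v)          # same for the values
  keyval(k, nrow(v))  # placeholder; my real reduce goes here
}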


Again, thank you very much for your help.

Joaquin

Antonio Piccolboni

Dec 10, 2014, 4:32:44 PM
to RHadoop Google Group
If you look at the hadoop job counters, there is a count of reduce calls. It's exactly one per key, but you can make it fewer with the vectorized.reduce option (meant for when you have very small groups, which is not your case). I don't understand your other point about files being created or not. You can't "think" there are more reduce calls; you have to know it. If you know that, then it's a bug and we need to fix it. It has happened at times that subtle things, like the order of the attributes of the keys, resulted in more distinct keys than expected.
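
If you want to know rather than think, you can also count the calls from inside the reduce itself; a sketch (the counter and the message are mine, not rmr2 machinery):

calls <- 0
counted.reduce <- function(k,v) {
  # each reduce task runs its own R process, so this counts calls per task;
  # message() writes to stderr, which ends up in the task logs
  calls <<- calls + 1
  message("reduce call ", calls, " for key ", k)
  keyval(k, nrow(v))  # placeholder body
}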

So my idea of your program is now

from.dfs(
  mapreduce(
    to.dfs(mtcars), 
    map = function(k,v) keyval(v$cyl, v), 
    reduce = function(k,v) keyval(k, median(v$mpg/mean(v$mpg)))))

or some such




Joaquin Torre

Dec 11, 2014, 3:02:11 PM
to rha...@googlegroups.com, ant...@piccolboni.info
Hi Antonio,

Again, thank you very much for your reply.

To test things, I allowed only the key "18" to come out of the map function. I logged the output of the map function (using write.table before returning, in append mode of course), and I also logged the values the reduce function receives.
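
Concretely, the restriction looks roughly like this (a sketch; my.map.logic stands in for the real map work):

map = function(k,v) {
  out <- my.map.logic(v)       # placeholder for the real map computation
  sub <- out[out$age == 18, ]  # let only the key under test through
  keyval(sub$age, sub)
}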

This is the result of the algorithm when maps = 20:
test2
  key    val.1     val.2    val.3      val.4 
1 18 0.522006 0.2881658 0.118481 0.07134726

This is the result of the algorithm when maps = 30: 
test1
  key     val.1     val.2     val.3      val.4
1  18 0.4969016 0.3016166 0.1319453 0.06953642

That is my problem: I can't have that variation.

The output of the map function is a file (I concatenated the per-node files into one):
worker1 : "age" "quantity" "var1" "var2" "var3" "var4"
worker1 : "1747" 18 0 0 0 0 0
worker1 : "1844" 18 0 0 0 0 0
(...)
worker1 : "2523" 18 0 0 0 0 0
worker1 : "2620" 18 0 0 0 0 0
worker1 : "2717" 18 1 0.00148370210305262 0.000608698298688255 0.000243479319475302 9.13047448032383e-05
worker1 : "2814" 18 2 0.00298049399474725 0.00113612889898781 0.000575441909876944 0.00016230412842683
worker1 : "2911" 18 8 0.010596732180914 0.00586076249112006 0.00201278711816244 0.000947193937958797
worker1 : "3008" 18 26 0.0347183285625756 0.0170714433263493 0.00786437276831824 0.00345265145926166
worker1 : "3105" 18 53 0.0729443510332484 0.0309025329500461 0.0183259207029343 0.00646797201280035
worker1 : "3202" 18 40 0.0576033750709901 0.023798577494118 0.0100062200827541 0.00567920599291451
(...)
worker1 : "age" "quantity" "var1" "var2" "var3" "var4"
worker1 : "98" 18 61 0.106296886998641 0.030177506054699 0.0117106739913757 0.00585533699568787
worker1 : "195" 18 67 0.110259902394734 0.0389711723981387 0.0104556803995006 0.00950516399954602
worker1 : "292" 18 65 0.107460886224146 0.0378188526625749 0.0133749600879838 0.00507326072302835
worker1 : "389" 18 0 0 0 0 0
(...)

The fact that the header appears multiple times just means that it is the output of another call of the map function.

The reduce should only be called once, right? There is only one key. This is the relevant counter output from MapReduce (for maps = 30):
	Job Counters 
		Launched map tasks=30
		Launched reduce tasks=1
	Map-Reduce Framework
		Map input records=2618
		Reduce input groups=1
		Reduce shuffle bytes=12750
		Reduce input records=90
		Reduce output records=3
		Shuffled Maps =30
	rmr
		reduce calls=31
 
My first question is: why are there 31 reduce calls? On the other hand, I see "Reduce input groups = 1", which is correct, as there is only one key.

This is the logging of the values (not the keys, which I included above) that the reduce received:
(...)
worker1 : "X1" "X2" "X3" "X4" "X5"
worker1 : "1" 0 0 0 0 0
worker1 : "2" 0 0 0 0 0
(...)
worker1 : "28" 1 0.00148370210305262 0.000608698298688255 0.000243479319475302 9.13047448032383e-05
worker1 : "29" 2 0.00298049399474725 0.00113612889898781 0.000575441909876944 0.00016230412842683
worker1 : "30" 8 0.010596732180914 0.00586076249112006 0.00201278711816244 0.000947193937958797
worker1 : "31" 26 0.0347183285625756 0.0170714433263493 0.00786437276831824 0.00345265145926166
worker1 : "32" 53 0.0729443510332484 0.0309025329500461 0.0183259207029343 0.00646797201280035
worker1 : "33" 40 0.0576033750709901 0.023798577494118 0.0100062200827541 0.00567920599291451
(...)
worker1 : "X1" "X2" "X3" "X4" "X5"
worker1 : "1" 0 0 0 0 0
worker1 : "2" 0 0 0 0 0
worker1 : "3" 0 0 0 0 0
(...)

The values match, that's okay. HOWEVER, why does the header appear multiple times? That can only mean that the reduce is being called multiple times for the same key. That is my problem, and I think that's why my results fluctuate. Moreover, when I count how many times the header repeats, I get 31, the same as the number of reduce calls. I think it should be just one, as the reduce should be called only once for a single key.

Thank you very much.


Antonio Piccolboni

Dec 11, 2014, 3:10:06 PM
to RHadoop Google Group
What is the value of the option combine? Is the number of reduce calls always equal to the number of map tasks + 1? Just guessing; with no test code or data it's tough, and I may eventually throw up my hands. But I think the experiments you are doing are helpful, and we may be narrowing it down.


Joaquin Torre

Dec 11, 2014, 3:15:48 PM
to rha...@googlegroups.com, ant...@piccolboni.info
You mean this combine?

Combine input records=90
Combine output records=90


Regarding the latter part of your question, yes it is: I tried 20 maps and got 21 reduce calls. However, this only happens when I have a single key. With more than one key, that relationship no longer holds.

Thank you

Antonio Piccolboni

unread,
Dec 11, 2014, 3:16:58 PM12/11/14
to RHadoop Google Group
The combine option to mapreduce.

Joaquin Torre

Dec 11, 2014, 3:38:53 PM
to rha...@googlegroups.com, ant...@piccolboni.info

That's it! It was set to TRUE instead of NULL; I had copied the mapreduce call from somewhere else. It works now.
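
For anyone who finds this thread later: if I understand it correctly, combine = TRUE makes rmr2 use the reduce function as a Hadoop combiner too, so it also runs on the partial per-map-task groups; that would explain the 30 + 1 reduce calls and the wrong results for my non-decomposable reduce. The fix, as a sketch (placeholder names as before):

mapreduce(
  input = my.input,  # placeholders for my real job
  map = my.map,
  reduce = my.reduce,
  combine = NULL)    # not TRUE: my reduce needs each whole group at once,
                     # so it must not double as a map-side combiner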

Antonio Piccolboni, I will be forever grateful for your help. You are amazing. I will put up a painting of you in my house and make my future children respect it.

Joaquin

Antonio Piccolboni

Dec 11, 2014, 3:49:31 PM
to RHadoop Google Group

There is no need; I get paid for doing this, but I'm glad to be helpful.

A