Total Order sorting


Saptarshi Guha

Jul 24, 2013, 3:08:07 PM
to rh...@googlegroups.com
Hello,

I read in passing about an issue with "total order sorting" in RHIPE,
i.e. how do I sort a data set by key into N files so that all keys in file Ni are less than all keys
in file Nj (for j > i) and all keys inside each Ni are sorted?

Note: this works for integer and string keys before version 0.73.1-2.
0.73.1-2 fixes the bug for numeric vectors, hence the code below will only work with
0.73.1-2 ( http://ml.stat.purdue.edu/rhipebin/Rhipe_0.73.1-2.tar.gz )



The following code creates a dataset where the keys are between 0 and 1


map <- function(a,b)
  for(i in 1:1000)
   rhcollect(runif(1),1)
m <- rhwatch(map=map, reduce=0, input=c(1000,3),read=FALSE)


The following code sorts it into 10 files (10 reducers), with the key range of each
file defined by 'intervals'. Note how the map emits a 'reduceBucket', which the partitioner uses to
send each key to the correct reducer. The reduce code then drops this 'reduceBucket' from the key.


intervals <- seq(0,1,length=10)

mapsort <- expression({
  whichReducer <- findInterval( unlist(map.keys), intervals)
  mapply(function(reduceBucket, key,value){
    rhcollect(c(reduceBucket,key), value)
  }, whichReducer, map.keys,map.values,SIMPLIFY=FALSE)
})

reducer <- expression(
    reduce = {
      lapply(reduce.values, function(v) rhcollect(reduce.key[2], v))
    })
          
sorter <- rhwatch(map=mapsort, reduce=reducer, input=m       
                  ,orderby='numeric'
                  ,partitioner=list(type='numeric',lims=1)
                  ,mapred=list(mapred.reduce.tasks = length(intervals))
                  ,read=FALSE)
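
For readers without a cluster at hand, the bucketing idea can be tried locally. The following is only a sketch in base R, not RHIPE code: split() stands in for the partitioner and sort() for the shuffle/sort step, and the seed and variable names are made up for illustration.

```r
# Local, RHIPE-free simulation of the bucketing idea (illustrative only).
set.seed(1)
keys <- runif(1000)
intervals <- seq(0, 1, length.out = 10)

# Same bucketing rule as the map expression above.
bucket <- findInterval(keys, intervals)

# One "reducer" per bucket: sort the keys that landed in it.
parts <- lapply(split(keys, bucket), sort)

# Reading the parts in bucket order gives a globally sorted sequence.
combined <- unlist(parts, use.names = FALSE)
total.order <- !is.unsorted(combined)
```

Because the bucket boundaries are increasing, sorting inside each bucket is enough for the concatenation to be fully sorted, which is the property asked for at the top of this post.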



          Cheers
          Saptarshi
         

Fishtank

Jul 24, 2013, 5:06:16 PM
to rh...@googlegroups.com, saptars...@gmail.com
The hashcode for numerics is not the best for dividing up, say, numbers in 0-1 (as in the example below).
I find that approximate sorting (i.e. up to the 6th decimal place) works faster.

## generate data

map <- function(a,b)
  for(i in 1:1000)
   rhcollect(runif(1),1)
m <- rhwatch(map=map, reduce=0, input=c(1000,3),read=FALSE)


NUMPLACES <- 7
mapsort <- expression({
  whichReducer <- findInterval( unlist(map.keys), intervals)
  mapply(function(reduceBucket, key,value){
    rhcollect(c(reduceBucket,as.integer(key*(10^NUMPLACES))), list(key,value))
  }, whichReducer, map.keys,map.values,SIMPLIFY=FALSE)
})

intervals <- seq(0,1,length=50)

reducer <- expression(
    reduce = {
## For demonstration, keep the bucket (so that we can confirm sorting took place)
## and drop that 'magnified discretized key'

      lapply(reduce.values, function(v) rhcollect(c(reduce.key[1],v[[1]]), v[[2]]))
    })

         
sorter <- rhwatch(map=mapsort, reduce=reducer, input=m      
                  ,orderby='integer'
                  ,partitioner=list(type='integer',lims=1)
                  ,mapred=list(mapred.reduce.tasks = length(intervals)-2) ## the first and last will never have values in them
                  ,read=FALSE)



## Confirm
mu <- lapply(rhls(sorter)$file[grepl("part",rhls(sorter)$file)],function(i){
  a <- rhread(i)
  j <- unlist(lapply(a,function(r) r[[1]][[2]]))
  list( unlist(lapply(a,function(r) r[[1]][[1]]))[[1]], range(j))
})


You should see 48 buckets, non-overlapping and sorted.
Increasing the number of intervals increases the parallelism of the sort, but it is still kinda slow: RHIPE has to deserialize each key to compare ...
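
To see which bucket indices can actually occur, findInterval can be probed locally. This is a small sketch in plain R with no RHIPE involved; the seed and sample size are arbitrary choices for illustration.

```r
intervals <- seq(0, 1, length.out = 50)

# Keys drawn like the data in this thread, strictly inside (0, 1).
set.seed(42)
keys <- runif(10000)
bucket <- findInterval(keys, intervals)

bucket.range <- range(bucket)        # smallest and largest bucket index seen
n.buckets <- length(unique(bucket))  # number of distinct non-empty buckets

# Index 0 needs a key below intervals[1]; index 50 needs a key >= 1.
edge <- findInterval(c(-0.1, 0, 1), intervals)   # 0, 1, 50
```

For keys strictly between 0 and 1 the indices stay within 1 to 49, so bucket 0 is never used and bucket 50 would only appear for a key equal to 1. It may be worth checking the reducer count against this before relying on a particular number of buckets.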

Regards
Saptarshi

xcub...@omitsis.com

Jul 26, 2013, 4:51:13 AM
to rh...@googlegroups.com, saptars...@gmail.com
I'm trying this feature. I set an output folder and saved the output as text so I could see the results from the Hadoop web interface, and it's not working for me.

If I run the first code as is, I get odd part files empty and the other ones are not sorted at all:

(I've collected reduceBucket as in your second example)
# Content of  "/tmp/sort_test/part-r-00001"
4.0 0.33340575088476676 1.0 
# Content of  "/tmp/sort_test/part-r-00003"
3.0 0.2227320245765758 1.0
# Content of  "/tmp/sort_test/part-r-00009"
1.0 2.7771179046576206E-4 1.0

The relevant part of my sessionInfo(), to confirm the version:

other attached packages:
[1] codetools_0.2-8 rJava_0.9-4     Rhipe_0.73.1-2

And Hadoop version:

# hadoop version
Hadoop 2.0.0-cdh4.3.0


Running your second example works better, but if I set 2 fewer reducers as you said:

#  /tmp/sort_test/part-r-00000 is empty
# Content of  "/tmp/sort_test/part-r-00001"
1.0 2.7771179046576206E-4 1.0
# Content of  "/tmp/sort_test/part-r-00047"
47.0 0.9392276984544847 1.0

If I run with same reducers as intervals (it is 50 reducers):

# Content of  "/tmp/sort_test/part-r-00000"
48.0 0.9591911839115821 1.0
# Content of  "/tmp/sort_test/part-r-00001"
1.0 2.7771179046576206E-4 1.0
# Content of  "/tmp/sort_test/part-r-00049"
49.0 0.9797495281761285 1.0


So it seems that numeric is not working for me, maybe because of a version issue.

For the integer version, it seems it should work by just subtracting 1 from "whichReducer", but that results in an IndexOutOfBoundsException.

Here is the error trace, in case it helps:

2013-07-26 10:47:09,949 INFO org.godhuli.rhipe.RHMRHelper: Mapper:Started Output Thread
2013-07-26 10:47:10,121 WARN org.godhuli.rhipe.RHMRHelper: Mapper:java.lang.IndexOutOfBoundsException: Index: 0
    at java.util.Collections$EmptyList.get(Collections.java:3164)
    at org.godhuli.rhipe.REXPProtos$REXP.getIntValue(REXPProtos.java:306)
    at org.godhuli.rhipe.RHInteger.readFields(RHInteger.java:54)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.readRecord(RHMRHelper.java:314)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.run(RHMRHelper.java:337)
2013-07-26 10:47:20,124 INFO org.godhuli.rhipe.RHMRHelper: Mapper:MRErrorThread done
2013-07-26 10:47:20,126 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-07-26 10:47:20,129 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:hdfs (auth:SIMPLE) cause:java.io.IOException: MROutput/MRErrThread failed:java.lang.IndexOutOfBoundsException: Index: 0
    at java.util.Collections$EmptyList.get(Collections.java:3164)
    at org.godhuli.rhipe.REXPProtos$REXP.getIntValue(REXPProtos.java:306)
    at org.godhuli.rhipe.RHInteger.readFields(RHInteger.java:54)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.readRecord(RHMRHelper.java:314)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.run(RHMRHelper.java:337)
2013-07-26 10:47:20,130 WARN org.apache.hadoop.mapred.Child: Error running child
java.io.IOException: MROutput/MRErrThread failed:java.lang.IndexOutOfBoundsException: Index: 0
    at java.util.Collections$EmptyList.get(Collections.java:3164)
    at org.godhuli.rhipe.REXPProtos$REXP.getIntValue(REXPProtos.java:306)
    at org.godhuli.rhipe.RHInteger.readFields(RHInteger.java:54)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.readRecord(RHMRHelper.java:314)
    at org.godhuli.rhipe.RHMRHelper$MROutputThread.run(RHMRHelper.java:337)
    at org.godhuli.rhipe.RHMRHelper.checkOuterrThreadsThrowable(RHMRHelper.java:244)
    at org.godhuli.rhipe.RHMRMapper.run(RHMRMapper.java:68)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

Saptarshi Guha

Jul 26, 2013, 12:56:31 PM
to xcub...@omitsis.com, rh...@googlegroups.com
It appears integer partitioning is also broken.
Btw, 0.73.1-2 has correct numeric partitioning. Did you try with this version?

I'll test it a bit more thoroughly over the next few days.

Saptarshi Guha

Jul 26, 2013, 1:44:41 PM
to xcub...@omitsis.com, rh...@googlegroups.com
Alrighty, everything works on 0.73.1-2
( http://ml.stat.purdue.edu/rhipebin/Rhipe_0.73.1-2.tar.gz )


The following should work swimmingly,

## Create Data
map <- function(a,b)
  for(i in 1:10)
    rhcollect(runif(1),1)
m <- rhwatch(map=map, reduce=0, input=c(1000,3),read=FALSE,output=path.to.output)

## Sort it by INTEGER key
intervals <- seq(0,1,length=50)
PRECISION <- 1e7

mapsort <- expression({
  whichReducer <- findInterval( unlist(map.keys), intervals)
  mapply(function(reduceBucket, key,value){
    rhcollect(c(reduceBucket,as.integer(key*PRECISION)), list(key,value))
  }, whichReducer, map.keys,map.values,SIMPLIFY=FALSE)
})

reducer <- expression(
    reduce = {
      lapply(reduce.values, function(v) rhcollect(v[[1]], v[[2]]))

    })
         
sorter <- rhwatch(map=mapsort, reduce=reducer, input=m      
                  ,orderby='integer'
                  ,partitioner=list(type='integer',lims=1)
                  ,mapred=list(mapred.reduce.tasks = length(intervals))
                  ,read=FALSE)

## Read Files
part.files <- sort(rhls(sorter)$file[ grepl("part",rhls(sorter)$file) ])

## Read in each part file and get the keys it contains. Some buckets
## can be empty because findInterval never assigned a key to that
## bucket

keyranges <- lapply(part.files,function(p){
  y <- rhread(p)
  actual.key <- unlist(lapply(y, function(r) r[[1]]))
  list(actual.key,p)
})

## Confirm that we sorted up to the precision given by PRECISION
## i.e. the largest key for bucket J is <= smallest key for bucket K, K>J
## and within a bucket all keys are sorted up to precision PRECISION

all(sapply(1:(length(keyranges)-1),function(J){
  if(is.null(keyranges[[J]][[1]])) return(TRUE)
  rmax <- max(   signif(keyranges[[J]][[1]], log(PRECISION,10)))
  nextmin <- min( signif(keyranges[[J+1]][[1]], log(PRECISION,10)))
  rmax <= nextmin  & all(diff(signif(keyranges[[J]][[1]], log(PRECISION,10)))>=0)
}))

## However, it is not guaranteed that the actual keys are sorted within a BUCKET,
## and this /can/ be false (i.e. they can be out of sort order) for a LARGE number of keys.
## The higher the PRECISION, the less likely this is to happen

all(sapply(1:(length(keyranges)-1),function(J){
  if(is.null(keyranges[[J]][[1]])) return(TRUE)  ## [[1]] holds the keys, [[2]] the file path
  all(diff(keyranges[[J]][[1]] )>=0)
}))
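
The caveat above comes from the truncation step. As a local sketch (plain R, no RHIPE; the two example keys are made up for illustration), two distinct keys that agree in their first 7 decimal places collapse to the same integer sort key, so their relative order in the output is not determined by the sort:

```r
PRECISION <- 1e7

# Two distinct keys that agree in their first 7 decimal places (made-up values).
a <- 0.12345678
b <- 0.12345671
ka <- as.integer(a * PRECISION)   # as.integer truncates toward zero
kb <- as.integer(b * PRECISION)
same.sort.key <- ka == kb         # the sorter cannot tell a and b apart

# The discretized key never reverses the order of two keys, it only ties them.
set.seed(7)
x <- sort(runif(1000))
monotone <- !is.unsorted(as.integer(x * PRECISION))
```

So ordering is preserved up to ties, and ties become more likely as the number of keys grows or PRECISION shrinks.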



Saptarshi Guha

Jul 29, 2013, 1:09:03 PM
to xcub...@omitsis.com, rh...@googlegroups.com



On Mon, Jul 29, 2013 at 1:34 AM, <xcub...@omitsis.com> wrote:
I can't see my original message and I'm sorry if I'm messing up, but what I pointed out was:

1. Your code is working for integer sorting but with empty reducers

As you said, there is one empty reducer because findInterval will never return 0. In other words, reducer "0" (the one that creates part-r-00000) will always be empty because reduceBucket will never be 0. It can be solved by just subtracting 1 from whichReducer, but that raises an IndexOutOfBoundsException (I think I already pasted the error in my last post). And if we try to "force" non-empty reducers by using length(intervals) - 1 reducers, reducer "0" ends up with our last reduceBucket (49 in your case).


You need to do

 whichReducer <- findInterval( unlist(map.keys), intervals)-1L

Notice the -1L: if you subtract a plain 1 instead, R converts the output of findInterval to numeric, yet RHIPE expects to see an integer key...
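
The type change can be checked in a plain R session, without RHIPE. A small sketch (the three keys are arbitrary illustrative values):

```r
intervals <- seq(0, 1, length.out = 50)
keys <- c(0.01, 0.5, 0.99)        # arbitrary example keys

raw <- findInterval(keys, intervals)

t.raw     <- typeof(raw)          # "integer": findInterval returns integers
t.double  <- typeof(raw - 1)      # "double":  1 is a double literal
t.integer <- typeof(raw - 1L)     # "integer": 1L is an integer literal
```

A cheap guard such as stopifnot(is.integer(whichReducer)) inside the map expression would be one way to catch this on the R side before the key ever reaches Java.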


 

2. Numeric is not working for me. Using the code from your first post for pure numeric sorting, together with your keyranges code, I get:

> part.files <- sort(rhls(sorter)$file[ grepl("part",rhls(sorter)$file) ])
> part.files
 [1] "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00000" "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00001"
 [3] "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00002" "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00003"
 [5] "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00004" "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00005"
 [7] "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00006" "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00007"
 [9] "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00008" "/tmp/rhipe-temp-afca74b7ad225cdd53bb66263088b27b/part-r-00009"
> keyranges <- lapply(part.files,function(p){
+   y <- rhread(p)
+   actual.key <- unlist(lapply(y, function(r) r[[1]]))
+   list(actual.key,p)
+ })
Read 0 objects(0 KB) in 0.06 seconds
Read 1137 objects(44.42 KB) in 0.13 seconds
Read 0 objects(0 KB) in 0.02 seconds
Read 2239 objects(87.46 KB) in 0.09 seconds
Read 0 objects(0 KB) in 0.02 seconds
Read 2214 objects(86.49 KB) in 0.06 seconds
Read 0 objects(0 KB) in 0.02 seconds
Read 1099 objects(42.93 KB) in 0.04 seconds
Read 0 objects(0 KB) in 0.02 seconds
Read 3311 objects(129.34 KB) in 0.06 seconds
>

If you check the keys in the non-empty files, they are sorted. Empty part files are produced because 50 reducers were requested but
the hashing function only mapped to 25 different integers. This is a problem with numeric sorting, and the reducers will fill up
when the input keys span a wider range.

Why is having empty part files a bug?

Regards
Saptarshi


xcub...@omitsis.com

Jul 31, 2013, 3:40:47 AM
to rh...@googlegroups.com, xcub...@omitsis.com, saptars...@gmail.com


On Monday, July 29, 2013 at 19:09:03 UTC+2, Fishtank wrote:

You need to do

 whichReducer <- findInterval( unlist(map.keys), intervals)-1L

Notice the -1L: if you subtract a plain 1 instead, R converts the output of findInterval to numeric, yet RHIPE expects to see an integer key...


Thanks, it worked like a charm! I'm not an R programmer and I couldn't even imagine that an IndexOutOfBoundsException was about a type issue. 
I get the point. I didn't mean it was a bug, just an unexpected behavior. I'm sorry if I wasted your time.



Thanks a lot for your answers,
Xavi 

Saptarshi Guha

Jul 31, 2013, 4:47:08 PM
to xcub...@omitsis.com, rh...@googlegroups.com
No time wasted. Thanks for your questions, because of which a bug in numeric sorting was fixed.
Well, the -1L thing is needed because we told RHIPE that the key is of integer type.
Hence in Java it tries to parse the object as an integer type (but since -1L was not used, the type is actually numeric)
and all those errors ensue.
One could type-check each key, but that would be slow (I think ...)