R script with RHadoop(rmr2) hadoop streaming(MR) job fails to read/write from/to a built in format

319 views
Skip to first unread message

VT

unread,
Feb 24, 2015, 4:59:49 PM2/24/15
to rha...@googlegroups.com
#! /usr/bin/env Rscript

....................
....................

library(rmr2)
library(rhdfs)
library(rJava)

....................
..................

#Read files from hdfs data directory
hdfs.root <- '/user/abc/R/stocks'
hdfs.data <- file.path(hdfs.root, 'data')

#and save the output to hdfs out directory
hdfs.out <- file.path(hdfs.root, 'out')


#This call works and runs a Hadoop streaming job on the Hadoop cluster, reads the HDFS input directory and writes the output to the HDFS directory. I can read the data back into an R data frame.

 mapreduce(input = input,
           output = output,
           input.format = "text",
           output.format = "native",
           map = s.map,
           reduce = s.reduce,
           combine = NULL)


#The following call fails with a message: Error: !is.null(template) is not TRUE, Execution halted (The mappers and reducers are scheduled on the cluster and nothing gets written to the output part files)

 mapreduce(input = input,
           output = output,
           input.format = "text",
           output.format = "csv",
           map = s.map,
           reduce = s.reduce,
           combine = NULL)


According to the link https://github.com/RevolutionAnalytics/rmr2/blob/master/docs/getting-data-in-and-out.md the above code with CSV output format should have worked. We downloaded the RHadoop libraries from GitHub. I am not sure if the template it was complaining about should have been part of rmr2 library or some other configuration/code.

Any help is greatly appreciated.

Thanks

Antonio Piccolboni

unread,
Feb 24, 2015, 8:37:24 PM2/24/15
to RHadoop Google Group
That error comes from the parser for the native format. The native format is used for the shuffle phase even if input and output format are set to other formats. Since you don't provide data or code, I can'r reproduce your problem, which means you are going to have to troubleshoot it yourself. I can only give a few suggestions. First, I would simplify your environment by not loading rhdfs and rjava. Ever heard about minimal reproducible test case? Yours is neither. Second I would try to reproduce the error on the local backend. If it happens there I would debug it like any R program. If it doesn't, I would remove the reducer and see what happens. If it goes through, then I would want to figure out how many reduce tasks are scheduled and how many distinct keys and how many reduce input records. And finally, you need to know exactly which version you are running. From github, you can download anything all the way back to 2.0, I believe. So a packageDescription("rmr2") will give us the information we need.

--
post: rha...@googlegroups.com ||
unsubscribe: rhadoop+u...@googlegroups.com ||
web: https://groups.google.com/d/forum/rhadoop?hl=en-US
---
You received this message because you are subscribed to the Google Groups "RHadoop" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rhadoop+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

VT

unread,
Feb 25, 2015, 11:28:42 AM2/25/15
to rha...@googlegroups.com, ant...@piccolboni.info
Antonio

Thank you very much for the quick response.Sorry, I did not provide enough details earlier. Attached are the R script and the input file.

This is what we installed.

-bash-4.1$ R --version

Oracle Distribution of R version 3.1.1  (--) -- "Sock it to Me"
Copyright (C)  The R Foundation for Statistical Computing
Platform: x86_64-unknown-linux-gnu (64-bit)

-bash-4.1$ rpm -qa | sort | grep 'R'

libRmath-3.1.1-2.el6.x86_64
libRmath-devel-3.1.1-2.el6.x86_64
perl-URI-1.40-2.el6.noarch
R-3.1.1-2.el6.x86_64
R-core-3.1.1-2.el6.x86_64
R-devel-3.1.1-2.el6.x86_64


git clone git://github.com/RevolutionAnalytics/rmr2.git

Package: rmr2
Type: Package
Title: R and Hadoop Streaming Connector
Version: 3.3.1
Date: 2014-12-2
Author: Revolution Analytics
Depends: R (>= 2.6.0), methods
Imports: Rcpp, RJSONIO (>= 0.8-2), digest, functional, reshape2, stringr, plyr, caTools (>= 1.16)
Suggests: quickcheck (>= 3.0.0), ravro, rhdfs, testthat
Collate: basic.R extras.R hdfs.R keyval.R IO.R local.R mapreduce.R parse-url.R quickcheck-rmr.R streaming.R
Maintainer: Revolution Analytics <rha...@revolutionanalytics.com>
Description: Supports the map reduce programming model on top of hadoop streaming
License: Apache License (== 2.0)
ByteCompile: TRUE
BuildVignettes: FALSE

-bash-4.1$ hadoop version
Hadoop 2.5.0-cdh5.3.0





I tried without reducer(s).

with stocks.txt (fields separated by comma) as input
and
mapreduce(input = input,                
          input.format = "text",                 
          map = s.map,           
          combine = NULL)

it works (invokes 2 mappers)

with stocks.csv (fields separated by comma) as input...
and
mapreduce(input = input,                
          input.format = "csv",                 
          map = s.map,           
          combine = NULL)

I get this error

15/02/25 11:03:49 INFO mapreduce.Job: Task Id : attempt_1424375261738_0109_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

---
with stocks.txt (fields separated by comma) as input and

mapreduce(input = input,
           output = output,
           input.format = "text",
           output.format = "native",
           map = s.map,
           reduce = s.reduce,
           combine = NULL)


it works (invokes 2 mappers and 36 reducers).

Thanks
stocks.txt
test_rmr.R

Antonio Piccolboni

unread,
Feb 25, 2015, 12:03:41 PM2/25/15
to RHadoop Google Group
Did you get that template error when you invoked mapreduce or when you called from.dfs? Everything works if you just change the from.dfs call to 

results <- from.dfs(joboutput, format = "csv")

If you read one format with the wrong parser, results are unpredictable.


Antonio

Saar Golde

unread,
Feb 26, 2015, 9:29:10 AM2/26/15
to rha...@googlegroups.com
A couple of quick comments that may shed more light on this:
1. For some reason, the default separator for the input.format="csv" case is actually a tab ("\t"), and not a comma. (Antonio: I'm not sure whether this is a big or a feature). If you want your code to leverage the input format parameter, you'd need something like: input.format = make.input.format(format = "csv", sep=",")
2. When you change your input format, you need to change the mapper accordingly to not try to split the inputs - they are no longer strings. 
3. I'm not sure you took the vectorized nature of the mappers and reducers into account when you built the splitting by commas in your mapper: rmr mappers work on a set of rows at a time, not necessarily a single row, so k and v are vectors or lists of elements or dataframes, not necessarily single row elements of inputs. Which is why the output from your specific code will be different for different numbers of mappers. 



Antonio Piccolboni

unread,
Feb 26, 2015, 12:10:00 PM2/26/15
to RHadoop Google Group
I second all of Saar comments, but I was focusing on the immediate cause of the error, which I still think is not specifying the correct format in a from.dfs call. As far as the details of the csv format, the defaults are the same as read.table and write.table in R. I didn't use read.csv and write.csv because they are less flexible. For some, CSV stands for Character Separated Values, where the separator can vary, and by many accounts, including wikipedia's, it's not a format but a family of formats defined by separator, quoting conventions and more . So at this point I'd rather stay with the more general definition than make a backward incompatible change to comply with the more specific definition. I think this point has been raised before and I may be on the wrong side of history, but that's my reasoning. Finally, when people swap native for csv as in this case, major errors can occur including interpreter crashes, but missing the correct separator when using csv usually results in the data all in one column as strings or an error complaining about the wrong number of columns in some row, and those point to the solution.


Antonio


PS: yes it'd be nice to improve the failure modes of the native parser in case of wrong format selection, pull requests welcome.

VT

unread,
Feb 26, 2015, 10:23:34 PM2/26/15
to rha...@googlegroups.com, ant...@piccolboni.info
I need more help on this one. As suggested added the format that I was missing in from.dfs call .. from.dfs(joboutput, format = "csv").

Now if I run the attached R script using the stocks.txt file (that is already attached earlier) I only see two rows written to the output. There are have three 3 more stocks on the input.
These values are coming from single lines. The mappers are not reading all the lines. Only 2 mappers run for the job. If I don't specify the reducers 36 reducers run. The output is same all the time.
Attached the code that I ran.
"AAPL" "88.315"
"GOOG" "428.875"
----------------------

Counters from the run....

File System Counters
FILE: Number of bytes read=387
FILE: Number of bytes written=832589
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=4019
HDFS: Number of bytes written=35
HDFS: Number of read operations=21
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
Job Counters
Launched map tasks=2
Launched reduce tasks=5
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=35612
Total time spent by all reduces in occupied slots (ms)=70810
Total time spent by all map tasks (ms)=17806
Total time spent by all reduce tasks (ms)=35405
Total vcore-seconds taken by all map tasks=17806
Total vcore-seconds taken by all reduce tasks=35405
Total megabyte-seconds taken by all map tasks=36466688
Total megabyte-seconds taken by all reduce tasks=72509440
Map-Reduce Framework
Map input records=45
Map output records=6
Map output bytes=588
Map output materialized bytes=447
Input split bytes=252
Combine input records=0
Combine output records=0
Reduce input groups=2
Reduce shuffle bytes=447
Reduce input records=6
Reduce output records=2
Spilled Records=12
Shuffled Maps =10
Failed Shuffles=0
Merged Map outputs=10
GC time elapsed (ms)=196
CPU time spent (ms)=10390
Physical memory (bytes) snapshot=3010527232
Virtual memory (bytes) snapshot=16141971456
Total committed heap usage (bytes)=8559525888
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=3767
File Output Format Counters
Bytes Written=35
rmr
reduce calls=2
15/02/26 22:00:20 INFO streaming.StreamJob: Output directory: /user/tummv00/ext_tab_dir/R/stocks/out_csv
[1] "Displaying few rows of results"
[[1]]
key val
1 NULL 1.000, 2.000, 88.315, 428.875

Thanks

stock_cma_csv_rmr.R

Antonio Piccolboni

unread,
Feb 26, 2015, 10:31:38 PM2/26/15
to RHadoop Google Group
Which one of Saar's suggestion is not clear? Apply each of them and your program will work. 

VT

unread,
Feb 26, 2015, 10:39:21 PM2/26/15
to rha...@googlegroups.com, ant...@piccolboni.info

If I run the attached code(stock_cma_csv_rmr.R) using the stocks.txt(already attached here) as input.. only 2 rows (stocks) get written to the output (2 mappers are invoked). Thre are 3 more stocks on the input., I must be doing something wrong.

Thanks
stock_cma_csv_rmr.R

Saar Golde

unread,
Feb 27, 2015, 10:02:36 AM2/27/15
to rha...@googlegroups.com
I can elaborate more:

 I'm not sure you took the vectorized nature of the mappers and reducers into account when you built the splitting by commas in your mapper: rmr mappers work on a set of rows at a time, not necessarily a single row, so k and v are vectors or lists of elements or dataframes, not necessarily single row elements of inputs. Which is why the output from your specific code will be different for different numbers of mappers. 

You wrote your mapper as if it reads a single row, splits a single row, to which you apply a simple calculation, and return a key value pair. 
What your code actually does is read in several lines, so v is a character vector, each contains the content of a row of data. 
Then you split and unlist it and use two elements, basically throwing away anything that's not in the first line of data the mapper read. 
You had two mappers, each used only the first row of data, so you have two lines of output. 


Reply all
Reply to author
Forward
0 new messages