Here is a short tutorial on manipulating a CSV file with RHadoop.
Problem: From the file 'stock.csv', extract the rows for the stock symbol KOOL, group them by year and, for each year,
calculate the gain in stock price (adjusted closing price minus opening price) summed over all of that year's trading days. A sample of the data, including the column headings, is shown below:
exchange,stock_symbol,date,stock_price_open,stock_price_high,stock_price_low,stock_price_close,stock_volume,stock_price_adj_close
NASDAQ,KINS,2010-02-08,2.90,2.90,2.90,2.90,000,2.90
NASDAQ,KINS,2010-02-05,2.90,2.90,2.90,2.90,700,2.90
NASDAQ,KINS,2010-02-04,2.96,2.96,2.96,2.96,000,2.96
:
:
NASDAQ,KINS,2009-10-27,2.25,2.25,2.25,2.25,100,2.25
NASDAQ,KINS,2009-10-26,1.84,2.20,1.84,2.20,500,2.20
NASDAQ,KINS,2009-10-23,2.16,2.17,2.16,2.17,900,2.17
:
:
NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
:
:
The file is stored in the Hadoop Distributed File System (HDFS) at /user/test/stock.csv.
Solution: This problem could also be solved with Hive on Hadoop, but here it is solved with RHadoop. Our map function emits the year (say, 1997) as the key and each matching KOOL row as the value. A given year key can therefore collect at most one row per calendar day (365, or 366 in a leap year, if the market were open every day); in practice there are about 250, because the market is closed on weekends and holidays. A sample of the (key, value) pairs emitted by the map function is:
key value
1997 NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
1997 NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
1997 NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88
1997 NASDAQ,KOOL,1997-05-02,3.88,4.00,3.69,3.81,201000,3.81
1997 NASDAQ,KOOL,1997-05-01,3.47,3.88,3.47,3.88,168000,3.88
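To check the map logic without a Hadoop cluster, the filtering and key extraction can be tried in plain R on a few of the sample rows above. This is a local sketch only: it reads the rows with base R's read.csv() instead of rmr2's input format, does not call keyval(), and the names sample_csv, kool and keys are hypothetical.

```r
# A few sample rows copied from stock.csv; the header line is left out to keep the sketch short
sample_csv <- "NASDAQ,KINS,2010-02-08,2.90,2.90,2.90,2.90,000,2.90
NASDAQ,KOOL,1997-05-07,3.91,3.91,3.56,3.56,37300,3.56
NASDAQ,KOOL,1997-05-06,3.91,3.94,3.91,3.94,8400,3.94
NASDAQ,KINS,2009-10-27,2.25,2.25,2.25,2.25,100,2.25
NASDAQ,KOOL,1997-05-05,3.81,4.12,3.81,3.88,188100,3.88"

stockdata <- read.csv(text = sample_csv, header = FALSE, stringsAsFactors = FALSE)

# Keep only the KOOL rows (column 2 holds the stock symbol)
kool <- stockdata[stockdata[, 2] == "KOOL", ]

# The key of each row is its year: the first four characters of the date in column 3
keys <- as.numeric(substr(kool[, 3], 1, 4))

print(keys)        # one key per KOOL row
print(nrow(kool))  # number of KOOL rows found
```

Each KOOL row gets 1997 as its key, which matches the (key, value) sample above.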
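In the reduce stage, each call of the reduce function receives all the rows for one year. It counts them and sums the 4th column (stock_price_open) and the 9th column (stock_price_adj_close).
The difference between the two sums, which is the year's total day-wise gain, is computed afterwards, when the result is plotted.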
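Because the difference of two sums equals the sum of the per-day differences, the plotted value is exactly the year's total of (adjusted close minus open). A base-R sketch of the reduce arithmetic, using only the five 1997 KOOL sample rows shown above (the vector names are chosen for illustration):

```r
# Opening and adjusted closing prices of the five 1997 KOOL sample rows
stock_price_open      <- c(3.91, 3.91, 3.81, 3.88, 3.47)
stock_price_adj_close <- c(3.56, 3.94, 3.88, 3.81, 3.88)

# What the reducer would compute for the key 1997, on this sample only
noOfRowsperyear <- length(stock_price_open)
sum_open        <- sum(stock_price_open)
sum_adj_close   <- sum(stock_price_adj_close)

# Difference of the two sums ...
gain_from_sums <- sum_adj_close - sum_open
# ... equals the sum of the per-day gains
gain_per_day <- stock_price_adj_close - stock_price_open
gain_total   <- sum(gain_per_day)

cat("rows:", noOfRowsperyear, "\n")
cat("sum open:", sum_open, " sum adj close:", sum_adj_close, "\n")
cat("gain (difference of sums):", round(gain_from_sums, 2), "\n")
cat("gain (sum of per-day gains):", round(gain_total, 2), "\n")
```

On these five rows the sums are 18.98 and 19.07, and both ways of computing the gain give 0.09.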
The map function is as follows:
# Load rmr2, the RHadoop package that provides make.input.format(), keyval(), mapreduce() and from.dfs()
library(rmr2)
# Specify the format of the input file, for use in the map function
myformat<-make.input.format("csv",sep=",")
searchsymbol<-"KOOL"
map = function(k,stockdata) {
  # Find the rows whose second column (stock_symbol) equals KOOL. This vectorized
  # test replaces a row-by-row loop with rbind(); as.character() guards against
  # the column having been read as a factor
  kool_rows<-which(as.character(stockdata[ ,2])==searchsymbol)
  # An input split may contain no KOOL rows at all; emit nothing in that case
  if(length(kool_rows)==0) return(NULL)
  filtered_data<-stockdata[kool_rows, ]
  # Emit 'filtered_data' as value and, as key, the first four characters
  # (say, 1997) of the date string (say, 1997-05-07) in the third column
  keyval(as.numeric(substr(filtered_data[ ,3], 1, 4)),filtered_data)
}
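All rows that share the same key (say, 1997) are passed together to a single call of the reduce function; this grouping by key is done by Hadoop between the map and reduce stages.
Rows with different keys, i.e. from different years, are therefore handled by separate reduce calls. The reduce function is: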
reduce = function(year,symbol_data) {
  # Number of KOOL rows (trading days) for this year
  noOfRowsperyear<-nrow(symbol_data)
  # Convert the 9th (stock_price_adj_close) and 4th (stock_price_open) columns to numbers.
  # Going through character first yields the prices even if a column was read as a factor
  stock_price_adj_close<-as.numeric(as.character(symbol_data[,9]))
  stock_price_open<-as.numeric(as.character(symbol_data[,4]))
  # Year-wise sums of both columns
  yearwiseSum_stockPriceAdjClose<-sum(stock_price_adj_close)
  yearwiseSum_stockPriceOpen<-sum(stock_price_open)
  # Combine the row count and the two sums into a one-row matrix: this is the 'value'
  output<-cbind(noOfRowsperyear,yearwiseSum_stockPriceOpen,yearwiseSum_stockPriceAdjClose)
  # Emit the year-wise result
  keyval(year,output)
}
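The reducer converts the price columns to character before calling as.numeric(). That detour matters if a column arrives as a factor (for example, when text is read with stringsAsFactors = TRUE, the default before R 4.0.0): as.numeric() applied directly to a factor returns its internal level codes, not the prices. A short base-R illustration with a hypothetical vector prices:

```r
# Prices held as a factor; its levels are sorted, so "3.47" is level 1 and "3.91" is level 2
prices <- factor(c("3.91", "3.47", "3.91"))

# Wrong: as.numeric() on a factor returns the integer level codes
codes <- as.numeric(prices)

# Right: convert to character first, then to numeric
values <- as.numeric(as.character(prices))

print(codes)
print(values)
```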
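# Run the MapReduce job
year_wisedata<-mapreduce(input='/user/test/stock.csv', map=map, reduce=reduce, input.format=myformat)
# Read the result back from HDFS
result<-from.dfs(year_wisedata)
# Display the values (one row per year) and the keys (the years)
result$val
result$key
# Put years and values side by side: column 1 is the year, column 2 the row count,
# column 3 the sum of opening prices and column 4 the sum of adjusted closing prices
xy<-cbind(result$key,result$val)
# Sort by year, since the keys are not guaranteed to come back in year order
xy<-xy[order(xy[ ,1]), , drop=FALSE]
# Plot the year-wise gain: sum of adjusted close minus sum of open
plot(xy[ ,1], xy[ ,4]-xy[ ,3], xlab="Year", ylab="Sum of (adj. close - open)")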
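The whole map, group, reduce flow can be imitated in base R without a cluster, with split() standing in for the grouping that Hadoop performs between the two stages. This is only a local sketch: it uses the six KINS sample rows from the top of the file simply because they span two years, and reduce_local is a hypothetical stand-in for the reduce function, not rmr2 code.

```r
# The six KINS sample rows from the top of the file: three from 2010, three from 2009
rows <- data.frame(
  date      = c("2010-02-08", "2010-02-05", "2010-02-04",
                "2009-10-27", "2009-10-26", "2009-10-23"),
  open      = c(2.90, 2.90, 2.96, 2.25, 1.84, 2.16),
  adj_close = c(2.90, 2.90, 2.96, 2.25, 2.20, 2.17),
  stringsAsFactors = FALSE
)

# Map: the key of each row is its year (first four characters of the date)
keys <- substr(rows$date, 1, 4)

# Grouping: split() buckets the rows by key, one data frame per year
groups <- split(rows, keys)

# Hypothetical stand-in for the reduce function; it is called once per key
reduce_local <- function(year, symbol_data) {
  c(year            = as.numeric(year),
    noOfRowsperyear = nrow(symbol_data),
    sumOpen         = sum(symbol_data$open),
    sumAdjClose     = sum(symbol_data$adj_close))
}

# Reduce each group and stack the results, one row per year
yearly <- do.call(rbind, Map(reduce_local, names(groups), groups))

# Gain per year, as in the plot: sum of adjusted close minus sum of open
yearly_gain <- yearly[, "sumAdjClose"] - yearly[, "sumOpen"]

print(yearly)
print(yearly_gain)
```

The matrix yearly plays the role of xy: one row per year holding the year, the row count and the two sums.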
Printing xy gives the following result:
          noOfRowsperyear yearwiseSum_stockPriceOpen yearwiseSum_stockPriceAdjClose
[1,] 1995              92                      89.14                         176.40
[2,] 1996             254                     723.25                         889.54
[3,] 1997             252                     844.34                         843.54
[4,] 1998             250                     540.37                         537.19
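The plot shows xy[ ,4] - xy[ ,3], the year's summed adjusted close minus its summed open. Applying the same arithmetic to the printed numbers, copied here into a hypothetical data frame printed_xy, gives the plotted values; dividing by the row count additionally gives the average gain per trading day.

```r
# The four rows of xy as printed above
printed_xy <- data.frame(
  year            = c(1995, 1996, 1997, 1998),
  noOfRowsperyear = c(92, 254, 252, 250),
  sumOpen         = c(89.14, 723.25, 844.34, 540.37),
  sumAdjClose     = c(176.40, 889.54, 843.54, 537.19)
)

# Plotted quantity: total day-wise gain for the year
printed_xy$gain <- printed_xy$sumAdjClose - printed_xy$sumOpen
# Average gain per trading day in that year
printed_xy$gain_per_day <- printed_xy$gain / printed_xy$noOfRowsperyear

print(round(printed_xy, 3))
```

On these figures the summed adjusted close exceeds the summed open in 1995 and 1996 (by 87.26 and 166.29) and falls slightly below it in 1997 and 1998 (by 0.80 and 3.18).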