Hi Chris,
Below is my code. During execution I am running the commands below from the
command prompt. Eventually the job fails with this exception:
java.io.IOException: Split class cascading.tap.hadoop.MultiInputSplit
not found
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:326)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException:
cascading.tap.hadoop.MultiInputSplit
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
Steps executed to run the program:
###############################
===========================================================
Step 1: Copy the log files from local file system to Hadoop file
system
===========================================================
./hadoop dfs -copyFromLocal /home/hadoopcluster/
sample_programs_and_data/sample_logs_data
(above command copied 2 log files to Hadoop file system)
===========================================================
Step 2: Run the hadoop cascading JAR file
===========================================================
./hadoop jar /home/hadoopcluster/cluster_problem_resolution/cascading-
log-analyzer.jar cluster_resolution cluster_resolution_output
OUTPUT: I am getting the exception detailed above.
===========================================================
Below is the Java code for the same:
===========================================================
public class MainClass{
private static Double totalNumberOfLines = 0.0;
private static Map<String, Double> operatingSystem = new
LinkedHashMap<String, Double>();
private static Properties properties = new Properties();
public static void main(String[] args) throws IOException {
FlowConnector.setApplicationJarClass(properties,
MainClass.class);
Tap input = new Hfs(new TextLine(), args[0]);
Pipe pipe = new Each("Requirements", new Fields("line"), new
CustomStringParserFunction());
//Requirement : Operating System ratio
Tap totalNumberOfLinesSink = new Hfs(new TextLine(), args[1]+"/
linecounter");
getTotalNumberOfLines(input, totalNumberOfLinesSink, pipe);
//Requirement : Method count
Tap methodSink = new Hfs(new TextLine(), args[1]+"/methodcount");
methodCount(input, methodSink, pipe);
//Requirement : Unique Request
Tap uniqueRequestSink = new Hfs(new TextLine(), args[1]+"/
uniquerequest");
uniqueRequest(input, uniqueRequestSink, pipe);
//Requirement : Browser Agent
Tap browserAgentSink = new Hfs(new TextLine(), args[1]+"/
browseragent");
browserAgent(input, browserAgentSink, pipe);
//Requirement : Operating System ratio
Tap operatingSystemSink = new Hfs(new TextLine(), args[1]+"/
operatingsystem");
operatingSystem(input, operatingSystemSink, pipe);
//Requirement : Failed Pages
Tap failedPagesSink = new Hfs(new TextLine(), args[1]+"/
failedpages");
failedPages(input, failedPagesSink, pipe);
JobConf job = new JobConf(MainClass.class);
FileSystem hdfs = FileSystem.get(job);
Path src = new Path(args[1]);
Path dst = new Path(args[2]);
hdfs.copyToLocalFile(src, dst);
}
private static void executeFlows(Tap input, Tap sink, Pipe pipe){
FlowConnector flowconnector = new FlowConnector(properties);
Flow flow = flowconnector.connect(input, sink, pipe);
flow.start();
flow.complete();
}
private static void methodCount(Tap input, Tap sink, final Pipe pipe)
{
//Pipe pipe = new Each("methodRequirement", new Fields("line"), new
CustomStringParserFunction());
Pipe pipe1 = new GroupBy(pipe, new Fields("method"));
Aggregator aggregator = new Count(new Fields("count1"));
pipe1 = new Every(pipe1, aggregator);
executeFlows(input,sink,pipe1);
}
private static void uniqueRequest(Tap input, Tap sink, final Pipe
pipe){
//Pipe pipe = new Each("uniqueRequirement", new Fields("line"), new
CustomStringParserFunction());
Pipe pipe1 = new GroupBy(pipe, new Fields("resource"));
Aggregator aggregator = new Count(new Fields("count2"));
pipe1 = new Every(pipe1, aggregator);
executeFlows(input, sink, pipe1);
}
private static void browserAgent(Tap input, Tap sink, final Pipe pipe)
{
//Pipe pipe = new Each("browserAgentRequirement", new
Fields("line"), new CustomStringParserFunction());
Pipe pipe1 = new GroupBy(pipe, new Fields("browserAgent"));
Aggregator aggregator = new Count(new Fields("count3"));
pipe1 = new Every(pipe1, aggregator);
executeFlows(input, sink, pipe1);
}
private static void operatingSystem(Tap input, Tap sink, final Pipe
pipe){
//Pipe pipe = new Each("browserAgentRequirement", new
Fields("line"), new CustomStringParserFunction());
Pipe pipe1= new GroupBy(pipe, new Fields("operatingSyatem"));
Aggregator aggregator = new Count(new Fields("count4"));
pipe1 = new Every(pipe1, aggregator);
executeFlows(input, sink, pipe1);
}
private static void failedPages(Tap input, Tap sink, final Pipe pipe)
{
Pipe pipe1 = new GroupBy(pipe, new Fields("isErrorPage"));
pipe1 = new Every(pipe1, Fields.GROUP, new Sum(new
Fields("isErrorPage == 1")));
executeFlows(input, sink, pipe1);
}
private static void getTotalNumberOfLines(Tap input, Tap sink, final
Pipe pipe){
Pipe pipe1 = new GroupBy(pipe, new Fields("dummyCounter"));/*
Aggregator aggregator = new Sum(new Fields("dummyCounter == 1"));*/
pipe1 = new Every(pipe1, Fields.GROUP, new Sum(new
Fields("dummyCounter == 1")));
executeFlows(input, sink, pipe1);
}
}
public class CustomStringParserFunction extends BaseOperation
implements Function {
public CustomStringParserFunction() {
super(1, new Fields("browserAgent", "operatingSyatem", "method",
"resource", "isErrorPage", "dummyCounter"));
}
@Override
public void operate(FlowProcess flowProcess, FunctionCall
functionCall) {
TupleEntry arguments = functionCall.getArguments();
String[] array = (String.valueOf(arguments.getString(0))).split("
");
Tuple tuple = new Tuple();
tuple.add(array[21]);
tuple.add(array[15]);
tuple.add((array[5].toString()).replace("\"", ""));
tuple.add(array[6]);
if(!array[8].equals("200")){
tuple.add(1);
}else{
tuple.add(0);
}
tuple.add(1);
functionCall.getOutputCollector().add(tuple);