Multiple independent flows getting connected together?


Chris Curtin

Oct 17, 2008, 5:23:38 PM10/17/08
to cascading-user
Hi,

I'm trying to dynamically create a set of map reduce jobs using
Cascading. The idea is as follows:
- read the source file
- for every column in the source file, get the unique values and the count of each, then write the values/counts into a separate file per column

I am doing the following:
1. Create the Flow that reads the file from disk into Cascading (this works, leaving the output file in the Tap I defined).
2. For every column in the Pipe from step #1:
2a. Create a new Pipe, with the Pipe from step #1 as its previous pipe; it has a unique name.
2b. GroupBy and Every on the pipe from 2a to get the values/counts.
2c. A Tap with the unique name as the destination directory.
2d. A new Flow using the output tap from step #1, the Pipe from 2a, and the tap from 2c.
3. Create a Cascade with the flow from #1 and all the flows created in 2d.

When it runs, the first column's GroupBy is fired and the file is
created with the correct data. However the second and subsequent Flows
fail with the following error:
08/10/17 16:12:06 WARN mapred.LocalJobRunner: job_local_0003
java.io.IOException: Not a file: file:/c:/temp/clouds/output/logs/created/recipient_id
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:226)
        at cascading.tap.hadoop.MultiInputFormat.getSplits(MultiInputFormat.java:243)
        at cascading.tap.hadoop.MultiInputFormat.getSplits(MultiInputFormat.java:186)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:106)

Where recipient_id is the name of the first column in the file. It appears Hadoop treats this directory as an input to the next Flow instead of as the output from the first?

My code:

String inputPath = args[0];
String logsPath = args[1] + "/logs/created";

String sentFile = inputPath + "/sent_test.txt";

// set the current job jar
Properties properties = new Properties();
FlowConnector.setJarClass(properties, Joins.class);

FlowConnector flowConnector = new FlowConnector(properties);
CascadeConnector cascadeConnector = new CascadeConnector();

// build fields for the sent information
Fields sentFields = new Fields("mailing_id", "report_id", "recipient_id", "sent_date", "suppressed");
Pipe sentPipe = new Each("sent", new Fields("line"), new RegexSplitter(sentFields, "\t"));

// create a tap to read a resource from the local file system
Tap localSentTap = new Lfs(new TextLine(), sentFile);

// create a tap to read/write from the local file system
Tap outputTap = new Lfs(new TextLine(), logsPath);

// now tie together the selection from the source file into where the other flows can read it
Flow importLogFlow = flowConnector.connect(localSentTap, outputTap, sentPipe);

// now get the list of all columns from the sentPipe and create a GroupBy and an output Tap for each
List<Flow> flows = new ArrayList<Flow>();
flows.add(importLogFlow); // add the first flow
for (int fieldNum = 0; fieldNum < sentFields.size(); fieldNum++) {
    String name = (String) sentFields.get(fieldNum);

    // create a pipe for this field
    Pipe elementPipe = new Pipe("name:" + name, sentPipe);
    elementPipe = new GroupBy(elementPipe, new Fields(name));
    elementPipe = new Every(elementPipe, Fields.KEYS, new Count());

    // create the Tap to write the results
    Tap elementTap = new Lfs(new TextLine(), logsPath + "/" + name);

    Flow detailFlow = flowConnector.connect(outputTap, elementTap, elementPipe);
    flows.add(detailFlow);
}

Flow[] newFlows = flows.toArray(new Flow[flows.size()]);

// connect the flows by their dependencies; order is not significant
Cascade cascade = cascadeConnector.connect(newFlows);

// execute the cascade, which in turn executes each flow in dependency order
cascade.complete();

Thanks,

Chris

Chris K Wensel

Oct 17, 2008, 6:05:19 PM10/17/08
to cascadi...@googlegroups.com
Hadoop dislikes nested directories. Use a new path unique to each
'dynamic' flow.

let me know if that works.
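To see how this advice maps onto the code in the question: the per-column taps are created at logsPath + "/" + name, which nests them inside the directory that the first flow writes to, so Hadoop's FileInputFormat then finds subdirectories where it expects files. A minimal sketch of the path change, giving each dynamic flow a sibling directory instead (the class name and the "by_column" directory are illustrative, not from the thread):

```java
// Sketch: build per-column output paths as siblings of the first flow's
// output rather than children of it, so FileInputFormat never encounters
// nested directories when reading logsPath back in.
public class UniquePaths {

    // Problematic layout from the question: the element tap lives *inside*
    // logsPath, the directory the next flow reads as its source.
    static String nestedPath(String logsPath, String column) {
        return logsPath + "/" + column;
    }

    // Suggested layout: each dynamic flow writes beside logsPath, under a
    // separate root, so the input directory contains only plain files.
    static String siblingPath(String outputRoot, String column) {
        return outputRoot + "/by_column/" + column;
    }

    public static void main(String[] args) {
        String outputRoot = "/tmp/clouds/output";
        String logsPath = outputRoot + "/logs/created";
        // Nested: /tmp/clouds/output/logs/created/recipient_id (breaks)
        System.out.println(nestedPath(logsPath, "recipient_id"));
        // Sibling: /tmp/clouds/output/by_column/recipient_id (works)
        System.out.println(siblingPath(outputRoot, "recipient_id"));
    }
}
```

With this layout, the loop in the question would construct each elementTap from the sibling path, leaving the first flow's output directory untouched.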

ckw

fyi, i'm on #cascading irc. you might get quicker responses from me
and the folk there.

--
Chris K Wensel
ch...@wensel.net
http://chris.wensel.net/
http://www.cascading.org/

Chris Curtin

Oct 20, 2008, 11:29:41 AM10/20/08
to cascading-user
That did the trick. Thanks for the help.

I'll get on the IRC when I try the next thing ;-)

Thanks,

Chris