Process Hadoop SequenceFile with Cascading

Matthias Scherer

Jul 1, 2010, 9:25:39 AM
to cascadi...@googlegroups.com
Hi,

I'm new to Cascading and want to process a SequenceFile that was created by a Java Hadoop job.

The Java job that created the SequenceFile:

// job setup
...
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
MultipleOutputs.addNamedOutput(conf, "out1",
    SequenceFileOutputFormat.class, Text.class, Text.class);
...

// reducer
...
out1collector.collect(new Text(outKey), new Text(outVal));
...

How can I process this Hadoop SequenceFile with Cascading? I tried the
following:
...
SequenceFile scheme = new SequenceFile(new Fields("key", "value"));
Tap importTap = new Dfs(scheme, inputPath);
Pipe importPipe = new Each("import", new Fields("key"), new Identity());
Tap exportTap = new Dfs(new SequenceFile(new Fields("key")), outputPath, SinkMode.REPLACE);
Properties properties = new Properties();
FlowConnector.setApplicationJarClass(properties, Main.class);
Flow myFlow = new FlowConnector(properties).connect(importTap, exportTap, importPipe);
myFlow.start();
myFlow.complete();
...

But I get the following error:

Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to cascading.tuple.Tuple
    at cascading.scheme.SequenceFile.source(SequenceFile.java:80)
    at cascading.tap.Tap.source(Tap.java:239)
    at cascading.flow.stack.SourceMapperStackElement.operateSource(SourceMapperStackElement.java:80)
    ... 7 more

Could you please tell me what's wrong? Am I right that the SequenceFile written by my Java Hadoop job contains pairs of two Text instances, and can they be read with Cascading using the first line of my code snippet? My goal is to read the original SequenceFile and write out a new SequenceFile containing only the key field of the input file.

Thanks and Regards
Matthias

Ken Krugler

Jul 1, 2010, 9:59:24 AM
to cascadi...@googlegroups.com
Hi Matthias,

What Cascading calls a "SequenceFile" is a Hadoop SequenceFile that has the special Tuples.NULL value as each key, with the actual Tuple as the value.

What you've got is a Hadoop SequenceFile that has a Text object for
each key and another Text object for each value.

So when the Cascading SequenceFile Scheme tries to process your Text/Text SequenceFile, it tries to cast the Text object to a Tuple, which won't work.
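
A quick way to confirm what's actually stored in the file is to open it with the plain Hadoop SequenceFile.Reader, which reports the key and value classes. A minimal sketch (the class name and path handling here are my own, not from your job):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class InspectSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader =
            new SequenceFile.Reader(fs, new Path(args[0]), conf);
        try {
            // A Cascading-written file reports cascading.tuple.Tuple as the
            // value class; a Text/Text job reports org.apache.hadoop.io.Text
            // for both key and value.
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        } finally {
            reader.close();
        }
    }
}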

I haven't had to deal with this situation in the past, but I can't think of any trivial way to make it work inside of Cascading. You could do it by creating a custom Scheme, e.g. subclassing Cascading's SequenceFile and overriding the source() method. Currently this is:

@Override
public Tuple source( Object key, Object value )
{
    return (Tuple) value;
}

But you could change it to be something like:

@Override
public Tuple source( Object key, Object value )
{
    return new Tuple( key.toString(), value.toString() );
}

The key/value Objects should be Text, so toString() would give you what you want (maybe not the most efficient approach).
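
Putting that together, a complete subclass might look like the sketch below, written against the Cascading 1.x API implied by the stack trace; the class name TextTextSequenceFile is an assumption:

import cascading.scheme.SequenceFile;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Hypothetical subclass: reads a Hadoop Text/Text SequenceFile into
// two-field Cascading tuples instead of casting the value to Tuple.
public class TextTextSequenceFile extends SequenceFile
{
    public TextTextSequenceFile( Fields fields )
    {
        super( fields ); // e.g. new Fields( "key", "value" )
    }

    @Override
    public Tuple source( Object key, Object value )
    {
        // key and value arrive as org.apache.hadoop.io.Text here
        return new Tuple( key.toString(), value.toString() );
    }
}

Your source tap would then become new Dfs( new TextTextSequenceFile( new Fields( "key", "value" ) ), inputPath ).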

If that doesn't work, then I think you'd have to write a regular
Hadoop mapred job to convert the Text/Text SequenceFile into a regular
text file.
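
For reference, that conversion job could be roughly the following under the old mapred API, which matches the job setup in your original post (the class name SeqToText and the argument handling are mine):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;

public class SeqToText {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(SeqToText.class);
        conf.setJobName("seq-to-text");

        conf.setInputFormat(SequenceFileInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class); // writes key TAB value lines
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // identity mapper with zero reducers: a pure format conversion
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}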

-- Ken


--------------------------------------------
Ken Krugler
+1 530-210-6378
http://bixolabs.com
e l a s t i c w e b m i n i n g


Alex Cozzi

Jul 1, 2010, 2:41:51 PM
to cascadi...@googlegroups.com
I can confirm that it works just fine; I am doing it right now. You need two things (sorry, the code snippets are in Scala, but I am sure you can figure it out):

1) Extend SequenceFile:

class SessionSequenceFile() extends SequenceFile(new Fields("key", "session")) {
  override def source(key: Object, value: Object) : Tuple = new Tuple(key, value.asInstanceOf[SessionContainer])
}

2) The first operation that runs after the tap needs to be careful to unpack the object from the tuple: use the tuple.getObject() method to avoid exceptions:

class ConvertSessions[Nothing]() extends BaseOperation[Nothing]() with Function[Nothing] {

  def operate(flowProcess: FlowProcess, functionCall: FunctionCall[Nothing]) {
    val input: TupleEntry = functionCall.getArguments()
    val outputCollector: TupleEntryCollector = functionCall.getOutputCollector()

    val session: SessionContainer = input.getObject("session").asInstanceOf[SessionContainer]
    <etc. etc.....>

and personally I use the first operation after the tap to convert the object into tuples for ease of processing downstream...
Alex
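
For anyone following the thread in Java, the unpack-and-convert Function Alex describes might look roughly like this under Cascading 1.x; SessionContainer comes from his snippet, while the getter names and output fields are placeholders:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class ConvertSessions extends BaseOperation implements Function
{
    public ConvertSessions()
    {
        super( new Fields( "sessionId", "duration" ) ); // placeholder output fields
    }

    public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
        TupleEntry input = functionCall.getArguments();

        // getObject() hands back the raw stored object; accessors like
        // getString() would try to coerce it and throw
        SessionContainer session = (SessionContainer) input.getObject( "session" );

        // getId()/getDuration() stand in for whatever the container exposes
        functionCall.getOutputCollector().add(
            new Tuple( session.getId(), session.getDuration() ) );
    }
}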
