MultiTap and multiple directory inputs


GregG

Sep 3, 2010, 2:43:00 PM
to cascading-user
Hi,

New user here. I must say that I am very impressed with this library.
It is very well designed and thought out. Kudos to the developers.

Anyway, I have a question about MultiTap. If I create a Tap and pass in
a directory, all files in that directory get processed, much like when
using Hadoop directly. However, if I want to process multiple
directories and create an array of Tap objects that point to
directories and tie them together with a MultiTap, I get an error
telling me that the given directory is not a file:

Caused by: java.io.IOException: Not a file: file:/tmp/dir1
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:206)
        at cascading.tap.hadoop.MultiInputFormat.getSplits(MultiInputFormat.java:240)
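
For reference, here is a minimal sketch of what I'm doing (Cascading 1.x; the paths and fields are illustrative, and I'm assuming MultiTap's varargs constructor):

// each Hfs tap points at a directory rather than a single file
Scheme scheme = new TextLine( new Fields( "offset", "line" ) );

Tap dir1 = new Hfs( scheme, "/tmp/dir1" );
Tap dir2 = new Hfs( scheme, "/tmp/dir2" );

// tying the directory taps together is what triggers the
// "Not a file" IOException above
Tap source = new MultiTap( dir1, dir2 );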

The workaround seems to be to instead create a MultiTap with one Tap
for each file in each directory, which is not as handy. And since you
can pass directories to Tap objects when using a single Tap as your
source, but not when the Tap is part of a MultiTap, it seems a bit
inconsistent.

Do you think it is worth filing a feature request to allow Taps in
MultiTaps to refer to directories? Alternatively, I can probably dig
into the code, fix this myself, and submit a patch if that's easier.

Thanks.

-Greg

Chris K Wensel

Sep 3, 2010, 4:27:31 PM
to cascadi...@googlegroups.com
Hi

MultiTap is deprecated in 1.1 in favor of MultiSourceTap.

http://www.cascading.org/1.1/javadoc/cascading/tap/MultiSourceTap.html

The behavior you are describing is supported.

If the problem persists, I'll open a bug in 1.1 and see if I can get it resolved for you.

In the meantime, give GlobHfs a try; it might be easier to use.

http://www.cascading.org/1.1/javadoc/cascading/tap/GlobHfs.html
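
Something along these lines (a rough sketch; the glob pattern is just an illustration):

Scheme scheme = new TextLine( new Fields( "offset", "line" ) );

// a single tap whose pattern expands to every matching file,
// so no per-file Tap array is needed
Tap source = new GlobHfs( scheme, "/tmp/dir*/part-*" );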

ckw


--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

GregG

Sep 3, 2010, 5:55:23 PM
to cascading-user
I tried MultiSourceTap, and it threw the same Exception. Did you mean
that it is not supported? I'm using Cascading v1.1.1.

I then switched to GlobHfs, which makes things less of a hassle since
I don't have to create my own Tap for every file, so I guess I will
stick with that for now.

Thanks.

-Greg

Chris K Wensel

Sep 7, 2010, 1:54:35 PM
to cascadi...@googlegroups.com
> I tried MultiSourceTap and that also threw the same Exception. Did you
> mean that it is not supported? I'm using cascading v1.1.1.
>

Using directories instead of files should work. It works fine in my applications, actually.

What version of Hadoop are you using?

ckw

-- Concurrent, Inc. offers mentoring, support, and licensing for Cascading - support open-source by buying support

GregG

Sep 7, 2010, 5:11:56 PM
to cascading-user
OK, I figured out what the issue was.

I have code that, given a Hadoop Path, returns an array of all
subdirectories under that Path, including the Path itself. Passing
this array of Paths into FileInputFormat.setInputPaths() works fine,
and Hadoop processes all files under all of the subdirectories. This
is handy if you have one top-level directory and lots of
subdirectories under it that contain all of your input files,
segmented by day or whatever.
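
Roughly, the raw-Hadoop side looks like this (a sketch using the old mapred API; it only collects immediate subdirectories for brevity, and the class and method names are made up for illustration):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SubDirInputs
{
  // collect a Path and its subdirectories, then hand them all to Hadoop
  public static void addWithSubDirs( JobConf conf, Path root ) throws IOException
  {
    FileSystem fs = root.getFileSystem( conf );
    List<Path> paths = new ArrayList<Path>();
    paths.add( root ); // include the top-level directory itself

    for( FileStatus status : fs.listStatus( root ) )
      if( status.isDir() )
        paths.add( status.getPath() );

    FileInputFormat.setInputPaths( conf, paths.toArray( new Path[ paths.size() ] ) );
  }
}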

But when I tried the same thing with MultiTap/MultiSourceTap, it
bailed with the given Exception, since the top-level directory has
subdirectories under it, which aren't files. So basically a Tap
expects a file or a directory of files, but not a directory with
subdirectories under it, whereas FileInputFormat.setInputPaths()
doesn't care whether a given directory has subdirectories; it just
ignores them.

So, as I said, I am now using GlobHfs instead, and that seems to work
fine. If I give it a top-level directory, it does the directory
recursion for me and returns all regular files that match the given
expression.

Thanks for the help.

-Greg


Chris K Wensel

Sep 8, 2010, 3:13:45 PM
to cascadi...@googlegroups.com
Actually, Cascading doesn't care if there are sub-directories; only Hadoop does.

It's interesting that you're not seeing a failure when setting the paths via the raw APIs, as Cascading uses those same APIs.

ckw


--
Chris K Wensel
ch...@concurrentinc.com

JOHN MILLER

Aug 24, 2016, 10:09:14 AM
to cascading-user
Greetings

I have somewhat of a problem reading an input path from my local file system. Although it states that it was "unable to read from input identifier" and "did not parse correct number of values from input data, expected: 6, got: 1:c", it somehow generated the correct output. Please advise if there is something syntactically wrong with my code. Additionally, I tried reading a directory from my local file system, and for whatever reason it was not able to read that either. Please advise.

John M

[jmill383@starchild part1]$ /opt/hadoop/bin/hadoop jar build/libs/impatient.jar impatient.Main
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hive/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/08/24 10:02:19 INFO property.AppProps: using app.id: 48E6FCA38BD24D3088863A1F584FA6E6
16/08/24 10:02:19 INFO flow.Flow: [Copy] executed rule registry: LocalRuleRegistry, completed as: SUCCESS, in: 00:00.035
16/08/24 10:02:19 INFO flow.Flow: [Copy] rule registry: LocalRuleRegistry, supports assembly with steps: 1, nodes: 1
16/08/24 10:02:19 INFO flow.Flow: [Copy] rule registry: LocalRuleRegistry, result was selected using: 'default comparator: selects plan with fewest steps and fewest nodes'
16/08/24 10:02:19 INFO util.Version: Concurrent, Inc - Cascading 3.0.2
16/08/24 10:02:19 INFO flow.Flow: [Copy] starting
16/08/24 10:02:19 INFO flow.Flow: [Copy]  source: FileTap["TextDelimited[['warctype', 'filename', 'readerid', 'warcdate', 'absoluteoff', 'warcinfo']]"]["/opt/spark/dataset.matt"]
16/08/24 10:02:19 INFO flow.Flow: [Copy]  sink: FileTap["TextDelimited[['warctype', 'filename', 'readerid', 'warcdate', 'absoluteoff', 'warcinfo']]"]["/home/jmill383/cascadingdemo.txt"]
16/08/24 10:02:19 INFO flow.Flow: [Copy]  parallel execution of steps is enabled: true
16/08/24 10:02:19 INFO flow.Flow: [Copy]  executing total steps: 1
16/08/24 10:02:19 INFO flow.Flow: [Copy]  allocating management threads: 1
16/08/24 10:02:19 INFO flow.Flow: [Copy] starting step: (1/1) ...mill383/cascadingdemo.txt
16/08/24 10:02:19 INFO planner.LocalStepRunner: flow node id: C49D03E7C72347C998AF4F301156BC30, mem on start (mb), free: 405, total: 481, max: 481
16/08/24 10:02:19 ERROR element.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to read from input identifier: /opt/spark/dataset.matt
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:152)
        at cascading.flow.stream.element.SourceStage.map(SourceStage.java:84)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:60)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:40)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 6, got: 1:c
        at cascading.scheme.util.DelimitedParser.onlyParseLine(DelimitedParser.java:404)
        at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:341)
        at cascading.scheme.local.TextDelimited.source(TextDelimited.java:675)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
        ... 7 more
16/08/24 10:02:19 ERROR element.SourceStage: caught throwable
cascading.tuple.TupleException: unable to read from input identifier: /opt/spark/dataset.matt
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:152)
        at cascading.flow.stream.element.SourceStage.map(SourceStage.java:84)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:60)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:40)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 6, got: 1:c
        at cascading.scheme.util.DelimitedParser.onlyParseLine(DelimitedParser.java:404)
        at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:341)
        at cascading.scheme.local.TextDelimited.source(TextDelimited.java:675)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
        ... 7 more
16/08/24 10:02:19 INFO planner.LocalStepRunner: flow node id: C49D03E7C72347C998AF4F301156BC30, mem on close (mb), free: 403, total: 481, max: 481
16/08/24 10:02:20 INFO flow.Flow: [Copy] stopping all jobs
16/08/24 10:02:20 INFO flow.Flow: [Copy] stopping: (1/1) ...mill383/cascadingdemo.txt
16/08/24 10:02:20 INFO flow.Flow: [Copy] stopped all jobs
16/08/24 10:02:20 INFO flow.Flow: [Copy]  completed in: 00:00.202
Exception in thread "main" cascading.flow.FlowException: local step failed: (1/1) ...mill383/cascadingdemo.txt
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:289)
        at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:48)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.tuple.TupleException: unable to read from input identifier: /opt/spark/dataset.matt
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:152)
        at cascading.flow.stream.element.SourceStage.map(SourceStage.java:84)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:60)
        at cascading.flow.stream.element.SourceStage.call(SourceStage.java:40)
        ... 4 more
Caused by: cascading.tap.TapException: did not parse correct number of values from input data, expected: 6, got: 1:c
        at cascading.scheme.util.DelimitedParser.onlyParseLine(DelimitedParser.java:404)
        at cascading.scheme.util.DelimitedParser.parseLine(DelimitedParser.java:341)
        at cascading.scheme.local.TextDelimited.source(TextDelimited.java:675)
        at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
        at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
        ... 7 more
[jmill383@starchild part1]$

The code:

package impatient;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main
{
  public static void main( String[] args )
  {
    String inPath = "/opt/spark/dataset.matt";
    String outPath = "/home/jmill383/cascadingdemo.txt";

    FlowConnector lfc = new LocalFlowConnector();

    Scheme sourceScheme = new TextDelimited( new Fields( "warctype", "filename", "readerid", "warcdate", "absoluteoff", "warcinfo" ), true, "," );
    Scheme sinkScheme = new TextDelimited( new Fields( "warctype", "filename", "readerid", "warcdate", "absoluteoff", "warcinfo" ), true, "," );

    // Scheme sourceScheme = new TextLine( new Fields( "offset", "line" ) );
    // Scheme sinkScheme = new TextLine( new Fields( "line" ) );

    // create the source tap
    Tap inTap = new FileTap( sourceScheme, inPath );

    // create the sink tap
    Tap outTap = new FileTap( sinkScheme, outPath, SinkMode.UPDATE );

    // specify a pipe to connect the taps
    Pipe copyPipe = new Pipe( "copy" );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
        .addSource( copyPipe, inTap )
        .addTailSink( copyPipe, outTap )
        .setName( "Copy" );

    // run the flow
    lfc.connect( flowDef ).complete();
  }
}

Andre Kelpe

Aug 24, 2016, 10:27:58 AM
to cascading-user
You are mixing the local platform and the Hadoop platform, which is
incorrect. You should add cascading-hadoop2-mr1 instead of
cascading-local to your project, and use the Hadoop2MR1FlowConnector
instead of the LocalFlowConnector.
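
Something like this, roughly (an untested sketch against your paths; the Hadoop-side class names come from the cascading-hadoop2-mr1 module):

package impatient;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector; // replaces LocalFlowConnector
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextDelimited;          // hadoop scheme, not cascading.scheme.local
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;                       // Hfs replaces the local FileTap
import cascading.tuple.Fields;

public class Main
{
  public static void main( String[] args )
  {
    String inPath = "/opt/spark/dataset.matt";
    String outPath = "/home/jmill383/cascadingdemo.txt";

    Fields fields = new Fields( "warctype", "filename", "readerid", "warcdate", "absoluteoff", "warcinfo" );

    Scheme sourceScheme = new TextDelimited( fields, true, "," );
    Scheme sinkScheme = new TextDelimited( fields, true, "," );

    Tap inTap = new Hfs( sourceScheme, inPath );
    // REPLACE is the usual choice for an Hfs sink; UPDATE behaves differently here
    Tap outTap = new Hfs( sinkScheme, outPath, SinkMode.REPLACE );

    Pipe copyPipe = new Pipe( "copy" );

    FlowDef flowDef = FlowDef.flowDef()
        .addSource( copyPipe, inTap )
        .addTailSink( copyPipe, outTap )
        .setName( "Copy" );

    FlowConnector connector = new Hadoop2MR1FlowConnector();
    connector.connect( flowDef ).complete();
  }
}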

- André

--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com