debug - beginner question


Brian Wolf

Sep 9, 2010, 11:03:32 PM
to cascading-user
Hi,

I am new. I would like to see the effect of each pipe on the data in the
wordcount example. I put

importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

but I don't see anything like that. I also don't see any output labeled
"debug".

Thanks,
Brian

Chris Curtin

Sep 10, 2010, 7:56:04 AM
to cascading-user
Are you running standalone Hadoop (single node) or in a cluster? If in
a cluster, the results may be written on the node where the mapper is
running, not where you launched the job.

Brian Wolf

Sep 10, 2010, 8:04:07 PM
to cascading-user
It's in standalone mode. This is the output:


When I run it I pipe the output to a file, but there is still output
to the screen.


screen:
$ hadoop jar ./build/wordcount.jar data/url+page.200.txt output local > file
/cygdrive/c/hadoop/hadoop-0.21.0/bin/hadoop-config.sh: line 189: C:\Program: command not found
10/09/10 16:55:49 INFO wordcount.Main: Hello World
10/09/10 16:55:49 WARN wordcount.Main: low fuel
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flows: 4
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] allocating threads: 4
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: import pages
10/09/10 16:55:50 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
10/09/10 16:55:50 INFO flow.Flow: [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: import pages
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: url pipe+word pipe
10/09/10 16:55:50 INFO flow.Flow: [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: url pipe+word pipe
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export word
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export url
10/09/10 16:55:50 INFO flow.Flow: [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: export url
10/09/10 16:55:50 INFO flow.Flow: [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: export word



file:

234 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
234 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
469 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
469 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
531 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
531 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
562 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
562 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flows: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flows: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] allocating threads: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] allocating threads: 4
625 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: import pages
625 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: import pages
672 [pool-1-thread-1] INFO org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
672 [pool-1-thread-1] INFO org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
750 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: import pages
750 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: import pages
750 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: url pipe+word pipe
750 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: url pipe+word pipe
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
765 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: url pipe+word pipe
765 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: url pipe+word pipe
765 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export word
765 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export word
765 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export url
765 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export url
781 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
781 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
797 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
797 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
797 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export url
797 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export url
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
797 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export word
797 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export word

code:


package wordcount;

import java.util.Map;
import java.util.Properties;
import java.util.Arrays;

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;


import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.operation.Debug;
import cascading.operation.DebugLevel;


/**
*
*/
public class Main
{

  static Logger logger = Logger.getLogger( Main.class );

  private static class ImportCrawlDataAssembly extends SubAssembly
  {
    public ImportCrawlDataAssembly( String name )
    {
      BasicConfigurator.configure();

      // split the text line into "url" and "raw" with the default delimiter of tab
      RegexSplitter regexSplitter = new RegexSplitter( new Fields( "url", "raw" ) );
      Pipe importPipe = new Each( name, new Fields( "line" ), regexSplitter );

      // System.out.println( "debug" );

      importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

      // remove all pdf documents from the stream
      importPipe = new Each( importPipe, new Fields( "url" ), new RegexFilter( ".*\\.pdf$", true ) );

      System.out.println( "debug" );

      importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

      // replace ":nl:" with a new line, return the fields "url" and "page" to the stream.
      // discard the other fields in the stream
      RegexReplace regexReplace = new RegexReplace( new Fields( "page" ), ":nl:", "\n" );
      importPipe = new Each( importPipe, new Fields( "raw" ), regexReplace, new Fields( "url", "page" ) );

      // System.out.println( "debug" );

      importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

      setTails( importPipe );
    }
  }

  private static class WordCountSplitAssembly extends SubAssembly
  {
    public WordCountSplitAssembly( String sourceName, String sinkUrlName, String sinkWordName )
    {
      // create a new pipe assembly to create the word count across all the pages, and the word count in a single page
      Pipe pipe = new Pipe( sourceName );

      // convert the html to xhtml using the TagSoupParser. return only the fields "url" and "xml", discard the rest
      pipe = new Each( pipe, new Fields( "page" ), new TagSoupParser( new Fields( "xml" ) ), new Fields( "url", "xml" ) );

      // apply the given XPath expression to the xml in the "xml" field. this expression extracts the 'body' element.
      XPathGenerator bodyExtractor = new XPathGenerator( new Fields( "body" ), XPathOperation.NAMESPACE_XHTML, "//xhtml:body" );
      pipe = new Each( pipe, new Fields( "xml" ), bodyExtractor, new Fields( "url", "body" ) );

      // apply another XPath expression. this expression removes all elements from the xml, leaving only text nodes.
      // text nodes in a 'script' element are removed.
      String elementXPath = "//text()[ name(parent::node()) != 'script']";
      XPathGenerator elementRemover = new XPathGenerator( new Fields( "words" ), XPathOperation.NAMESPACE_XHTML, elementXPath );
      pipe = new Each( pipe, new Fields( "body" ), elementRemover, new Fields( "url", "words" ) );

      // apply the regex to break the document into individual words, and stuff each word into a new tuple in the current
      // stream with field names "url" and "word"
      RegexGenerator wordGenerator = new RegexGenerator( new Fields( "word" ), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" );
      pipe = new Each( pipe, new Fields( "words" ), wordGenerator, new Fields( "url", "word" ) );

      // group on "url"
      Pipe urlCountPipe = new GroupBy( sinkUrlName, pipe, new Fields( "url", "word" ) );
      urlCountPipe = new Every( urlCountPipe, new Fields( "url", "word" ), new Count(), new Fields( "url", "word", "count" ) );

      // group on "word"
      Pipe wordCountPipe = new GroupBy( sinkWordName, pipe, new Fields( "word" ) );
      wordCountPipe = new Every( wordCountPipe, new Fields( "word" ), new Count(), new Fields( "word", "count" ) );

      setTails( urlCountPipe, wordCountPipe );
    }
  }

  public static void main( String[] args )
  {
    BasicConfigurator.configure();

    // set the current job jar
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );
    FlowConnector flowConnector = new FlowConnector( properties );

    String inputPath = args[ 0 ];
    String pagesPath = args[ 1 ] + "/pages/";
    String urlsPath = args[ 1 ] + "/urls/";
    String wordsPath = args[ 1 ] + "/words/";
    String localUrlsPath = args[ 2 ] + "/urls/";
    String localWordsPath = args[ 2 ] + "/words/";

    // import a text file with crawled pages from the local filesystem into a Hadoop distributed filesystem
    // the imported file will be a native Hadoop sequence file with the fields "page" and "url"
    // note this example stores crawled pages as a tabbed file, with the first field being the "url"
    // and the second being the "raw" document that had all newline chars ("\n") converted to the text ":nl:"

    // a predefined pipe assembly that returns fields named "url" and "page"
    Pipe importPipe = new ImportCrawlDataAssembly( "import pipe" );

    importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

    // create the tap instances
    Tap localPagesSource = new Lfs( new TextLine(), inputPath );
    Tap importedPages = new Hfs( new SequenceFile( new Fields( "url", "page" ) ), pagesPath );

    // connect the pipe assembly to the tap instances
    Flow importPagesFlow = flowConnector.connect( "import pages", localPagesSource, importedPages, importPipe );

    // a predefined pipe assembly that splits the stream into two named "url pipe" and "word pipe"
    // these pipes could be retrieved via the getTails() method and added to new pipe instances
    SubAssembly wordCountPipe = new WordCountSplitAssembly( "wordcount pipe", "url pipe", "word pipe" );

    // create Hadoop sequence files to store the results of the counts
    Tap sinkUrl = new Hfs( new SequenceFile( new Fields( "url", "word", "count" ) ), urlsPath );
    Tap sinkWord = new Hfs( new SequenceFile( new Fields( "word", "count" ) ), wordsPath );

    // convenience method to bind multiple pipes and taps
    Map<String, Tap> sinks = Cascades.tapsMap( new String[]{ "url pipe", "word pipe" }, Tap.taps( sinkUrl, sinkWord ) );

    // wordCountPipe will be recognized as an assembly and handled appropriately
    Flow count = flowConnector.connect( importedPages, sinks, wordCountPipe );

    // create an assembly to export the Hadoop sequence file to local text files
    Pipe exportPipe = new Each( "export pipe", new Identity() );

    Tap localSinkUrl = new Lfs( new TextLine(), localUrlsPath );
    Tap localSinkWord = new Lfs( new TextLine(), localWordsPath );

    // connect up both sinks using the same exportPipe assembly
    Flow exportFromUrl = flowConnector.connect( "export url", sinkUrl, localSinkUrl, exportPipe );
    Flow exportFromWord = flowConnector.connect( "export word", sinkWord, localSinkWord, exportPipe );

    // connect up all the flows, order is not significant
    Cascade cascade = new CascadeConnector().connect( importPagesFlow, count, exportFromUrl, exportFromWord );

    // run the cascade to completion
    cascade.complete();
  }
}

Patrick Ting

Sep 10, 2010, 8:33:11 PM
to cascadi...@googlegroups.com
Try adding the following:

FlowConnector.setDebugLevel(properties, DebugLevel.VERBOSE);
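
Roughly, a sketch of where it could go, assuming the level needs to be on the
properties map before the FlowConnector is constructed so the planner sees it,
and assuming Debug's optional prefix/printFields constructor (worth checking
against the Debug javadoc) so the output is labeled:

    // in main(), before any connect() calls
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );
    FlowConnector.setDebugLevel( properties, DebugLevel.VERBOSE ); // static call, set before new FlowConnector(...)
    FlowConnector flowConnector = new FlowConnector( properties );

    // optionally give Debug a prefix so its lines stand out in the task logs
    // (the "import" prefix and printFields flag are assumptions for illustration)
    importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug( "import", true ) );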

Brian Wolf

Sep 10, 2010, 10:07:02 PM
to cascading-user
I added that near the bottom of Main, like this:


------
flowConnector.setDebugLevel(properties, DebugLevel.VERBOSE);

Flow exportFromUrl = flowConnector.connect( "export url", sinkUrl, localSinkUrl, exportPipe );
Flow exportFromWord = flowConnector.connect( "export word", sinkWord, localSinkWord, exportPipe );

// connect up all the flows, order is not significant
Cascade cascade = new CascadeConnector().connect( importPagesFlow, count, exportFromUrl, exportFromWord );

----

I still don't see any debugging.


Chris K Wensel

Sep 10, 2010, 10:27:28 PM
to cascadi...@googlegroups.com
All your flows were skipped, so nothing was run. A Cascade works like a makefile, so it won't regenerate data unless the sinks are stale relative to the sources.
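
For example, one way to force the flows to run on every invocation, a sketch assuming the Cascading 1.1 tap constructors that take a SinkMode (the alternative is simply deleting the output directories before rerunning):

    // sketch: sinks declared with SinkMode.REPLACE are deleted and rewritten,
    // so the planner should not skip their flows as already up to date
    Tap importedPages = new Hfs( new SequenceFile( new Fields( "url", "page" ) ), pagesPath, SinkMode.REPLACE );
    Tap sinkUrl = new Hfs( new SequenceFile( new Fields( "url", "word", "count" ) ), urlsPath, SinkMode.REPLACE );
    Tap sinkWord = new Hfs( new SequenceFile( new Fields( "word", "count" ) ), wordsPath, SinkMode.REPLACE );
    Tap localSinkUrl = new Lfs( new TextLine(), localUrlsPath, SinkMode.REPLACE );
    Tap localSinkWord = new Lfs( new TextLine(), localWordsPath, SinkMode.REPLACE );

(SinkMode lives in cascading.tap; add the import alongside Hfs and Lfs.)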

Also, as mentioned previously, the Debug output goes to stdout wherever the tasks run, so you won't see it in your console when tasks run remotely as child processes of the task tracker.

ckw


--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

-- Concurrent, Inc. offers mentoring, support, and licensing for Cascading - support open-source by buying support
