It's in standalone mode. When I run it I pipe the output to a file, but there is still output to the screen. This is the output:

screen:
$ hadoop jar ./build/wordcount.jar data/url+page.200.txt output local > file
/cygdrive/c/hadoop/hadoop-0.21.0/bin/hadoop-config.sh: line 189: C:\Program: command not found
10/09/10 16:55:49 INFO wordcount.Main: Hello World
10/09/10 16:55:49 WARN wordcount.Main: low fuel
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO flow.MultiMapReducePlanner: using application jar: /C:/cascading/wordcount/./build/wordcount.jar
10/09/10 16:55:50 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flows: 4
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] allocating threads: 4
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: import pages
10/09/10 16:55:50 INFO security.Groups: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
10/09/10 16:55:50 INFO flow.Flow: [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: import pages
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: url pipe+word pipe
10/09/10 16:55:50 INFO flow.Flow: [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: url pipe+word pipe
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export word
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] starting flow: export url
10/09/10 16:55:50 INFO flow.Flow: [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: export url
10/09/10 16:55:50 INFO flow.Flow: [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
10/09/10 16:55:50 INFO flow.Flow: [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
10/09/10 16:55:50 INFO cascade.Cascade: [import pages+url pipe+...] skipping flow: export word
file:
234 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
234 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
469 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
469 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
531 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
531 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
562 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
562 [main] INFO cascading.flow.MultiMapReducePlanner - using application jar: /C:/cascading/wordcount/./build/wordcount.jar
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - Concurrent, Inc - Cascading 1.1.2 [hadoop-0.19.2+]
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flows: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flows: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] allocating threads: 4
594 [cascade import pages+url pipe+word pipe+export url+export word] INFO cascading.cascade.Cascade - [import pages+url pipe+...] allocating threads: 4
625 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: import pages
625 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: import pages
672 [pool-1-thread-1] INFO org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
672 [pool-1-thread-1] INFO org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] sink oldest modified date: Sat Sep 04 17:09:38 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
750 [pool-1-thread-1] INFO cascading.flow.Flow - [import pages] source modification date at: Mon Aug 02 00:14:41 PDT 2010
750 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: import pages
750 [pool-1-thread-1] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: import pages
750 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: url pipe+word pipe
750 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: url pipe+word pipe
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] sink oldest modified date: Sat Sep 04 17:10:35 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
765 [pool-1-thread-2] INFO cascading.flow.Flow - [url pipe+word pipe] source modification date at: Sat Sep 04 17:09:38 PDT 2010
765 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: url pipe+word pipe
765 [pool-1-thread-2] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: url pipe+word pipe
765 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export word
765 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export word
765 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export url
765 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] starting flow: export url
781 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
781 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] sink oldest modified date: Sat Sep 04 17:25:16 PDT 2010
797 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
797 [pool-1-thread-4] INFO cascading.flow.Flow - [export url] source modification date at: Sat Sep 04 17:10:35 PDT 2010
797 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export url
797 [pool-1-thread-4] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export url
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] sink oldest modified date: Sat Sep 04 17:25:15 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
797 [pool-1-thread-3] INFO cascading.flow.Flow - [export word] source modification date at: Sat Sep 04 17:11:07 PDT 2010
797 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export word
797 [pool-1-thread-3] INFO cascading.cascade.Cascade - [import pages+url pipe+...] skipping flow: export word
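
To check my understanding of where each stream of messages is going, here is a minimal, self-contained log4j 1.x sketch, separate from my Cascading code (the class name LoggingSplitDemo is just made up for the example). As far as I can tell, the ConsoleAppender that BasicConfigurator.configure() installs writes to System.out by default, so its lines follow the "> file" redirect; an appender targeted at System.err, which seems to be how the console appender in a stock Hadoop conf/log4j.properties is set up, stays on the terminal even when stdout is piped; and calling configure() twice attaches two appenders to the root logger, so every entry is written twice, like the doubled lines in the file above:

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;

public class LoggingSplitDemo
{
    public static void main( String[] args )
    {
        // first call: adds a ConsoleAppender to the root logger; its default
        // target is System.out, so these messages follow a "> file" redirect
        BasicConfigurator.configure();

        // second call: adds ANOTHER ConsoleAppender, so every message below is
        // written twice to stdout (compare the doubled entries in "file" above)
        BasicConfigurator.configure();

        // an appender aimed at System.err, similar to how a stock Hadoop
        // conf/log4j.properties appears to configure its console appender;
        // this output stays on the terminal even when stdout is redirected
        Logger.getRootLogger().addAppender(
            new ConsoleAppender( new PatternLayout( "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n" ),
                                 ConsoleAppender.SYSTEM_ERR ) );

        Logger.getLogger( LoggingSplitDemo.class ).info( "Hello World" );
    }
}

Running this with "java LoggingSplitDemo > out.txt" should put the doubled BasicConfigurator-style lines in out.txt and leave the dated line on the screen, which is the same split I see above, if my reading is right.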
code:
package wordcount;
import java.util.Map;
import java.util.Properties;
import java.util.Arrays;
import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.operation.Debug;
import cascading.operation.DebugLevel;
/**
 *
 */
public class Main
{
    static Logger logger = Logger.getLogger( Main.class );

    private static class ImportCrawlDataAssembly extends SubAssembly
    {
        public ImportCrawlDataAssembly( String name )
        {
            BasicConfigurator.configure();

            // split the text line into "url" and "raw" with the default delimiter of tab
            RegexSplitter regexSplitter = new RegexSplitter( new Fields( "url", "raw" ) );
            Pipe importPipe = new Each( name, new Fields( "line" ), regexSplitter );

            // System.out.println("debug");
            importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

            // remove all pdf documents from the stream
            importPipe = new Each( importPipe, new Fields( "url" ), new RegexFilter( ".*\\.pdf$", true ) );
            System.out.println( "debug" );
            importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

            // replace ":nl:" with a new line, return the fields "url" and "page" to the stream,
            // and discard the other fields in the stream
            RegexReplace regexReplace = new RegexReplace( new Fields( "page" ), ":nl:", "\n" );
            importPipe = new Each( importPipe, new Fields( "raw" ), regexReplace, new Fields( "url", "page" ) );

            // System.out.println("debug");
            importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

            setTails( importPipe );
        }
    }
    private static class WordCountSplitAssembly extends SubAssembly
    {
        public WordCountSplitAssembly( String sourceName, String sinkUrlName, String sinkWordName )
        {
            // create a new pipe assembly to create the word count across all the pages,
            // and the word count in a single page
            Pipe pipe = new Pipe( sourceName );

            // convert the html to xhtml using the TagSoupParser. return only the fields
            // "url" and "xml", discard the rest
            pipe = new Each( pipe, new Fields( "page" ), new TagSoupParser( new Fields( "xml" ) ), new Fields( "url", "xml" ) );

            // apply the given XPath expression to the xml in the "xml" field.
            // this expression extracts the 'body' element.
            XPathGenerator bodyExtractor = new XPathGenerator( new Fields( "body" ), XPathOperation.NAMESPACE_XHTML, "//xhtml:body" );
            pipe = new Each( pipe, new Fields( "xml" ), bodyExtractor, new Fields( "url", "body" ) );

            // apply another XPath expression. this expression removes all elements from
            // the xml, leaving only text nodes. text nodes in a 'script' element are removed.
            String elementXPath = "//text()[ name(parent::node()) != 'script']";
            XPathGenerator elementRemover = new XPathGenerator( new Fields( "words" ), XPathOperation.NAMESPACE_XHTML, elementXPath );
            pipe = new Each( pipe, new Fields( "body" ), elementRemover, new Fields( "url", "words" ) );

            // apply the regex to break the document into individual words and stuff each
            // word into a new tuple in the current stream, with field names "url" and "word"
            RegexGenerator wordGenerator = new RegexGenerator( new Fields( "word" ), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" );
            pipe = new Each( pipe, new Fields( "words" ), wordGenerator, new Fields( "url", "word" ) );

            // group on "url"
            Pipe urlCountPipe = new GroupBy( sinkUrlName, pipe, new Fields( "url", "word" ) );
            urlCountPipe = new Every( urlCountPipe, new Fields( "url", "word" ), new Count(), new Fields( "url", "word", "count" ) );

            // group on "word"
            Pipe wordCountPipe = new GroupBy( sinkWordName, pipe, new Fields( "word" ) );
            wordCountPipe = new Every( wordCountPipe, new Fields( "word" ), new Count(), new Fields( "word", "count" ) );

            setTails( urlCountPipe, wordCountPipe );
        }
    }
    public static void main( String[] args )
    {
        // set the current job jar
        BasicConfigurator.configure();
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass( properties, Main.class );
        FlowConnector flowConnector = new FlowConnector( properties );

        String inputPath = args[ 0 ];
        String pagesPath = args[ 1 ] + "/pages/";
        String urlsPath = args[ 1 ] + "/urls/";
        String wordsPath = args[ 1 ] + "/words/";
        String localUrlsPath = args[ 2 ] + "/urls/";
        String localWordsPath = args[ 2 ] + "/words/";

        // import a text file with crawled pages from the local filesystem into a Hadoop
        // distributed filesystem. the imported file will be a native Hadoop sequence file
        // with the fields "page" and "url". note this example stores crawled pages as a
        // tabbed file, with the first field being the "url" and the second being the "raw"
        // document that had all new line chars ("\n") converted to the text ":nl:".

        // a predefined pipe assembly that returns fields named "url" and "page"
        Pipe importPipe = new ImportCrawlDataAssembly( "import pipe" );
        importPipe = new Each( importPipe, DebugLevel.VERBOSE, new Debug() );

        // create the tap instances
        Tap localPagesSource = new Lfs( new TextLine(), inputPath );
        Tap importedPages = new Hfs( new SequenceFile( new Fields( "url", "page" ) ), pagesPath );

        // connect the pipe assembly to the tap instances
        Flow importPagesFlow = flowConnector.connect( "import pages", localPagesSource, importedPages, importPipe );

        // a predefined pipe assembly that splits the stream into two named "url pipe" and
        // "word pipe". these pipes could be retrieved via the getTails() method and added
        // to new pipe instances
        SubAssembly wordCountPipe = new WordCountSplitAssembly( "wordcount pipe", "url pipe", "word pipe" );

        // create Hadoop sequence files to store the results of the counts
        Tap sinkUrl = new Hfs( new SequenceFile( new Fields( "url", "word", "count" ) ), urlsPath );
        Tap sinkWord = new Hfs( new SequenceFile( new Fields( "word", "count" ) ), wordsPath );

        // convenience method to bind multiple pipes and taps
        Map<String, Tap> sinks = Cascades.tapsMap( new String[]{ "url pipe", "word pipe" }, Tap.taps( sinkUrl, sinkWord ) );

        // wordCountPipe will be recognized as an assembly and handled appropriately
        Flow count = flowConnector.connect( importedPages, sinks, wordCountPipe );

        // create an assembly to export the Hadoop sequence file to local text files
        Pipe exportPipe = new Each( "export pipe", new Identity() );
        Tap localSinkUrl = new Lfs( new TextLine(), localUrlsPath );
        Tap localSinkWord = new Lfs( new TextLine(), localWordsPath );

        // connect up both sinks using the same exportPipe assembly
        Flow exportFromUrl = flowConnector.connect( "export url", sinkUrl, localSinkUrl, exportPipe );
        Flow exportFromWord = flowConnector.connect( "export word", sinkWord, localSinkWord, exportPipe );

        // connect up all the flows, order is not significant
        Cascade cascade = new CascadeConnector().connect( importPagesFlow, count, exportFromUrl, exportFromWord );

        // run the cascade to completion
        cascade.complete();