[StormCrawler] Crawl and get website content in WARC, Windows setup


michel....@gmail.com

Nov 11, 2016, 2:35:56 AM11/11/16
to DigitalPebble

Hello,

First, a bit of context: I'm quite new to the crawling world, Storm and StormCrawler.

I'm trying to crawl a list of domains in order to get the full HTML, CSS and JS content of all pages, exported into WARC files.
For that I'm trying both Nutch and StormCrawler.

I locally installed StormCrawler on Windows 10 using the Maven archetype.

  -> I ran into this issue, so I had to remove the dependency on flux-core from the pom.

I managed to fetch some URLs from a domain, using the standard StdOutStatusUpdater from the CrawlTopology.

Then I included the external WARC export library from GitHub and modified the CrawlTopology.

   -> For that I had to add dependencies on storm-hdfs and commons-codec.
  -> Then I got a Hadoop connection refused exception, linked to these lines:

        String fsURL = "hdfs://localhost:9000";
        warcbolt.withFsUrl(fsURL);


I locally installed and ran Hadoop HDFS (which was quite a hassle on Windows...), even though I'm not quite sure it was necessary. Can you comment on that?

Anyway, even though it removed some exceptions, I also had to comment out the above lines to let the standard configuration connect to localhost.

Now I'm still not able to get any of the content. I can see in the standard output that it's discovering and fetching some URLs from the host, but it generates an empty warc.gz file.

Do you have any idea what steps I missed or did wrong?
Any help would be much appreciated :)

Here is the content of crawler-conf.yaml:


# Custom configuration for StormCrawler
# This is used to override the default values from crawler-default.xml and provide additional ones 
# for your custom components.
# Use this file with the parameter -config when launching your extension of ConfigurableTopology.  
# This file does not contain all the key values but only the most frequently used ones. See crawler-default.xml for an extensive list.

config: 
  topology.workers: 1
  topology.message.timeout.secs: 300
  topology.max.spout.pending: 10
  topology.debug: false

  # mandatory when using Flux
  topology.kryo.register:
    - com.digitalpebble.stormcrawler.Metadata

  # metadata to transfer to the outlinks
  # used by Fetcher for redirections, sitemapparser, etc...
  # these are also persisted for the parent document (see below)
  # metadata.transfer:
  # - customMetadataName

  # lists the metadata to persist to storage
  # these are not transfered to the outlinks
  metadata.persist:
   - _redirTo
   - error.cause
   - error.source
   - isSitemap
   - isFeed

  http.agent.name: "StormCrawlerTest4G"
  http.agent.version: "1.0"
  http.agent.description: "A Bot 4g"

  # FetcherBolt queue dump : comment out to activate
  # if a file exists on the worker machine with the corresponding port number
  # the FetcherBolt will log the content of its internal queues to the logs
  # fetcherbolt.queue.debug.filepath: "/tmp/fetcher-dump-{port}"

  parsefilters.config.file: "parsefilters.json"
  urlfilters.config.file: "urlfilters.json"

  # revisit a page daily (value in minutes)
  # set it to -1 to never refetch a page
  fetchInterval.default: 1440

  # revisit a page with a fetch error after 2 hours (value in minutes)
  # set it to -1 to never refetch a page
  fetchInterval.fetch.error: 120

  # never revisit a page with an error (or set a value in minutes)
  fetchInterval.error: -1

  # custom fetch interval to be used when a document has the key/value in its metadata
  # and has been fetched successfully (value in minutes)
  # fetchInterval.isFeed=true: 10

  # configuration for the classes extending AbstractIndexerBolt
  # indexer.md.filter: "someKey=aValue"
  indexer.url.fieldname: "url"
  indexer.text.fieldname: "content"
  indexer.canonical.name: "canonical"
  indexer.md.mapping:
  - parse.title=title
  - parse.keywords=keywords
  - parse.description=description
  - domain=domain

  # Metrics consumers:
  topology.metrics.consumer.register:
     - class: "org.apache.storm.metric.LoggingMetricsConsumer"
       parallelism.hint: 1

  # for test purposes
  fetcher.threads.per.queue: 4
  fetcher.max.crawl.delay: 1
  fetcher.server.delay: 0.1
  fetcher.server.min.delay: 0.1
  http.content.limit: -1
 
content of parsefilters.json:

{
  "com.digitalpebble.stormcrawler.parse.ParseFilters": [
    {
      "class": "com.digitalpebble.stormcrawler.parse.filter.XPathFilter",
      "name": "XPathFilter",
      "params": {
        "canonical": "//*[@rel=\"canonical\"]/@href",
        "parse.description": [
            "//*[@name=\"description\"]/@content",
            "//*[@name=\"Description\"]/@content"
         ],
        "parse.title": [
            "//TITLE",
            "//META[@name=\"title\"]/@content"
         ],
         "parse.keywords": "//META[@name=\"keywords\"]/@content"
      }
    },
    {
      "class": "com.zwoop.crawler.ContentFilterOut",
      "name": "ContentFilter",
      "params": {
        "pattern": "*"
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.parse.filter.DomainParseFilter",
      "name": "DomainParseFilter",
      "params": {
        "key": "domain",
        "byHost": false
       }
    }
  ]
}

urlfilters.json:

{
  "com.digitalpebble.stormcrawler.filtering.URLFilters": [
    {
      "class": "com.digitalpebble.stormcrawler.filtering.basic.BasicURLFilter",
      "name": "BasicURLFilter",
      "params": {
        "maxPathRepetition": 3,
        "maxLength": 1024
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.depth.MaxDepthFilter",
      "name": "MaxDepthFilter",
      "params": {
        "maxDepth": 2
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.basic.BasicURLNormalizer",
      "name": "BasicURLNormalizer",
      "params": {
        "removeAnchorPart": true, 
        "unmangleQueryString": true,
        "checkValidURI": true,
        "removeHashes": false
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.host.HostURLFilter",
      "name": "HostURLFilter",
      "params": {
        "ignoreOutsideHost": true,
        "ignoreOutsideDomain": true
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.regex.RegexURLNormalizer",
      "name": "RegexURLNormalizer",
      "params": {
        "regexNormalizerFile": "default-regex-normalizers.xml"
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.regex.RegexURLFilter",
      "name": "RegexURLFilter",
      "params": {
        "regexFilterFile": "default-regex-filters.txt"
      }
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.basic.SelfURLFilter",
      "name": "SelfURLFilter"
    },
    {
      "class": "com.digitalpebble.stormcrawler.filtering.metadata.MetadataFilter",
      "name": "MetadataFilter",
      "params": {
        "isSitemap": "false"
      }
    }
  ]
}

Dependencies in the pom.xml:

<dependencies>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hdfs</artifactId>
    <version>1.0.2</version>
  </dependency>
  <dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.9</version>
  </dependency>
  <!-- <dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>flux-core</artifactId>
    <version>1.0.2</version>
  </dependency> -->
  <dependency>
    <groupId>com.digitalpebble.stormcrawler</groupId>
    <artifactId>storm-crawler-core</artifactId>
    <version>1.2</version>
  </dependency>
</dependencies>

I launch it with this command: mvn clean compile exec:java -Dexec.mainClass=com.zwoop.crawler.CrawlTopology -Dexec.args="-conf crawler-conf.yaml local"

DigitalPebble

Nov 11, 2016, 4:56:37 AM11/11/16
to DigitalPebble, michel....@gmail.com
Hi, 

Please see my comments inline below.

On 11 November 2016 at 07:35, <michel....@gmail.com> wrote:

Hello,

First, a bit of context: I'm quite new to the crawling world, Storm and StormCrawler.

I'm trying to crawl a list of domains in order to get the full HTML, CSS and JS content of all pages, exported into WARC files.
For that I'm trying both Nutch and StormCrawler.

I locally installed StormCrawler on Windows 10 using the Maven archetype.

  -> I ran into this issue, so I had to remove the dependency on flux-core from the pom.
 
As explained at the bottom of this issue, the problem is not so much with having flux-core as a dependency (it does the same with Tika) but with executing SC via Maven. Install Storm and use the storm command instead.
 

I managed to fetch some URLs from a domain, using the standard StdOutStatusUpdater from the CrawlTopology.

Then I included the external WARC export library from GitHub and modified the CrawlTopology.

   -> For that I had to add dependencies on storm-hdfs and commons-codec.

Are you using the latest 1.2 release? The dependencies above should be inherited automatically when declaring:

<dependency>
  <groupId>com.digitalpebble.stormcrawler</groupId>
  <artifactId>storm-crawler-warc</artifactId>
  <version>1.2</version>
</dependency>
 
  -> Then I got a Hadoop connection refused exception, linked to these lines:

        String fsURL = "hdfs://localhost:9000";
        warcbolt.withFsUrl(fsURL);


Did you define a path? e.g.

String warcFilePath = "/warc";

FileNameFormat fileNameFormat = new WARCFileNameFormat()
        .withPath(warcFilePath);

Does it exist, and does it have the proper access rights for the account you are running under?


I locally installed and ran Hadoop HDFS (which was quite a hassle on Windows...), even though I'm not quite sure it was necessary. Can you comment on that?

If you have a single machine and are not bothered about replication, then just use the local filesystem (as you did below). This will save you the hassle of installing Hadoop on Windows.
 

Anyway, even though it removed some exceptions, I also had to comment out the above lines to let the standard configuration connect to localhost.

Nitpicking -> the local FS. You could have HDFS running in pseudo-distributed mode, i.e. connecting on localhost. What you are doing here is different: you are bypassing HDFS altogether.
 

Now I'm still not able to get any of the content. I can see in the standard output that it's discovering and fetching some URLs from the host, but it generates an empty warc.gz file.

Do you have any idea what steps I missed or did wrong?
Any help would be much appreciated :)

Can you share your topology? 


Anything in the logs that can give you a hint? Do you use an IDE? You should be able to debug the topology, e.g. with Eclipse; this would give you an idea of what went wrong.


 
HTH

Julien

--
You received this message because you are subscribed to the Google Groups "DigitalPebble" group.
To unsubscribe from this group and stop receiving emails from it, send an email to digitalpebble+unsubscribe@googlegroups.com.
To post to this group, send email to digita...@googlegroups.com.
Visit this group at https://groups.google.com/group/digitalpebble.
For more options, visit https://groups.google.com/d/optout.




michel....@gmail.com

Nov 11, 2016, 5:42:54 AM11/11/16
to DigitalPebble, michel....@gmail.com, jul...@digitalpebble.com
Thanks Julien for your fast answer.
As explained at the bottom of this issue, the problem is not so much with having flux-core as a dependency (it does the same with Tika) but with executing SC via Maven. Install Storm and use the storm command instead.
OK.

Are you using the latest 1.2 release? The dependencies above should be inherited automatically when declaring :

Actually, I imported the source of the StormCrawler WARC module to be able to have a look at how it works, hence the missing dependencies.

Did you define a path? e.g.

String warcFilePath = "/warc";

FileNameFormat fileNameFormat = new WARCFileNameFormat()
        .withPath(warcFilePath);

Yes, it was already in the code example from GitHub :).

Does it exist, and does it have the proper access rights for the account you are running under?
Yes, the folder exists, and the process managed to create files in it (they just don't contain any data).

Nitpicking -> the local FS. You could have HDFS running in pseudo-distributed mode, i.e. connecting on localhost. What you are doing here is different: you are bypassing HDFS altogether.
OK. I had never used either Storm or Hadoop before, so I was a bit confused about what was going on... Thanks.

Can you share your topology? 

Here it is:

/**
 * Dummy topology to play with the spouts and bolts
 */
public class CrawlTopology extends ConfigurableTopology {

    public static void main(String[] args) throws Exception {
        ConfigurableTopology.start(new CrawlTopology(), args);
    }

    @Override
    protected int run(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        String[] testURLs = new String[] {"http://www.lequipe.fr/" /*,
                "http://www.lemonde.fr/", "http://www.bbc.co.uk/",
                "http://storm.apache.org/", "http://digitalpebble.com/" */};

        builder.setSpout("spout", new MemorySpout(testURLs));

        builder.setBolt("partitioner", new URLPartitionerBolt())
                .shuffleGrouping("spout");

        builder.setBolt("fetch", new FetcherBolt()).fieldsGrouping(
                "partitioner", new Fields("key"));

        builder.setBolt("sitemap", new SiteMapParserBolt())
                .localOrShuffleGrouping("fetch");

        builder.setBolt("parse", new JSoupParserBolt()).localOrShuffleGrouping(
                "sitemap");

        builder.setBolt("index", new StdOutIndexer()).localOrShuffleGrouping(
                "parse");

        Fields furl = new Fields("url");
        
        // can also use MemoryStatusUpdater for simple recursive crawls
        builder.setBolt("status", new StdOutStatusUpdater())
                .fieldsGrouping("fetch", Constants.StatusStreamName, furl)
                .fieldsGrouping("sitemap", Constants.StatusStreamName, furl)
                .fieldsGrouping("parse", Constants.StatusStreamName, furl)
                .fieldsGrouping("index", Constants.StatusStreamName, furl);
        
        //generating warc files
        String warcFilePath = "/warc";

        FileNameFormat fileNameFormat = new WARCFileNameFormat()
                .withPath(warcFilePath);

        Map<String,String> fields = new HashMap<>();
        fields.put("software:", "StormCrawler 1.0 http://stormcrawler.net/");
        fields.put("conformsTo:", "http://www.archive.org/documents/WarcFileFormat-1.0.html");

        WARCHdfsBolt warcbolt = (WARCHdfsBolt) new WARCHdfsBolt()
                .withFileNameFormat(fileNameFormat);
        warcbolt.withHeader(fields);

        // can specify the filesystem - will use the local FS by default
        String fsURL = "hdfs://localhost:9000";
        //warcbolt.withFsUrl(fsURL);

        // a custom max length can be specified - 1 GB will be used as a default
        FileSizeRotationPolicy rotpol = new FileSizeRotationPolicy(5.0f,
                Units.MB);
        warcbolt.withRotationPolicy(rotpol);

        builder.setBolt("warc", warcbolt).localOrShuffleGrouping("fetch");
    
        return submit("crawl", conf, builder);
    }
}
Anything in the logs that can give you a hint? Do you use an IDE? You should be able to debug the topology, e.g. with Eclipse; this would give you an idea of what went wrong.
I didn't see anything special in the logs, and yes, I started to run it in Eclipse to debug it, but I'm running into another issue now...
950  [main] INFO  o.a.s.u.TupleUtils - Enabling tick tuple with interval [15]
1145 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7001763315176827023:-8145610104524275269
1227 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
17298 [main] WARN  o.a.s.u.NimbusClient - Ignoring exception while trying to get leader nimbus info from localhost. will retry with a different seed host.
java.lang.RuntimeException: org.apache.storm.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBackoffConnect.java:64) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:56) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:99) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.ThriftClient.<init>(ThriftClient.java:69) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.utils.NimbusClient.<init>(NimbusClient.java:106) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:66) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.StormSubmitter.topologyNameExists(StormSubmitter.java:371) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:233) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:311) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:157) [storm-core-1.0.2.jar:1.0.2]
at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:85) [storm-crawler-core-1.2.jar:?]
at com.zwoop.crawler.CrawlTopology.run(CrawlTopology.java:107) [classes/:?]
at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50) [storm-crawler-core-1.2.jar:?]
at com.zwoop.crawler.CrawlTopology.main(CrawlTopology.java:45) [classes/:?]
Caused by: org.apache.storm.thrift.transport.TTransportException: java.net.ConnectException: Connection refused: connect
at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:226) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:103) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-core-1.0.2.jar:1.0.2]
... 12 more
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method) ~[?:1.8.0_111]
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source) ~[?:1.8.0_111]
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[?:1.8.0_111]
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[?:1.8.0_111]
at java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[?:1.8.0_111]
at java.net.PlainSocketImpl.connect(Unknown Source) ~[?:1.8.0_111]
at java.net.SocksSocketImpl.connect(Unknown Source) ~[?:1.8.0_111]
at java.net.Socket.connect(Unknown Source) ~[?:1.8.0_111]
at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:221) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:103) ~[storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-core-1.0.2.jar:1.0.2]
... 12 more
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:90)
at org.apache.storm.StormSubmitter.topologyNameExists(StormSubmitter.java:371)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:233)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:311)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:157)
at com.digitalpebble.stormcrawler.ConfigurableTopology.submit(ConfigurableTopology.java:85)
at com.zwoop.crawler.CrawlTopology.run(CrawlTopology.java:107)
at com.digitalpebble.stormcrawler.ConfigurableTopology.start(ConfigurableTopology.java:50)
at com.zwoop.crawler.CrawlTopology.main(CrawlTopology.java:45)

As soon as I fix that I'll try to debug the process further. Thanks!

Anthony


DigitalPebble

Nov 11, 2016, 6:00:41 AM11/11/16
to DigitalPebble
 I launch it with this command : mvn clean compile exec:java -Dexec.mainClass=com.zwoop.crawler.CrawlTopology -Dexec.args="-conf crawler-conf.yaml local"

It should be -local; otherwise it will try to connect to a Storm cluster, and if you haven't installed one you'll get the error message you just posted.
 


michel....@gmail.com

Nov 11, 2016, 6:11:17 AM11/11/16
to DigitalPebble, jul...@digitalpebble.com
Yep, typical Friday evening mistake :)

I'm trying to debug it and will let you know. Thanks for your help!

Anthony MICHEL

Nov 14, 2016, 6:09:49 AM11/14/16
to digita...@googlegroups.com, jul...@digitalpebble.com
Some small feedback on the previous points.

The issue was that I thought that once the discovering and parsing were finished, the process would output all the content from memory to the file.

And for testing purposes I was using a max depth, and a rotation after 100 MB.

But I was never reaching 100 MB of content to generate the file, and it turns out that when the discovering is finished the process doesn't flush the content.

Now I generate files with a rotation of 1 MB and I parse a whole website, so I get most of the results (I also save the indexes in a SQL database and it works perfectly!)

But then I miss the last part of the content, which is not fetched, so I need to find a way to detect that the fetching is finished and to flush the data from memory to the file.
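The 100 MB symptom above can be shown in a few lines: a purely size-based rotation policy only fires once the written offset crosses the threshold, so a crawl that stays below the threshold never rotates and the buffered content never reaches a finished file. Here is a framework-free sketch of that decision; the class and method names are illustrative, not the actual storm-hdfs API.

```java
// Illustrative stand-in for a size-only rotation check, showing why a
// 100 MB threshold never fires on a small crawl. Not the storm-hdfs API.
public class SizeOnlyRotation {
    private final long maxBytes;

    public SizeOnlyRotation(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // called after each record is written; true means "rotate the file now"
    public boolean mark(long offsetBytes) {
        return offsetBytes >= maxBytes;
    }
}
```

With maxBytes set to 100 MB, a crawl that only ever writes a few megabytes never returns true, which matches the empty-looking output described above.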



DigitalPebble

Nov 14, 2016, 11:59:31 AM11/14/16
to DigitalPebble, Anthony MICHEL
Hi

But then I miss the last part of the content, which is not fetched, so I need to find a way to detect that the fetching is finished and to flush the data from memory to the file.

Did you mean "the last part of the content which is not flushed"? As for detecting that the fetching is finished, this is not doable out of the box with Storm; you'll have to implement some heuristic to detect it and kill the topology. Even if you kill the topology there is no guarantee that the WARC file will be closed properly, see https://github.com/commoncrawl/news-crawl/issues/10

You could change the sync policy so that the tuples are flushed more often. I'm not sure whether this would result in a well-formed archive, but it's worth trying.
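The sync-policy idea resembles storm-hdfs's CountSyncPolicy, which requests a flush of the underlying stream every N tuples. A minimal, framework-free sketch of that logic follows; the class name is illustrative, and whether the WARC bolt accepts a custom sync policy is an assumption to verify against the actual code.

```java
// Sketch of a count-based sync decision, mirroring the idea behind
// storm-hdfs's CountSyncPolicy: request a flush every `count` tuples.
// Illustrative only; not the actual StormCrawler/storm-hdfs class.
public class CountSync {
    private final int count;
    private int seen = 0;

    public CountSync(int count) {
        this.count = count;
    }

    // called once per written tuple; true means "flush the stream now"
    public boolean mark() {
        seen++;
        if (seen >= count) {
            seen = 0; // reset after requesting a flush
            return true;
        }
        return false;
    }
}
```

A small count flushes often (smaller data loss on an unclean shutdown) at the cost of more I/O; the trade-off Julien hints at is whether a flush mid-record still yields a well-formed gzip member.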

Alternatively, you could use https://github.com/commoncrawl/news-crawl/blob/master/src/main/java/com/digitalpebble/stormcrawler/FileTimeSizeRotationPolicy.java so that the rotation happens not only based on size but also on time.

HTH

Julien




 


DigitalPebble

Nov 16, 2016, 1:08:31 PM11/16/16
to Anthony MICHEL, DigitalPebble
Hi Anthony

Please find my comments below. I'm also reposting to the list so that others can benefit from the discussion.

On 15 November 2016 at 09:24, Anthony MICHEL <michel....@gmail.com> wrote:
OK, sorry, I'll try to follow Storm terminology. (I'm still running in local mode so far, but will switch to Storm very soon.)

I would like to be able to monitor the tuples waiting to be processed and see the ones already processed.

That depends on how you store the URLs. With a distributed queue like RabbitMQ you could simply peek at the queue and check its size. The MemorySpout is mainly for demonstration purposes; I would not recommend that you build your architecture on it.
 
Maybe also to know whether a topology is still processing a tuple or is idle, waiting for new tuples to process.

Apart from the Storm UI, which will give you a lot of info when you start using it, you should also use the metrics to get stats about the various components.

 

Concretely, what I would like to achieve is triggering some function (flushing all data in memory to the WARC file and compressing it, for example) when the crawl of a whole website is finished (i.e. staying on the same host and domain, having fetched all discovered URLs).

Terminating the topology automatically when the crawl is finished would require some tricks, but this would not give you any guarantee that the bolt is closed properly and the content flushed to the WARC file.
 

I'm thinking of adding some kind of logic to the Spout for that, but it's probably not the best design...

It would be simpler to modify the WARC bolt so that it defines a tick tuple, e.g. every 30 secs, and add some logic so that if no tuples have been added to the WARC since the previous tick, the file is closed properly. The topology would still be running, but at least the output would be fine. You could then turn the topology off manually or based on some mechanism.
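The tick-tuple logic described here can be sketched independently of Storm: count the tuples seen between ticks, and when a tick arrives with nothing written since the previous one, close the file. In a real bolt the tick would be enabled via Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS and detected with TupleUtils.isTick(tuple); the class below is a framework-free, illustrative sketch of the decision only.

```java
// Framework-free sketch of "close the WARC file when idle": a tick that
// arrives after an interval with no written tuples triggers the close.
public class IdleCloser {
    private int tuplesSinceLastTick = 0;
    private boolean fileClosed = false;

    // call for every regular (non-tick) tuple written to the WARC file
    public void onTuple() {
        if (!fileClosed) {
            tuplesSinceLastTick++;
        }
    }

    // call for every tick tuple; returns true exactly once, when the
    // component is first found idle and the file should be closed
    public boolean onTick() {
        if (fileClosed) {
            return false;
        }
        if (tuplesSinceLastTick == 0) {
            fileClosed = true;
            return true;
        }
        tuplesSinceLastTick = 0;
        return false;
    }
}
```

With a 30-second tick interval, an idle period of up to two intervals can elapse before the close fires; the interval is a trade-off between closing promptly and closing prematurely during a slow fetch.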

HTH

Julien

 

Regards,

Anthony

2016-11-15 16:44 GMT+08:00 DigitalPebble <jul...@digitalpebble.com>:

Do you think there is a way to programmatically monitor in Storm the ongoing and queued jobs?

What do you mean by jobs? Topologies?

On 15 November 2016 at 00:30, Anthony MICHEL <michel....@gmail.com> wrote:

Yes, I meant flushed.

The first solution is what I tried first, and it generates a corrupted compressed file.

Using some kind of timeout is a good idea I may try.

Do you think there is a way to programmatically monitor in Storm the ongoing and queued jobs?

anthony...@zwoop.biz

Dec 1, 2016, 10:37:35 PM12/1/16
to DigitalPebble, michel....@gmail.com, jul...@digitalpebble.com

Hi,

Just a little update,

Finally, I changed my design a bit, so now I'm actually generating one file per page crawled.

I did that very easily by implementing a FileRotationPolicy that always marks the tuple, except when the offset is 0, in order to avoid generating empty files when no pages to crawl are left in the spout.

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;

/**
 * File rotation policy that will rotate for every tuple.
 * One file per tuple, i.e. one file per page crawled.
 */
public class AlwaysRotationPolicy implements FileRotationPolicy {

    @Override
    public boolean mark(Tuple tuple, long offset) {
        // for every non-empty tuple, create a file
        return offset > 0;
    }

    @Override
    public void reset() {
    }
}

And obviously setting it up in the topology:
        //the crawler is designed to store each crawled page separately
        AlwaysRotationPolicy rotationPolicy = new AlwaysRotationPolicy();
        
        warcbolt.withRotationPolicy(rotationPolicy);

Thanks for your help. After a bit of design work, we decided to go further with StormCrawler.

So hopefully we'll have some things to contribute in the near future :).

One point: most private attributes of the framework don't have getter methods. That makes it a bit annoying when implementing classes that inherit from them, especially abstract classes like AbstractStatusUpdaterBolt.

Not being able to get the useCache, cache, scheduler... makes it hard to override the execute method without redefining everything.

Any reason for that?

Regards,
Anthony







DigitalPebble

Dec 2, 2016, 6:44:38 AM12/2/16
to DigitalPebble, Anthony MICHEL
Hi Anthony

Thanks for sharing your solution to this problem, and glad to hear that you will be using StormCrawler. Let me know if you want to be added to the list of users on the wiki: the more it is seen as being used, the more prospective users are likely to give it a try.

Re getters: I never had to access the cache and schedulers from a subclass. The idea of the AbstractStatusUpdaterBolt is that this behaviour should be used regardless of the particular logic of a bolt. Feel free to open a PR so that we can discuss it further. I am looking forward to your contributions in the future!

Thanks

Julien
