Reliable word-count acks in local mode but fails on cluster


Dan Dillinger

Jan 13, 2012, 2:47:27 PM1/13/12
to storm...@googlegroups.com
I have modified the word-count example to be reliable. See here: https://gist.github.com/1608336

This code runs fine in local mode: in 10 seconds it emits a batch of tuples, acks about 50 of them, and fails none by the time it is killed.

However, when I deploy this code to my real storm cluster, the spout never gets acks for anything, even though the bolts are acking. Ultimately, timeouts start to occur and then it fails everything.

I've tried setting TOPOLOGY-DEBUG to false, removing most of the debugging spits from the code, and commenting out the emit-bolt! call in the last bolt of the topology, in case its output (which had nothing listening) was blocking the acking. Nothing has made a difference.

How is acking communicated back to the spout in a production cluster? Could this be some kind of configuration issue, or...?

Joseph Schliffer

Jan 13, 2012, 4:16:56 PM1/13/12
to storm...@googlegroups.com
I'm having a similar issue. I am connecting to an AMQP queue and pulling messages off it in the spout, then using the bolt for nothing but acking. Monitoring my Rabbit cluster, I see messages being delivered at a very high rate (about 50,000/s) but acked at a very slow rate (1,000/s at most). I have the ack in the spout calling back to Rabbit's basicAck method. Parallelism is set to 32, with 16 workers and 32 ackers.

My spout does nothing but pull a message off the queue in nextTuple and emit it. My bolt does nothing but call ack!:

(ack [id]
  (.basicAck @channel id false))

(defbolt ack-message []
  [tuple collector]
  (ack! collector tuple))


If I change the call to basicAck to be inside nextTuple itself (right before it emits), the ack rate in Rabbit jumps to roughly the delivery rate. So something is delaying the call to ack on the spout, but I have no idea what. Debugging is turned off, there are plenty of acker threads, and messages don't time out until after 90 seconds.

If I run locally (with :p 12, 1 worker, and 1 acker), the delivery and ack rates are about the same, maybe 20,000/s, so overall running locally on my workstation with 12 threads is actually a LOT faster than running on the production cluster.

Is there some configuration setting we're both missing that makes acking work so much better locally? Here is my topology:

(defn mk-topology []
  (topology
   {"1" (spout-spec rabbit-consumer-spout :p 32)}
   {"2" (bolt-spec {"1" :shuffle}
                   ack-message :p 32)}))

The setup to the local call:

(defn run-local! []
  (with-local-cluster [cluster]
    (submit-local-topology (:nimbus cluster)
      "rabbit-spout"
      {TOPOLOGY-WORKERS 1
       TOPOLOGY-DEBUG false 
       TOPOLOGY-ACKERS 1}
      (mk-topology))
    (Thread/sleep (* 1 60000))))

Main:

(defn -main [name]
  (StormSubmitter/submitTopology
   name
   {TOPOLOGY-WORKERS 16 
    TOPOLOGY-DEBUG false 
    TOPOLOGY-ACKERS 32 
    TOPOLOGY-MESSAGE-TIMEOUT-SECS 90}
   (mk-topology)))


I can give a lot more information/code/data if needed.  

Nathan Marz

Jan 13, 2012, 4:41:49 PM1/13/12
to storm...@googlegroups.com
I just ran your code on a 0.6.1 cluster and it worked fine, although I took out the "spit" calls and replaced them with backtype.storm.log/log-message calls. Here's the exact code I ran: http://pastie.org/3180668

What version of Storm are you using?
--
Twitter: @nathanmarz
http://nathanmarz.com

Nathan Marz

Jan 13, 2012, 4:45:42 PM1/13/12
to storm...@googlegroups.com
Can you send over screenshots of the stats from the Storm UI for the topology and components? That would help with understanding exactly what's happening here.


Joseph Schliffer

Jan 13, 2012, 4:56:46 PM1/13/12
to storm...@googlegroups.com

(Storm UI screenshots attached.)

Using 0.6.1 locally and on the cluster. When I took the topology screenshot, it had emitted 400k tuples but acked only 1,400 of them. This is pretty much what I can observe while watching Rabbit as well.


Dan Dillinger

Jan 13, 2012, 9:42:09 PM1/13/12
to storm...@googlegroups.com
Using storm 0.6.1.

I ran your version of the reliable wordcount code on the cluster and took screenshots. It's just failing every tuple. 

I can only guess that it's timing out somehow...?
topology.png
spout.png
acker.png
bolt3.png
bolt4.png

Nathan Marz

Jan 14, 2012, 4:15:38 AM1/14/12
to storm...@googlegroups.com
This is really strange. The acker is receiving tuples on all the streams it's supposed to be receiving on, but it is not emitting any tuples.

Can you try modifying the spout so it only emits one tuple and then does nothing for future nextTuple calls? Then turn TOPOLOGY_DEBUG on and send me all the worker logs. I want to see exactly what's going on with the tuple tree.
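For background on what the acker should be doing: it tracks each tuple tree as a running XOR of the 64-bit anchor ids, and notifies the spout that the tree is complete exactly when the XOR returns to zero. A minimal sketch of that bookkeeping (in Python for brevity; this is the idea, not Storm's actual code):

```python
import random

# Each tuple tree is tracked as a running XOR of 64-bit anchor ids.
tracked = 0

root = random.getrandbits(64)
tracked ^= root                      # spout emit: register the root id

child_a = random.getrandbits(64)
child_b = random.getrandbits(64)
tracked ^= root ^ child_a ^ child_b  # bolt acks root, emits two anchored children

tracked ^= child_a                   # downstream bolt acks child_a
tracked ^= child_b                   # downstream bolt acks child_b

# Every id was XORed in exactly twice (once on emit, once on ack),
# so the tracker returns to zero and the spout's ack() would fire.
print(tracked == 0)   # True
```

Since every id cancels itself out regardless of ordering, the acker only needs constant memory per tree, but it also means a single lost ack or emit message keeps the XOR nonzero until the tree times out.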

Also, can you show me all the dependencies for this project? Are you just running from storm-starter? I want to rule out any dependency problems causing weird behavior.

Nathan Marz

Jan 14, 2012, 4:22:46 AM1/14/12
to storm...@googlegroups.com
As far as I can tell, your network seems to be completely overwhelmed. One thing you can try is throttling the spout with the TOPOLOGY-MAX-SPOUT-PENDING config to see what maximum processing rate you can actually sustain on your cluster.
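Applied to the submit config posted earlier, that might look like the following (1000 is only an illustrative starting point to tune from):

```clojure
{TOPOLOGY-WORKERS 16
 TOPOLOGY-DEBUG false
 TOPOLOGY-ACKERS 32
 TOPOLOGY-MESSAGE-TIMEOUT-SECS 90
 ;; cap on unacked tuples in flight per spout task
 TOPOLOGY-MAX-SPOUT-PENDING 1000}
```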

It's also probably worth getting some stats of what's going on with the network. I recommend Ganglia for this. 

Dan Dillinger

Jan 14, 2012, 2:03:41 PM1/14/12
to storm...@googlegroups.com
I ran it as you asked and sent along the logs. As for the dependencies, here is the project.clj definition I have:

(defproject topologies "1.0.0-SNAPSHOT"
  :description "FIXME: write description"
  :aot :all
  :dependencies [[com.rabbitmq/amqp-client "2.7.1"]]
  :dev-dependencies [[storm "0.6.1"]
                     [org.clojure/clojure "1.2.0"]
                     [org.clojure/clojure-contrib "1.2.0"]])

Nathan Marz

Jan 17, 2012, 3:44:29 AM1/17/12
to storm...@googlegroups.com
Sorry for the late response. It looks like three of the tuples emitted by split-sentence never made it to word-count. The only tuple that made it went to the task on the same machine. Could there be a misconfiguration in your network preventing delivery between some pairs of machines? Are all the necessary ports open on all the worker nodes?

Dan Dillinger

Jan 17, 2012, 11:05:44 AM1/17/12
to storm...@googlegroups.com
No need to be sorry, it was the weekend -- and a long weekend for many people at that!

Based on your suggestion I took a closer look at netstat on the nodes, and discovered that although the workers were communicating normally, all connections to nimbus and zookeeper were appearing only in IPv6-mapped form, with the ::ffff: prefix and so forth.

On a hunch, based on having needed this option elsewhere, I added -Djava.net.preferIPv4Stack=true to the supervisor options, knowing that IPv6 support is missing in some corners (for example, ZeroMQ 2.1.7 doesn't have it; the IPv6 patch was added in August according to the zeromq-dev list).
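For anyone hitting the same thing, one place to set that flag is the JVM opts in storm.yaml on each supervisor node (assuming you manage the supervisor and worker JVM options through the stock childopts keys; adjust to however you launch your daemons):

```yaml
# Force the supervisor and worker JVMs onto the IPv4 stack
supervisor.childopts: "-Djava.net.preferIPv4Stack=true"
worker.childopts: "-Djava.net.preferIPv4Stack=true"
```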

Restarting the supervisors with that change allows acks to return now. I ran word count again, without limiting the number of tuples the spout will emit, and it began acking normally.

