throughput reduced abruptly


Min Zhou

Dec 30, 2011, 3:54:15 AM
to storm...@googlegroups.com
Hi all, 

I've written a spout for testing the throughput of our Storm cluster. After submitting the topology, I saw high throughput for the first few minutes. Then the whole topology stopped processing anything for a few seconds, and after that I saw some failures on the web UI caused by timeouts.

Below is the code of my spout.

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

public class TestWordSpout implements IRichSpout {
  public static Logger LOG = Logger.getLogger(TestWordSpout.class);
  boolean _isDistributed;
  SpoutOutputCollector _collector;
  
  private transient LinkedBlockingQueue<MessageWrapper> messageQueue;
  private transient ConcurrentHashMap<Long, MessageWrapper> id2wrapperMap;
  
  public static final class MessageWrapper {
    final byte[] message;
    final CountDownLatch latch;
    volatile boolean success = false;
    final long messageId;

    public MessageWrapper(final byte[] message, final long messageId) {
      this.message = message;
      this.messageId = messageId;
      this.latch = new CountDownLatch(1);
    }

  }
  

  public TestWordSpout() {
    this(true);
  }

  public TestWordSpout(boolean isDistributed) {
    _isDistributed = isDistributed;
  }

  public boolean isDistributed() {
    return _isDistributed;
  }

  public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
    _collector = collector;
    id2wrapperMap = new ConcurrentHashMap<Long, MessageWrapper>();
    messageQueue = new LinkedBlockingQueue<MessageWrapper>();
    // Feeder thread: generates one message at a time, registers it for
    // ack/fail tracking, hands it to nextTuple() via the queue, and then
    // blocks until the tuple has been either acked or failed.
    new Thread() {

      private byte[] word = new byte[1024];

      public void run() {
        while (true) {
          final long messageId = MessageId.generateId();
          final MessageWrapper wrapper = new MessageWrapper(word, messageId);
          id2wrapperMap.put(messageId, wrapper);
          messageQueue.offer(wrapper);
          try {
            // Block until ack() or fail() counts down the latch for this message.
            wrapper.latch.await();
          } catch (final InterruptedException ie) {
            LOG.error("InterruptedException while consuming message ", ie);
            Thread.currentThread().interrupt();
          }
          if (!wrapper.success) {
            LOG.warn("Error while consuming message " + messageId);
          }
        }
      }
    }.start();
  }

  public void close() {

  }

  public void nextTuple() {
    // Pull the next pending message (if any) and emit it with its id so that
    // Storm will later call ack() or fail() for it.
    final MessageWrapper wrapper = messageQueue.poll();
    if (wrapper == null)
      return;
    final byte[] message = wrapper.message;
    _collector.emit(new Values(message), wrapper.messageId);
  }

  public void ack(Object msgId) {
    if (msgId instanceof Long) {
      final long id = (Long) msgId;
      final MessageWrapper wrapper = id2wrapperMap.remove(id);
      if (wrapper == null) {
        LOG.warn("Received ack for unknown message id " + id);
        return;
      }
      // Mark success and release the feeder thread.
      wrapper.success = true;
      wrapper.latch.countDown();
    } else {
      LOG.warn(String.format("don't know how to ack(%s: %s)", msgId
          .getClass().getName(), msgId));
    }
  }

  public void fail(Object msgId) {
    if (msgId instanceof Long) {
      final long id = (Long) msgId;
      final MessageWrapper wrapper = id2wrapperMap.remove(id);
      if (wrapper == null) {
        LOG.warn("Received fail for unknown message id " + id);
        return;
      }
      // Mark failure and release the feeder thread.
      wrapper.success = false;
      wrapper.latch.countDown();
    } else {
      LOG.warn(String.format("don't know how to reject(%s: %s)", msgId
          .getClass().getName(), msgId));
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}


The bolt used for testing is very simple; it just acks messages.

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class PrinterBolt implements IRichBolt {

  private OutputCollector collector;
  
  @Override
  public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    // Just ack the input; this bolt emits nothing downstream.
    collector.ack(input);
  }

  @Override
  public void cleanup() {

  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new backtype.storm.tuple.Fields("abc"));
  }

}


Could anyone try the code and send me the results?

Or could anyone kindly explain this behavior?


Thanks,
Min
--
My research interests are distributed systems, parallel computing and bytecode-based virtual machines.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com

songhe yang

Dec 30, 2011, 5:06:53 AM
to storm-user
I ran it; the following is a snapshot from the Storm UI (1 nimbus, 2
supervisors). It is still running now and hasn't hit any errors.

Topology summary

Name Id Status Uptime Num workers Num tasks
perftest perftest-2-1325239321 ACTIVE 3m 21s 1 3

Topology stats

Window Emitted Transferred Complete latency (ms) Acked Failed
10m 0s 2293500 2293500 0.285 573320 0
3h 0m 0s 2293500 2293500 0.285 573320 0
1d 0h 0m 0s 2293500 2293500 0.285 573320 0
All time 2293500 2293500 0.285 573320 0

Spouts (All time)

Id Parallelism Emitted Transferred Complete latency (ms) Acked Failed Last error
1 1 1146600 1146600 0.285 573320 0

Bolts (All time)

Id Parallelism Emitted Transferred Process latency (ms) Acked Failed Last error
-1 1 573600 573600 0.018 1147200 0
2 1 573300 573300 0.025 573300 0

Haitao Yao

Dec 30, 2011, 5:10:50 AM
to storm...@googlegroups.com
How many workers did you start?

songhe yang

Dec 30, 2011, 5:12:50 AM
to storm-user
I used the default (1 worker).

songhe yang

Dec 30, 2011, 5:21:45 AM
to storm...@googlegroups.com
Storm UI screenshot:
perftest.png

songhe yang

Dec 30, 2011, 5:45:14 AM
to storm-user
Some questions about the Storm UI:

1) Why is there a bolt with Id -1 under Bolts? What does it mean?

2) The bolt "PrinterBolt" doesn't emit any tuples, so why does bolt Id 2 still
have an Emitted value?

3) Why is the Emitted value (=1985940) of spout Id 1 much larger than the
Emitted value (=992980) of bolt Id 2? Can't bolt 2 keep up with the spout's
emit rate?

Thanks

Nathan Marz

Dec 30, 2011, 6:37:08 AM
to storm...@googlegroups.com
On Fri, Dec 30, 2011 at 2:45 AM, songhe yang <songh...@gmail.com> wrote:
Some questions about the Storm UI:

1) Why is there a bolt with Id -1 under Bolts? What does it mean?


That's the "acker bolt" which takes care of tracking tuple trees. It's an integral part of guaranteeing message processing, and is described in the wiki here: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

In 0.6.0 that bolt was renamed to "__acker"
 
2) The bolt "PrinterBolt" doesn't emit any tuples, so why does bolt Id 2 still
have an Emitted value?

It's emitting ack tuples on its "ack" stream that go to the acker bolt.
 

3) Why is the Emitted value (=1985940) of spout Id 1 much larger than the
Emitted value (=992980) of bolt Id 2? Can't bolt 2 keep up with the spout's
emit rate?

Notice that the emitted value of spout 1 is just about double the emitted value of bolt 2.

This is because for every tuple spout 1 emits, it sends a tuple to bolt 2 as well as a tuple to the acker bolt (to track the tuple tree). Bolt 2 only acks tuples so it sends a single ack tuple per received tuple.
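To make the counting concrete, here is a minimal sketch of a bolt that anchors its emit to the input tuple and then acks it; the class name and output field are illustrative, not taken from the test topology above. Each anchored emit adds an entry to the tuple tree the acker tracks, and each ack sends exactly one ack tuple to the acker bolt:

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AnchoredPassThroughBolt implements IRichBolt {

  private OutputCollector collector;

  public void prepare(Map stormConf, TopologyContext context,
      OutputCollector collector) {
    this.collector = collector;
  }

  public void execute(Tuple input) {
    // Emitting with the input as the anchor extends the tuple tree, so the
    // acker keeps tracking the new tuple as part of the same spout tuple.
    collector.emit(input, new Values(input.getValue(0)));
    // Acking sends one ack tuple to the acker bolt for this input, which is
    // why an "ack only" bolt still shows a non-zero Emitted count in the UI.
    collector.ack(input);
  }

  public void cleanup() {
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}

With PrinterBolt, which only acks, the spout's Emitted count ends up roughly double the bolt's: per spout emit there is one tuple on the default stream to the bolt plus one __ack_init tuple to the acker.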



--
Twitter: @nathanmarz
http://nathanmarz.com

Nathan Marz

Dec 30, 2011, 6:38:49 AM
to storm...@googlegroups.com
Hi Min,

What version of Storm are you using? Can you send a screenshot of the Storm UI once the topology starts having issues?

-Nathan

Min Zhou

Dec 30, 2011, 9:37:59 AM
to storm...@googlegroups.com

Hi Nathan,

Here is a snapshot of the web UI. Before 1m 30s it works fine, with 1084980 messages acked. The topology gets stuck at 1m 30s.

Storm UI

Topology summary

Name Id Status Uptime Num workers Num tasks
throughputtest throughputtest-163-1325255356 ACTIVE 2m 41s 19 19

Topology stats

Window Emitted Transferred Complete latency (ms) Acked Failed
10m 0s 4339880 4339880 0.610 1084980 0
3h 0m 0s 4339880 4339880 0.610 1084980 0
1d 0h 0m 0s 4339880 4339880 0.610 1084980 0
All time 4339880 4339880 0.610 1084980 0

Spouts (All time)

Id Parallelism Emitted Transferred Complete latency (ms) Acked Failed Last error
word 9 2169960 2169960 0.610 1084980 0

Bolts (All time)

Id Parallelism Emitted Transferred Process latency (ms) Acked Failed Last error
__acker 2 1084960 1084960 0.015 2169940 0
print 8 1084960 1084960 0.019 1084960 0



A few seconds later, failures appeared.


Storm UI

Component summary

Id Topology Parallelism
word throughputtest 9

Spout stats

Window Emitted Transferred Complete latency (ms) Acked Failed
10m 0s 2170100 2170100 0.610 1084980 20
3h 0m 0s 2170100 2170100 0.610 1084980 20
1d 0h 0m 0s 2170100 2170100 0.610 1084980 20
All time 2170100 2170100 0.610 1084980 20

Output stats (All time)

Stream Emitted Transferred Complete latency (ms) Acked Failed
__ack_init 1077280 1077280 0 0 0
default 1092820 1092820 0.610 1084980 20

Tasks (All time)

Id Uptime Host Port Emitted Transferred Complete latency (ms) Acked Failed Last error
11 7m 37s dw40.kgb.sqa.cm4 6700 244700 244700 0.597 122340 20
12 7m 37s dw43.kgb.sqa.cm4 6700 244380 244380 0.584 122180 0
13 7m 37s dw44.kgb.sqa.cm4 6700 242000 242000 0.589 120980 0
14 7m 37s dw47.kgb.sqa.cm4 6700 249100 249100 0.626 124540 0
15 7m 37s dw50.kgb.sqa.cm4 6700 217100 217100 0.666 108540 0
16 7m 38s dw46.kgb.sqa.cm4 6700 244800 244800 0.612 122380 0
17 7m 37s dw34.kgb.sqa.cm4 6700 244220 244220 0.609 122120 0
18 7m 37s dw37.kgb.sqa.cm4 6700 236920 236920 0.610 118460 0
19 7m 38s dw29.kgb.sqa.cm4 6700 246880 246880 0.601 123440 0


Thanks,
Min


Min Zhou

Dec 30, 2011, 9:46:02 AM
to storm...@googlegroups.com
storm: 0.6.0
kernel: 2.6.18 (RHEL 5u4)
zeromq: 2.1.10
jdk:1.6.0_23
jvm: hotspot 64-bit server, build 19.0-b09, mixed mode


A few minutes later, there were more failures.

Storm UI

Topology summary

Name Id Status Uptime Num workers Num tasks
throughputtest throughputtest-163-1325255356 ACTIVE 9m 47s 19 19

Topology stats

Window Emitted Transferred Complete latency (ms) Acked Failed
10m 0s 3927660 3927660 0.578 981900 60
3h 0m 0s 4340200 4340200 0.610 1085000 60
1d 0h 0m 0s 4340200 4340200 0.610 1085000 60
All time 4340200 4340200 0.610 1085000 60

Spouts (All time)

Id Parallelism Emitted Transferred Complete latency (ms) Acked Failed Last error
word 9 2170140 2170140 0.610 1085000 60

Bolts (All time)

Id Parallelism Emitted Transferred Process latency (ms) Acked Failed Last error
__acker 2 1085000 1085000 0.015 2170020 0
print 8 1085060 1085060 0.019 1085100 0



Min Zhou

Dec 30, 2011, 9:01:19 PM
to storm...@googlegroups.com
Hi songhe,

Thanks for the results. Could you please test it with more parallelism and more workers?

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word", new TestWordSpout(), 9);
    builder.setBolt("print", new PrinterBolt(), 8).shuffleGrouping("word");

    Config conf = new Config();
    conf.setMessageTimeoutSecs(60);
    conf.setMaxSpoutPending(1024);
    conf.setNumAckers(2);
    conf.setNumWorkers(19);

    StormSubmitter.submitTopology("throughputtest", conf, builder.createTopology());

I tested it with the above configuration.




Thanks,
Min

songhe yang

Dec 30, 2011, 10:20:48 PM
to storm-user
Have you changed "supervisor.slots.ports" in storm.yaml?

Could you show me your storm.yaml?

Min Zhou

Dec 30, 2011, 10:23:06 PM
to storm...@googlegroups.com

storm.zookeeper.servers:
  - "dw94.kgb.sqa.cm4"
storm.local.dir: "/disk1/storm"
java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib:/usr/lib64:/lib64:/usr/local/lib64"
nimbus.host: "dw26.kgb.sqa.cm4"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

songhe yang

Dec 30, 2011, 10:30:22 PM
to storm-user
How many supervisors do you have?

e.g.: If you have two supervisors, then with four slot ports each the total
number of worker slots is 2 * 4 = 8.

Even though you want to run the topology with 19 workers, it can still only
use 8 workers.

Min Zhou

Dec 30, 2011, 10:37:50 PM
to storm...@googlegroups.com
The cluster has 25 physical machines in total: one for nimbus, the others for supervisors.
Each slave node runs only one supervisor. The number of worker slots depends on the cores of each machine.

songhe yang

Dec 30, 2011, 11:05:20 PM
to storm...@googlegroups.com
I only have two physical machines. Check the result; it is still running now.
storm11.png

songhe yang

Dec 30, 2011, 11:07:35 PM
to storm-user
One more thing: my Storm version is "0.5.5-SNAPSHOT".

Min Zhou

Dec 31, 2011, 3:46:32 AM
to storm...@googlegroups.com
Finally, the prime culprit turned out to be ZeroMQ.
After I downgraded 0mq from 2.1.10 to 2.1.7, the problem hasn't appeared again.


Nathan Marz

Dec 31, 2011, 6:27:52 AM
to storm...@googlegroups.com
OK, good to know. I've updated the wiki to include this information.

songhe yang

Jan 1, 2012, 11:36:14 PM
to storm-user
I also use 0mq 2.1.10; what's the issue with it?

Nathan Marz

Jan 2, 2012, 2:05:30 AM
to storm...@googlegroups.com
There were numerous issues reported with 2.1.10:

1. Messages stop sending between components
2. Random exceptions

2.1.7 seems to be stable.

Nathan Marz

Oct 22, 2012, 10:21:28 PM
to storm...@googlegroups.com
The plan is to replace ZeroMQ with a pure-Java solution, possibly custom built.

On Mon, Oct 22, 2012 at 7:07 PM, Chase <chase.w...@gmail.com> wrote:
Hello - I saw this thread. I have a system where the sysadmin installed ZeroMQ 2.2 and we've seen the same kinds of delays described here (the sysadmin installs the latest release since it is marked stable). I am going to downgrade to 2.1.7 and retest. However, a question: it seems somewhat untenable to stay on a version of 0mq that can't be "patched", since upstream isn't truly stable. Are there plans to move to the 3.x branch, or to switch to the fork of zeromq (the crossroads.io project)? This post is mostly to get information on the direction regarding zeromq vs. crossroads.io.

Chase

Oct 22, 2012, 10:48:09 PM
to storm...@googlegroups.com
Great - a pure Java solution would definitely be useful. I also noticed you changed the HWM to 0 by default - is that just due to flakiness in 0mq?

Nathan Marz

Oct 22, 2012, 10:50:14 PM
to storm...@googlegroups.com
We saw some strange behavior, although I'm not entirely sure whether it was due to the HWM or not. I'm going to keep it at 0 until I'm sure it's stable enough for a release.
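For anyone who wants to experiment with a non-zero HWM on their own topology, here is a minimal sketch of overriding it through the topology config (this assumes the HWM is exposed as the "zmq.hwm" config key, as in Storm's defaults; the value 1000 is purely illustrative):

import backtype.storm.Config;

Config conf = new Config();
// 0 means "no limit"; a positive value caps how many messages ZeroMQ will
// buffer per socket before it blocks or drops, depending on socket type.
conf.put("zmq.hwm", 1000);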