Development release for Storm 0.8.0 available


nathanmarz

Jun 6, 2012, 4:00:34 PM
to storm-user
It's less than a week since Storm 0.7.2 was released, but I'm happy to
announce that a development release for Storm 0.8.0 is now available
from the downloads page: https://github.com/nathanmarz/storm/downloads

Storm 0.8.0 is the most significant release of Storm yet, and a major
step forward in the evolution of the project. Here's a summary of the
changes:

1. Executors: Prior to Storm 0.8.0, a running topology consisted of
some number of workers and some number of tasks that ran on those
workers. In the old model, worker = process and task = thread. Storm
0.8.0 changes this model by introducing executors. In this model, a
worker = process, an executor = thread, and one executor runs many
tasks from the same spout/bolt.

The reason for the change is that the old model complected the
semantics of the topology with its physical execution. For example, if
you have a bolt with 4 tasks doing a fields grouping on some stream,
then in order to maintain the semantics of the fields grouping (that
the same value always goes to the same task id for that bolt), the
number of tasks for that bolt must stay fixed for the lifetime of the
topology; and since task = thread, the number of threads for that bolt
is likewise fixed for the lifetime of the topology. In the new model, the
number of threads for a bolt is disassociated from the number of
tasks, meaning you can change the number of threads for a spout/bolt
dynamically without affecting semantics. Similarly, if you're keeping
large amounts of state in your bolts, and you want to increase the
parallelism of the bolt without having to repartition the state, you
can do this with the new executors.

At the API level, the "parallelism_hint" now specifies the initial
number of executors for that bolt. You can specify the number of tasks
using the TOPOLOGY_TASKS component config. For example:

builder.setBolt("mybolt", new MyBolt(), 3).setNumTasks(128).shuffleGrouping("spout");

This sets the initial number of executors to 3 and the number of tasks
to 128.

If you don't specify the number of tasks for a component, it will be
fixed to the initial number of executors for the lifetime of the
topology.

Finally, you can change the number of workers and/or number of
executors for components using the "storm rebalance" command. The
following command changes the number of workers for the "demo"
topology to 3, the number of executors for the "myspout" component to
5, and the number of executors for the "mybolt" component to 1:

storm rebalance demo -n 3 -e myspout=5 -e mybolt=1

2. Pluggable scheduler: You can now implement your own scheduler to
replace the default scheduler for assigning executors to workers. You
specify which class to use via the "storm.scheduler" config in your
storm.yaml, and your scheduler must implement this interface:
https://github.com/nathanmarz/storm/blob/master/src/jvm/backtype/storm/scheduler/IScheduler.java
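
As a rough illustration, a scheduler that pins one component's
executors onto a single slot could look something like the sketch
below. This assumes the 0.8.0 scheduler API (Topologies, Cluster,
WorkerSlot, ExecutorDetails, with the stock EvenScheduler as a
fallback); the class name, the topology name "demo", and the component
name "myspout" are hypothetical, and you would point "storm.scheduler"
at your own class:

    import java.util.List;
    import java.util.Map;

    import backtype.storm.scheduler.Cluster;
    import backtype.storm.scheduler.EvenScheduler;
    import backtype.storm.scheduler.ExecutorDetails;
    import backtype.storm.scheduler.IScheduler;
    import backtype.storm.scheduler.Topologies;
    import backtype.storm.scheduler.TopologyDetails;
    import backtype.storm.scheduler.WorkerSlot;

    public class PinningScheduler implements IScheduler {
        public void schedule(Topologies topologies, Cluster cluster) {
            TopologyDetails topology = topologies.getByName("demo");
            if (topology != null && cluster.needsScheduling(topology)) {
                Map<String, List<ExecutorDetails>> pending =
                    cluster.getNeedsSchedulingComponentToExecutors(topology);
                List<ExecutorDetails> executors = pending.get("myspout");
                List<WorkerSlot> slots = cluster.getAvailableSlots();
                if (executors != null && !slots.isEmpty()) {
                    // pin all of myspout's unassigned executors onto the
                    // first available slot
                    cluster.assign(slots.get(0), topology.getId(), executors);
                }
            }
            // let the default even scheduler place everything else
            new EvenScheduler().schedule(topologies, cluster);
        }
    }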

3. Throughput improvements: The internals of Storm have been
rearchitected for dramatic performance gains. I'm seeing throughput
increases of anywhere from 5-10x over what it was before. The
key changes made were:

a) Replacing all the internal in-memory queuing with the LMAX
Disruptor
b) Doing intelligent auto-batching of processing so that the consumers
can keep up with the producers

Here are the configs which affect how buffering/batching is done:

topology.executor.receive.buffer.size
topology.executor.send.buffer.size
topology.receiver.buffer.size
topology.transfer.buffer.size

These may require some tweaking to optimize your topologies, but most
likely the default values will work fine for you out of the box.
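
If you do want to experiment with them, they are ordinary topology
configs. A purely hypothetical illustration (the values are arbitrary
examples, not recommendations):

    Config conf = new Config();
    // hypothetical buffer sizes, for illustration only
    conf.put("topology.executor.receive.buffer.size", 16384);
    conf.put("topology.executor.send.buffer.size", 16384);
    conf.put("topology.receiver.buffer.size", 8);
    conf.put("topology.transfer.buffer.size", 32);
    StormSubmitter.submitTopology("demo", conf, builder.createTopology());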

4. Decreased Zookeeper load / increased Storm UI performance: Storm
now sends significantly less traffic to Zookeeper (on the order of 10x
less). Since it also uses far fewer znodes to store state, the UI is
significantly faster as well.

5. Abstractions for shared resources: The TopologyContext has methods
"getTaskData", "getExecutorData", and "getResource" for sharing
resources at the task level, executor level, or worker level
respectively. Currently you can't set any worker resources, but this
is in development. Storm currently provides a shared ExecutorService
worker resource (via the "getSharedExecutor" method) that can be used
for launching background tasks on a shared thread pool.
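
As a rough sketch, from a bolt's prepare method (the background task
here is hypothetical):

    import java.util.Map;
    import java.util.concurrent.ExecutorService;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // grab the worker-shared thread pool and hand it a background task
        ExecutorService shared = context.getSharedExecutor();
        shared.submit(new Runnable() {
            public void run() {
                refreshLookupCache(); // hypothetical background work
            }
        });
    }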

6. Tick tuples: It's common to require a bolt to "do something" at a
fixed interval, like flush writes to a database. Many people have been
using variants of a ClockSpout to send these ticks. The problem with a
ClockSpout is that you can't internalize the need for ticks within
your bolt, so if you forget to set up your bolt correctly within your
topology it won't work correctly. 0.8.0 introduces tick tuples: via
the "topology.tick.tuple.freq.secs" component-specific config you
specify the frequency at which you want to receive ticks, and your
bolt will then receive a tuple from the __system component and __tick
stream at that frequency.
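
As a rough sketch, a bolt that flushes on ticks could look like this
(the class and the flush method are hypothetical; "__system" and
"__tick" are the component/stream names given above):

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.Config;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class FlushingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public Map<String, Object> getComponentConfiguration() {
            Map<String, Object> conf = new HashMap<String, Object>();
            // ask Storm for a tick tuple every 10 seconds
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            if ("__system".equals(tuple.getSourceComponent())
                    && "__tick".equals(tuple.getSourceStreamId())) {
                flushWritesToDatabase(); // hypothetical periodic work
            } else {
                // ... normal per-tuple processing goes here ...
                collector.ack(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output streams in this sketch
        }

        private void flushWritesToDatabase() { /* hypothetical */ }
    }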

7. Improved Storm UI: The Storm UI now has a button for showing/hiding
the "system stats" (tuples sent on streams other than ones you've
defined, like acker streams), making it easier to digest what your
topology is doing.


There are many other minor improvements included in this development
release. The full change list is below:

* Added executor abstraction between workers and tasks. Workers =
processes, executors = threads that run many tasks from the same spout
or bolt.
* Pluggable scheduler (thanks xumingming)
* Eliminate explicit storage of task->component in Zookeeper
* Number of workers can be dynamically changed at runtime through
rebalance command and -n switch
* Number of executors for a component can be dynamically changed at
runtime through rebalance command and -e switch (multiple -e switches
allowed)
* Use worker heartbeats instead of task heartbeats (thanks
xumingming)
* UI performance for topologies with many executors/tasks much faster
due to optimized usage of Zookeeper (10x improvement)
* Added button to show/hide system stats (e.g., acker component and
stream stats) from the Storm UI (thanks xumingming)
* Stats are tracked on a per-executor basis instead of per-task basis
* Major optimization for unreliable spouts and unanchored tuples
(will use far less CPU)
* Revamped internals of Storm to use the LMAX Disruptor for internal
queuing. Dramatic reductions in contention and CPU usage.
* Numerous micro-optimizations all throughout the codebase to reduce
CPU usage.
* Optimized internals of Storm to use far fewer threads - two fewer
threads per spout and one fewer thread per acker.
* Removed error method from task hooks (to be re-added at a later
time)
* Validate that subscriptions come from valid components and streams,
and if it's a field grouping that the schema is correct (thanks
xumingming)
* MemoryTransactionalSpout now works in cluster mode
* Only track errors on a component-by-component basis to reduce the
amount stored in Zookeeper (to speed up the UI). A side effect of this
change is the removal of the task page in the UI.
* Add TOPOLOGY-TICK-TUPLE-FREQ-SECS config to have Storm
automatically send "tick" tuples to a bolt's execute method coming
from the __system component and __tick stream at the configured
frequency. Meant to be used as a component-specific configuration.
* Upgrade Kryo to v2.04
* Tuple is now an interface and is much cleaner. The Clojure DSL
helpers have been moved to TupleImpl
* Added shared worker resources. Storm provides a shared
ExecutorService thread pool by default. The number of threads in the
pool can be configured with topology.worker.shared.thread.pool.size

Michael Rose

Jun 6, 2012, 4:11:15 PM
to storm...@googlegroups.com
This is huge! Thanks Nathan :)

I'm pretty excited about tick tuples; they'll make it far easier to coordinate certain actions. The use of the LMAX Disruptor and Kryo 2 is also of particular interest.

YOTA

Jun 6, 2012, 9:59:22 PM
to storm...@googlegroups.com

Oh, sounds so interesting!

From the above, I see that we can change the number of workers/executors dynamically (using rebalance, and so on).
Could I also change the number of tasks dynamically?
And if yes, how can I do that?
Sorry, I couldn't understand this point properly.


On Thursday, June 7, 2012 at 5:00:34 AM UTC+9, nathanmarz wrote:

Nathan Marz

Jun 6, 2012, 10:11:13 PM
to storm...@googlegroups.com
No, the number of tasks is fixed. That's the whole point. There's no reason to change the number of tasks – the amount of parallelism you have for that spout/bolt is determined by the number of executors.
--
Twitter: @nathanmarz
http://nathanmarz.com

Ken.Zhang

Jun 6, 2012, 11:37:23 PM
to storm...@googlegroups.com
Cool! With the new model, maybe a rolling resubmit of a topology will be practical later?

Nathan Marz

Jun 7, 2012, 1:56:20 AM
to storm...@googlegroups.com
This new feature doesn't really make it any easier or harder to implement swapping.

YOTA

Jun 7, 2012, 2:52:29 AM
to storm...@googlegroups.com

Hi Nathan,

I see, thank you for the reply.


> If you don't specify the number of tasks for a component, it will be
> fixed to the initial number of executors for the lifetime of the
> topology.

> the amount of parallelism you have for that spout/bolt is determined by the number of executors.

In that sense, should I initially set the number of tasks higher than the number of executors (when submitting the topology)?


On Thursday, June 7, 2012 at 11:11:13 AM UTC+9, nathanmarz wrote:

Nathan Marz

Jun 7, 2012, 3:14:11 AM
to storm...@googlegroups.com
Yes, if you want to be able to scale it up after submission you should set # tasks higher than # executors. Tasks are pretty cheap, so you can create a lot of them (anywhere from hundreds up to a thousand tasks is reasonable).
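
For example (a hypothetical sketch reusing the component names from the announcement):

    // submit with 2 executors but 128 tasks, leaving headroom to scale
    builder.setSpout("myspout", new MySpout(), 2).setNumTasks(128);

    // later, scale the same spout up to 64 executors without
    // repartitioning any state:
    //   storm rebalance demo -e myspout=64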

YOTA

Jun 7, 2012, 4:30:22 AM
to storm...@googlegroups.com

OK, thank you!

On Thursday, June 7, 2012 at 4:14:11 PM UTC+9, nathanmarz wrote:

Ken.Zhang

Jun 7, 2012, 4:39:07 AM
to storm...@googlegroups.com
I see that it might be very difficult to control the running cluster and the in-flight tuples. But if the tuple's payload schema and the spout's API are unchanged, and you just want to upgrade the implementation of the topology packaged in the jar, is there any plan for that?
Anyway, v0.8.0 sounds like a really smart upgrade!

anahap

Jun 8, 2012, 5:58:36 AM
to storm...@googlegroups.com
Hi Nathan, this is positively great!
Can you provide us with an example of using the custom scheduler, for
example to make sure certain bolts or spouts only run on certain machines?

TIA
Andy

Richards Peter

Jun 12, 2012, 7:33:11 AM
to storm...@googlegroups.com
1. Have you removed the Config.TOPOLOGY_ACKERS parameter in Storm 0.8.0? Is there something else that should be used instead?

2. I would also like to know how context.getTaskData(String) is meant to be used. Earlier it did not accept any arguments. Is it some kind of map in Storm 0.8.0 now? And has it been implemented completely in the development release?

Thanks,
Richards.

Nathan Marz

Jun 13, 2012, 1:07:02 AM
to storm...@googlegroups.com
On Tue, Jun 12, 2012 at 4:33 AM, Richards Peter <hbkri...@gmail.com> wrote:
> 1. Have you removed the Config.TOPOLOGY_ACKERS parameter in Storm 0.8.0? Is there something else that should be used instead?

It's now called TOPOLOGY_ACKER_EXECUTORS.
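
For example (the value here is an arbitrary illustration):

    Config conf = new Config();
    // number of acker executors for the topology
    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 4);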
 

> 2. I would also like to know how context.getTaskData(String) is meant to be used. Earlier it did not accept any arguments. Is it some kind of map in Storm 0.8.0 now? And has it been implemented completely in the development release?

Yes, now it's a map-like interface so that it's easier to store multiple things in there. It's already implemented in the development release.
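
For example, something like this sketch (the key "counters" is
arbitrary, and the setTaskData counterpart is my assumption about the
write side):

    // store a named piece of per-task state...
    context.setTaskData("counters", new HashMap<String, Long>());
    // ...and read it back later by the same name
    Map<String, Long> counters =
        (Map<String, Long>) context.getTaskData("counters");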
 


Richards Peter

Jun 13, 2012, 1:58:32 AM
to storm...@googlegroups.com
Thanks Nathan,
Richards Peter.

Nathan Marz

Jun 13, 2012, 10:44:43 PM
to storm...@googlegroups.com
Yes, set the config as a component-specific config. See here for more details: https://github.com/nathanmarz/storm/wiki/Configuration

On Wed, Jun 13, 2012 at 7:42 PM, Binh Nguyen Van <binh...@gmail.com> wrote:
> Hi Nathan,
>
> I have multiple bolts in my topology and I want to use tick tuples. Is there any way to set things up so that tick tuples are only sent to the bolt that I want? Otherwise I have to check the "component" and "stream" source of the tuple in every bolt.
>
> Thanks
> Binh


Binh Nguyen Van

Jun 14, 2012, 2:29:44 PM
to storm...@googlegroups.com
I tried to override getComponentConfiguration in my bolt like this:
    @Override
    public Map<String, Object> getComponentConfiguration()
    {
        Map<String, Object> conf = super.getComponentConfiguration();
        if (conf == null) {
            conf = new Config();
        }
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);

        return conf;
    }
and in the execute method of that bolt I just do this:
    System.out.println(tuple.getSourceComponent() + ", " + tuple.getSourceStreamId());

But when I run my topology on my localhost I do not see any tuples from the "__system" component and "__tick" stream. Am I doing anything wrong, or am I missing any required configuration?

Thanks
Binh

Nathan Marz

Jun 15, 2012, 3:47:13 PM
to storm...@googlegroups.com
This was a bug in local mode, which is now fixed. I've updated the 0.8.0-SNAPSHOT download as well as the Maven artifacts.

Binh Van Nguyen

Jun 15, 2012, 7:35:36 PM
to storm...@googlegroups.com
Retested and verified that it is working on my localhost.

Thanks
Binh


