It's been less than a week since Storm 0.7.2 was released, but I'm happy to
announce that a development release for Storm 0.8.0 is now available
from the downloads page:
https://github.com/nathanmarz/storm/downloads
Storm 0.8.0 is the most significant release of Storm yet, and a major
step forward in the evolution of the project. Here's a summary of the
changes:
1. Executors: Prior to Storm 0.8.0, a running topology consisted of
some number of workers and some number of tasks that ran on those
workers. In the old model, worker = process and task = thread. Storm
0.8.0 changes this model by introducing executors. In this model, a
worker = process, an executor = thread, and one executor runs many
tasks from the same spout/bolt.
The reason for the change is that the old model complected the
semantics of the topology with its physical execution. For example, if
you had a bolt with 4 tasks doing a fields grouping on some stream, the
number of tasks for that bolt had to stay fixed for the lifetime of the
topology in order to preserve the semantics of the fields grouping (the
same value always goes to the same task id for that bolt). Since task =
thread, the number of threads for that bolt was fixed as well. In the
new model, the number of threads for a bolt is decoupled from the
number of tasks, meaning you can change the number of threads for a
spout/bolt dynamically without affecting semantics. Similarly, if
you're keeping large amounts of state in your bolts and want to
increase the parallelism of a bolt without repartitioning the state,
you can do this with the new executors.
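To make the fields-grouping constraint concrete, here is a minimal Java sketch. It assumes the grouping picks a task by hashing the grouping value modulo the number of tasks; the class and method names are hypothetical, and Storm's actual hashing may differ. The point is that only the task count appears in the calculation, so the task count must stay fixed while the executor count can change freely.

```java
/**
 * Hypothetical sketch, not Storm's code: a fields grouping routes a value
 * to a task id derived from its hash modulo the (fixed) number of tasks.
 * The number of executors never enters the calculation.
 */
class FieldsGroupingSketch {
    // Returns the 0-based task index that receives tuples carrying this value.
    static int targetTask(Object groupingValue, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(groupingValue.hashCode(), numTasks);
    }
}
```

With 4 tasks the value "a" lands on task 1, but with 5 tasks it would land on task 2, which is why the task count cannot change without breaking the grouping.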
At the API level, the "parallelism_hint" now specifies the initial
number of executors for that bolt. You can specify the number of tasks
using the TOPOLOGY_TASKS component config. For example:
    builder.setBolt("mybolt", new MyBolt(), 3)
           .setNumTasks(128)
           .shuffleGrouping("spout");
This sets the initial number of executors to 3 and the number of tasks
to 128.
If you don't specify the number of tasks for a component, it defaults
to the initial number of executors and stays fixed for the lifetime of
the topology.
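As a rough illustration of how the 128 tasks in the example above might be spread over 3 executors, the sketch below splits the task ids into contiguous ranges whose sizes differ by at most one. This distribution strategy is an assumption made for illustration, not a description of Storm's internals, and `TaskPartitionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: group a fixed set of task ids into executors. */
class TaskPartitionSketch {
    // Splits task ids [0, numTasks) into numExecutors contiguous ranges whose
    // sizes differ by at most one. Each entry is {firstTaskId, lastTaskId}.
    static List<int[]> partition(int numTasks, int numExecutors) {
        if (numExecutors <= 0 || numExecutors > numTasks) {
            // In this sketch every executor must own at least one task.
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<int[]> ranges = new ArrayList<>();
        int base = numTasks / numExecutors;  // minimum tasks per executor
        int extra = numTasks % numExecutors; // the first `extra` executors get one more
        int next = 0;
        for (int e = 0; e < numExecutors; e++) {
            int size = base + (e < extra ? 1 : 0);
            ranges.add(new int[] {next, next + size - 1});
            next += size;
        }
        return ranges;
    }
}
```

For 128 tasks and 3 executors this yields ranges of 43, 43 and 42 tasks. Rebalancing to 5 executors regroups the same 128 task ids into five ranges (26, 26, 26, 25, 25), and no task id changes, so a fields grouping keyed on task ids keeps its semantics.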
Finally, you can change the number of workers and/or number of
executors for components using the "storm rebalance" command. The
following command changes the number of workers for the "demo"
topology to 3, the number of executors for the "myspout" component to
5, and the number of executors for the "mybolt" component to 1:
storm rebalance demo -n 3 -e myspout=5 -e mybolt=1
2. Pluggable scheduler: You can now implement your own scheduler to
replace the default scheduler that assigns executors to workers. You
configure which class to use with the "storm.scheduler" config in your
storm.yaml, and your scheduler must implement this interface:
https://github.com/nathanmarz/storm/blob/master/src/jvm/backtype/storm/scheduler/IScheduler.java
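The interface linked above is the real extension point. Purely to illustrate the kind of decision a scheduler makes, here is a standalone round-robin assignment of executors to worker slots in plain Java. It does not implement `IScheduler`, and the class name, executor names and slot names (such as "host1:6700") are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a scheduling policy; not Storm's IScheduler. */
class RoundRobinSchedulerSketch {
    // Assigns executors to worker slots in round-robin order.
    // Returns slot -> executors placed on that slot, in slot order.
    static Map<String, List<String>> assign(List<String> executors, List<String> workerSlots) {
        if (workerSlots.isEmpty()) {
            throw new IllegalArgumentException("no worker slots available");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String slot : workerSlots) {
            assignment.put(slot, new ArrayList<>());
        }
        for (int i = 0; i < executors.size(); i++) {
            String slot = workerSlots.get(i % workerSlots.size());
            assignment.get(slot).add(executors.get(i));
        }
        return assignment;
    }
}
```

A real scheduler would also consider which slots are free and which executors are already running; round-robin is just the simplest policy to show.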
3. Throughput improvements: The internals of Storm have been
rearchitected for major performance gains. I'm seeing throughput
increases of 5-10x over previous releases. The key changes were:
a) Replacing all the internal in-memory queuing with the LMAX
Disruptor
b) Doing intelligent auto-batching of processing so that the consumers
can keep up with the producers
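The Disruptor itself is a separate library, but the auto-batching idea can be sketched with a plain `java.util.concurrent` queue: the consumer blocks until at least one item is available, then takes whatever else is already queued (up to a limit) and processes it in one pass. This is an illustrative analogue under that assumption, not Storm's implementation, and `nextBatch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of consumer-side auto-batching. */
class BatchingConsumerSketch {
    // Waits for one item, then drains up to maxBatch - 1 more that are
    // already queued. Returns an empty list if the thread is interrupted.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatch) {
        List<T> batch = new ArrayList<>(maxBatch);
        try {
            batch.add(queue.take()); // block until there is work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return batch;                       // empty: caller should stop
        }
        queue.drainTo(batch, maxBatch - 1); // non-blocking: grab the backlog
        return batch;
    }
}
```

When the producer is ahead, batches grow toward `maxBatch`, so per-item overhead shrinks exactly when the consumer is falling behind; when traffic is light, batches stay small and latency stays low.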
Here are the configs which affect how buffering/batching is done:
topology.executor.receive.buffer.size
topology.executor.send.buffer.size
topology.receiver.buffer.size
topology.transfer.buffer.size
These may require some tweaking to optimize your topologies, but most
likely the default values will work fine for you out of the box.
4. Decreased Zookeeper load / increased Storm UI performance: Storm
sends significantly less traffic to Zookeeper now (on the order of 10x
less). Since it also uses far fewer znodes to store state, the UI is
significantly faster as well.
5. Abstractions for shared resources: The TopologyContext has methods
"getTaskData", "getExecutorData", and "getResource" for sharing
resources at the task level, executor level, or worker level
respectively. Currently you can't set any worker resources, but this
is in development. Storm currently provides a shared ExecutorService
worker resource (via the "getSharedExecutor" method) that can be used
for launching background tasks on a shared thread pool.
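In Storm the shared pool comes from the "getSharedExecutor" method; the sketch below only mimics the usage pattern in plain Java so it can run on its own. Several components receive the same `ExecutorService` and push slow background work (a flush, say) onto it instead of starting their own threads. `FlushingComponent` and `flushInBackground` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical component that offloads work to a pool it does not own. */
class FlushingComponent {
    private final ExecutorService sharedPool; // shared by all components in the process
    private final AtomicInteger flushes = new AtomicInteger();

    FlushingComponent(ExecutorService sharedPool) {
        this.sharedPool = sharedPool;
    }

    // Schedules a (stand-in) flush on the shared pool and returns immediately.
    CompletableFuture<Void> flushInBackground() {
        return CompletableFuture.runAsync(flushes::incrementAndGet, sharedPool);
    }

    int flushCount() {
        return flushes.get();
    }
}
```

Because the component never creates or shuts down the pool, the number of background threads per worker stays bounded no matter how many components use it.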
6. Tick tuples: It's common to require a bolt to "do something" at a
fixed interval, like flush writes to a database. Many people have been
using variants of a ClockSpout to send these ticks. The problem with a
ClockSpout is that you can't internalize the need for ticks within
your bolt, so if you forget to wire your bolt to the ClockSpout in
your topology, it won't work correctly. 0.8.0 introduces a new "tick
tuple" config that lets you specify the frequency at which you want to
receive tick tuples via the "topology.tick.tuple.freq.secs"
component-specific config. Your bolt will then receive a tuple from the
__system component and __tick stream at that frequency.
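A bolt that uses tick tuples typically needs two pieces: the component-specific config that requests ticks, and a check in its execute method that tells ticks apart from ordinary tuples. The sketch below shows both in plain Java. The config key and the __system/__tick ids come from the notes above; passing the source component and stream as strings, and the class and method names, are assumptions made so the example runs without Storm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; in a real bolt these checks run on the incoming tuple. */
class TickTupleSketch {
    // Component and stream ids that carry tick tuples, per the release notes.
    static final String SYSTEM_COMPONENT = "__system";
    static final String TICK_STREAM = "__tick";

    // True if a tuple from this component/stream is a tick rather than data.
    static boolean isTick(String sourceComponent, String sourceStream) {
        return SYSTEM_COMPONENT.equals(sourceComponent) && TICK_STREAM.equals(sourceStream);
    }

    // Component-specific config requesting one tick every `seconds` seconds.
    static Map<String, Object> tickConfig(int seconds) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.tick.tuple.freq.secs", seconds);
        return conf;
    }
}
```

In execute, a bolt would branch on the tick check: flush buffered writes on a tick, otherwise buffer the incoming tuple.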
7. Improved Storm UI: The Storm UI now has a button for showing/hiding
the "system stats" (tuples sent on streams other than ones you've
defined, like acker streams), making it easier to digest what your
topology is doing.
There are many other minor improvements included in this development
release. The full change list is below:
* Added executor abstraction between workers and tasks. Workers =
processes, executors = threads that run many tasks from the same spout
or bolt.
* Pluggable scheduler (thanks xumingming)
* Eliminate explicit storage of task->component in Zookeeper
* Number of workers can be dynamically changed at runtime through
rebalance command and -n switch
* Number of executors for a component can be dynamically changed at
runtime through rebalance command and -e switch (multiple -e switches
allowed)
* Use worker heartbeats instead of task heartbeats (thanks
xumingming)
* UI is much faster for topologies with many executors/tasks due to
optimized usage of Zookeeper (10x improvement)
* Added button to show/hide system stats (e.g., acker component and
stream stats) from the Storm UI (thanks xumingming)
* Stats are tracked on a per-executor basis instead of per-task basis
* Major optimization for unreliable spouts and unanchored tuples
(will use far less CPU)
* Revamped internals of Storm to use LMAX disruptor for internal
queuing. Dramatic reductions in contention and CPU usage.
* Numerous micro-optimizations all throughout the codebase to reduce
CPU usage.
* Optimized internals of Storm to use far fewer threads: two fewer
threads per spout and one fewer thread per acker.
* Removed error method from task hooks (to be re-added at a later
time)
* Validate that subscriptions come from valid components and streams,
and, for a fields grouping, that the schema is correct (thanks
xumingming)
* MemoryTransactionalSpout now works in cluster mode
* Only track errors on a component-by-component basis to reduce the
amount of data stored in Zookeeper (to speed up the UI). A side effect
of this change is the removal of the task page in the UI.
* Add TOPOLOGY-TICK-TUPLE-FREQ-SECS config to have Storm
automatically send "tick" tuples to a bolt's execute method coming
from the __system component and __tick stream at the configured
frequency. Meant to be used as a component-specific configuration.
* Upgrade Kryo to v2.04
* Tuple is now an interface and is much cleaner. The Clojure DSL
helpers have been moved to TupleImpl
* Added shared worker resources. Storm provides a shared
ExecutorService thread pool by default. The number of threads in the
pool can be configured with topology.worker.shared.thread.pool.size