Simplicity in Storm versus Hadoop and Actors (aka Storm as a new foundation)

kovas boguta

Dec 26, 2011, 7:00:06 PM
to storm...@googlegroups.com
I've been taking a closer look at Storm over the holidays, and getting
increasingly impressed with what Nathan has pulled off.

Today, Storm's killer application is stream processing. But even in
Nathan's release presentation, that's only one of the three use cases.

So setting aside specific applications, what does Storm offer, and how
does that compare with other solutions? The way I see it, it provides
several fundamental primitives:

- algorithmically distributing units of work
- making sure they get done
- ability to compose these distributed, fault tolerant units of work

These capabilities are also present in Hadoop and Actors, but there
they are, in the parlance of our times, complected with other things.
And thus are less like primitives, and more like features -- and
harder to build upon.

In actors, for instance, the units of work are complected with the
destination of the work, much like the worker/queue situation in
Nathan's presentation. In Hadoop, the map-reduce algorithm is
complected with fault-tolerance and distribution.

Thinking of Storm as a solution to the distributed computation problem
opens the door to more general applications, in particular the
often-mentioned question of bridging Storm and Hadoop.

As a thought experiment, let's take a look at Hadoop's remaining core
capabilities, and think how libraries/extensions of Storm could
replicate them:

1. Efficient batch computation via appropriate buffering, compression,
file formats
An individual hadoop node achieves high throughput by processing
records in batches. Storm can be extended to support batched
abstractions like clojure's chunked seqs, which are transparent to the
programmer. This gets you higher-throughput IO, and provides
opportunity for compression. This could be implemented in the storm
core, or by user libraries. Efficient file formats like avro can be
utilized by bolts directly without going through the whole hadoop
framework.
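The batching idea can be sketched in a few lines. This is illustrative pseudocode in Python, not Storm's actual API; `BatchingBolt` and `flush_size` are made-up names:

```python
# Illustrative sketch: a bolt wrapper that buffers incoming tuples into
# chunks before emitting, trading a little latency for throughput.
class BatchingBolt:
    def __init__(self, emit, flush_size=100):
        self.emit = emit          # downstream emit function
        self.flush_size = flush_size
        self.buffer = []

    def execute(self, tuple_):
        self.buffer.append(tuple_)
        if len(self.buffer) >= self.flush_size:
            self.flush()

    def flush(self):
        if self.buffer:
            # One emit per chunk: fewer messages over the wire, and the
            # chunk is a natural unit to compress before sending.
            self.emit(list(self.buffer))
            self.buffer.clear()

# usage: downstream sees chunks instead of individual tuples
out = []
bolt = BatchingBolt(out.append, flush_size=3)
for i in range(7):
    bolt.execute(i)
bolt.flush()  # flush the trailing partial chunk
```

The same wrapper is where a compression codec or an avro encoder would plug in, since it sees whole chunks rather than single records.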

2. Efficient utilization of HDFS via leveraging data locality
If bolts are to process data residing in HDFS, we can use Storm's
stream grouping to associate file splits with bolts running at the
corresponding node. The input HDFS file is broken up into a stream of
messages where the message id is the file split; these get sent to the
appropriate bolt, the bolt reads the data using off-the-shelf hadoop
input formats, writes the result to HDFS and sends the resulting
splits to the next bolts in the topology. In fact, you could break up
the process into three bolts on the same machine: a bolt that reads
the file, a bolt that does computation, and another bolt that writes
another file -- thus separating the core logic from the IO and
allowing you to do both batch and realtime computation with the same
core bolt. This can be done at the library level.
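A sketch of that split-to-bolt matching, with hypothetical inputs (in a real system `split_locations` would come from the HDFS namenode and `tasks_by_host` from the topology's placement; neither is a Storm API):

```python
# Hypothetical locality grouping: route each HDFS file-split message to a
# bolt task running on a node that holds the split's blocks, falling back
# to hashing when no local task exists.
def locality_grouping(split_locations, tasks_by_host, all_tasks):
    """Return a function mapping a split id to a task id."""
    def choose_task(split_id):
        for host in split_locations.get(split_id, []):
            if host in tasks_by_host:
                return tasks_by_host[host]  # local read: no network copy
        # no task co-located with the data; fall back to hash placement
        return all_tasks[hash(split_id) % len(all_tasks)]
    return choose_task

# usage with toy data
split_locations = {"part-00000": ["nodeA"], "part-00001": ["nodeC"]}
tasks_by_host = {"nodeA": 1, "nodeB": 2}
choose = locality_grouping(split_locations, tasks_by_host, [1, 2])
local = choose("part-00000")   # nodeA holds the split and runs task 1
remote = choose("part-00001")  # no local task; falls back to hashing
```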

3. Larger-than-RAM datasets via constant serialization and the
spill-to-disk sorting step
The crux of the issue is that the Hadoop reduce step requires seeing
the whole dataset at once, which therefore entails swapping between
disk and RAM -- you can't just group values in memory like the current
Storm design pattern. Perhaps a partial solution is to reuse the
appropriate Hadoop classes within a bolt wrapper, which will outsource
that part of the computation and eventually return the sorted tuple
stream.
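The spill-to-disk step itself is just an external merge sort. A minimal standalone sketch of what such a bolt wrapper would do internally (Python here for brevity; Hadoop's own sort classes do the same thing with far more engineering):

```python
# External merge sort: sort bounded chunks in memory, spill each sorted run
# to disk, then merge the runs with heapq.merge so only one record per run
# needs to be in RAM at a time.
import heapq
import os
import pickle
import tempfile

def external_sort(records, chunk_size=1000):
    runs = []
    chunk = []
    for rec in records:
        chunk.append(rec)
        if len(chunk) >= chunk_size:
            runs.append(_spill(sorted(chunk)))
            chunk = []
    if chunk:
        runs.append(_spill(sorted(chunk)))
    # lazily merge the sorted runs into one sorted stream
    return heapq.merge(*[_replay(path) for path in runs])

def _spill(sorted_chunk):
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as f:
        pickle.dump(sorted_chunk, f)
    return path

def _replay(path):
    with open(path, "rb") as f:
        yield from pickle.load(f)
    os.remove(path)  # run is consumed; clean up the spill file
```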

Of course, the obvious question is why bother replicating what already
works in Hadoop. Several reasons come to mind:

1. Reducing code duplication and unnecessary complexity. Consider
processing log files. You have a ton of them sitting around, and also
have them streaming in. It would be nice to turn a simple knob to get
batch-processing behavior to catch up on historical data, then switch
to the realtime stream.

2. Supporting different tradeoffs than Hadoop, for example interactive
data manipulation with medium sized data that fits in RAM a la Spark.
This is basically DRPC to its logical extreme.

3. Supporting algorithms in addition to map-reduce.

4. Mixing map-reduce with a more complex sequence of operations
involving databases and other services. Currently this is a huge mess,
with code getting crammed into the mappers/reducers, the
GenericOptionsParser/ToolRunner, external programs triggered by
pre/post hooks, Oozie, etc. What you really want is an arbitrary chain
of distributed computations with the same characteristics and same
language as the rest of your system.

At the end of the day one wants to build data systems out of
composable abstractions, and I'm hopeful Storm can improve the
situation for applications old and new. Happy hacking!

Brian O'Neill

Dec 26, 2011, 10:25:37 PM
to storm...@googlegroups.com

Kovas,

I couldn't agree more. A few of us got a chance to talk this concept over at the Cassandra Conference in NYC a few weeks back. After that conversation, we came back and started noodling life w/o Hadoop.

This is especially true in our case, where the majority of our data is stored in Cassandra, and the problem of data locality is easier to solve, enabling us to move the processing to the pertinent data. In a few weeks time, we intend to look at bridging that gap, introducing Cassandra's concept of data locality to Storm's processing layer. With that in place, we believe we can use the simplicity of Storm for all of our processing and slowly migrate existing Map/Reduce jobs to Storm.

As part of that experiment, we hope to race Hadoop and Storm for HDFS loads into Cassandra, as well as extracts from Cassandra to HDFS, and Cassandra-to-Cassandra transforms.

Our goal is exactly what you state: a single system to solve both real-time and batch processing through the use of primitives (instead of storage-specific constructs). Right now, as a stop gap measure, we're articulating ETL "primitives" as ruby code that can deploy to Hadoop. We then hope to use that same ruby code on Storm.

A little experiment...
http://brianoneill.blogspot.com/2011/12/hadoopmapreduce-on-cassandra-using-ruby.html

I love the thoughts. Please keep us all informed if you make any progress on this.
As another extension... we are also looking at Cascading; perhaps there is a way to bridge the worlds with the pipes and filters metaphor / primitives.
http://www.cascading.org/

If nothing else, tuples in and tuples out seems like a good place to start. ;)

cheers,
-brian

--
Brian ONeill
Architect, Iron Mountain (www.ironmountain.com)
mobile:215.588.6024
blog: http://weblogs.java.net/blog/boneill42/
blog: http://brianoneill.blogspot.com/

kovas boguta

Dec 27, 2011, 2:01:37 AM
to storm...@googlegroups.com
Hey Brian,

Thanks for bringing up Cassandra. That's actually pretty important,
too. First of all, to remember that HDFS is not the only game in the
distributed storage town. And secondly, that vanilla hadoop MR does
not do a great job with non-HDFS datasets in the likes of
Cassandra, Riak, etc. Would be interesting to know if it's easier to
bend storm to your will than to hack hadoop.

Actually, I think this points to the Achilles' heel of Hadoop MR.
Having flat, unchanging files that you can just blast through is nice
if you can get them. But much of the most valuable data in the world
sits in some kind of database, because it needs to be randomly
accessed, changed, and tied together with other ongoing computations.
So to process it in Hadoop, you first must already do another
(potentially huge) batch computation to ETL it, at which point you've
lost data locality, introduced complexity, and resigned yourself to
computing with stale data.

In the future, this problem will get worse. The more complex your data
processing, the more data sources you need to take into account, the
more services you need to push and pull data from, the harder it will
be to generate that hadoop-friendly, self-contained flat file. At some
point you'll need Storm just to ETL the hadoop cluster :) Might as
well use it for the actual computation itself.

Anyway, thanks for the comments!

Brian O'Neill

Dec 27, 2011, 10:04:53 AM
to storm...@googlegroups.com, Kevin Burton
Kovas,

Exactly!  We're looking at processing thousands of disparate data sources through fairly complex data processing steps.

I'm not familiar with it yet, but I just saw this come across the Cassandra user list:

It still uses an M/R paradigm, but Kevin Burton describes it as:
"I'm pleased to announce Peregrine 0.5.0 - a new map reduce framework optimized
for iterative and pipelined map reduce jobs."
...
"Peregrine is optimized for ETL jobs where the primary data storage system is an
external database such as Cassandra, Hbase, MySQL, etc.  Jobs are then run as a
Extract, Transform and Load stages with intermediate data being stored in the
Peregrine FS."

This is worth a read:

(CCing Kevin, I thought he might want to jump in on this thread as well)

cheers,
-brian

-- 
Brian ONeill
Lead Architect, Health Market Science (http://healthmarketscience.com)

kovas boguta

Dec 27, 2011, 10:30:33 PM
to storm...@googlegroups.com, Kevin Burton
That's pretty interesting... It's cool to see so much activity in the space.

P. Taylor Goetz

Dec 28, 2011, 12:06:42 AM
to storm...@googlegroups.com, kovas boguta
Hey Kovas,

Excellent points! Thanks for joining in on the "bridging realtime and batch" thread.

Brian (he and I work together) has covered most of it, but every single point you've made echoes exactly what we've been grappling with.

All your thought-experiment points are, and have been, rattling around in my head. But at the forefront right now is the concept of data locality within storm.

In our current dev/test setup, each "slave" node is a Hadoop DataNode and Task Tracker, Cassandra Node, and Storm Supervisor.  With Hadoop, we have data locality, but not with storm.

For example, when a bolt that writes to cassandra receives a tuple, it may be sitting on the same machine that will write that data to disk, but it still has to reach out over the network in the event it is not.

Like your idea of using stream groupings (or some new grouping implementation) to associate HDFS file splits with particular bolts, I've been considering a similar approach by leveraging Cassandra's hashing/token scheme within a bolt to achieve data locality.
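For what it's worth, the token-matching half of that idea can be sketched independently of Cassandra's client API. Assuming a RandomPartitioner-style ring (md5 tokens), routing a row key to its owning node looks roughly like this; the ring layout here is a toy stand-in, not anything read from a real cluster:

```python
# Toy token ring: hash the row key to a token, then find the node whose
# token is the first one >= the key's token, wrapping around the ring.
# A locality-aware grouping would route the tuple to the bolt task on
# that node.
import bisect
import hashlib

def token_for(key):
    # RandomPartitioner-style token: md5 of the row key as a big integer
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

def owner(ring, key):
    """ring: sorted list of (token, node) pairs."""
    tokens = [t for t, _ in ring]
    i = bisect.bisect_left(tokens, token_for(key)) % len(ring)
    return ring[i][1]

# usage with three evenly spaced toy tokens
ring = sorted([(0, "nodeA"),
               (2**128 // 3, "nodeB"),
               (2 * 2**128 // 3, "nodeC")])
node = owner(ring, "row-42")  # deterministic for a given key
```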

Some of the new features Nathan's added to 0.6.1 seem to be moving somewhat in that direction (rebalancing, etc.).

Some concepts I've been thinking of are a little broader, and would likely involve significant API changes, such as:

- Data Center/Rack/Node awareness -- essentially the ability to pin, or define some sort of affinity, such that you can define or suggest where within a cluster a storm worker component (spout/bolt) gets started/deployed. This could get complicated quickly, and would be a deviation from the simplicity that IMO makes storm so great. But just to think out loud… maybe some concept like "Resource" and "ResourceAffinity" could be introduced in the "Topology" concept?

- Resource (this time I'm talking in terms Memory/Processor/Network resources) Awareness -- basically rebalance a topology based on resource utilization


I've got some pseudocode ideas in my head that I'll save for a follow-up :)

- Taylor

P. Taylor Goetz
Lead Developer, Software Development
Health Market Science | 2700 Horizon Drive | King of Prussia, PA 19406
p: 610.994.5237

www.healthmarketscience.com

Nathan Marz

Dec 28, 2011, 3:22:13 AM
to storm...@googlegroups.com
The TransactionalSpout stuff upcoming in 0.7.0 is essentially incremental, pipelined, and generalized batch processing (for "smallish" batches). Interestingly, I was able to implement this on top of Storm's existing primitives of spouts and bolts. This stuff definitely bridges the gap somewhat between Storm and Hadoop.

The first foray for Storm into managing state will be the "state spout" primitive (see my launch presentation on InfoQ for more details on that). There may be opportunities to enhance that abstraction to take advantage of data locality opportunities.
--
Twitter: @nathanmarz
http://nathanmarz.com

Evan Chan

Dec 28, 2011, 3:46:40 AM
to storm-user
WRT Cassandra, the Datastax guys have developed a variant of MR called
Brisk that works far more efficiently with Cassandra (think much
smaller data sets or row orientation, vs the huge block sizes of
HDFS). They would be great partners to work with for Storm
optimization for Cassandra data locality.

kovas boguta

Dec 29, 2011, 6:56:12 PM
to storm...@googlegroups.com, Frederico Meinberg
I think I've had some moments of clarity. All these problems boil down
to associating data with operations at nodes. The computational
paradigm of the distributed system is to partition, match, and
transfer either the data, the operations, or some mixture.

In Hadoop, we partition the operations and distribute them to the
nodes containing the data, only moving data when necessary. This makes
sense for batch computations, since the data is already residing
somewhere and is difficult to move.

In Storm, we partition the data and distribute it to the nodes
containing the operations. This makes sense for streaming data, since
the operations are known ahead of time, but the data is not.

In my original message I was brainstorming ways to use Storm to
distribute the operations -- instead of sending the data itself
through storm messages, you send data processing instructions to the
nodes, which pick up the data from disk or other local services.

The key point is that the partition-match-transfer paradigm applies to
the operation of the distributed system itself, and that is exactly
where hadoop falls down. The state of the distributed system -- which
tasks succeed, which nodes have failed, etc. -- is itself data that
needs to be associated with operations that determine the next steps.

In Hadoop, the orchestration layer is not composed of robust
distributed computing primitives, but of special-purpose daemons that
are intertwined to achieve its flavor of map-reduce. This is the
principal cause of complexity and limitation in hadoop. Hadoop's
single points of failure (name node and job tracker) can be directly
attributed to the lack of partition-match-transfer on its core
operational data. And because the design is not based on
programmer-accessible primitives, it's a major source code and javadoc
adventure to build upon existing capabilities.

Storm provides the primitives to solve the coordination problem in a
principled way. Instead of a tightly coupled set of complicated
daemons, let storm provide a uniform paradigm for communication and
failover between the components. Each hadoop daemon is replaced with a
bolt or spout with a fraction of the custom logic. This makes the
system simpler, more robust, and more transparent. But the big win is
that it opens the door to composability.

With a good library of bolts, you can span the distributed computation
problem space. Each individual bolt just needs to provide simple
services -- like a write-ahead log, b+ tree, etc. And suddenly giving
data pipelines (for example) flume-like or hbase-like capabilities
happens at the programming language level, rather than at the systems
level, which in turn opens the door to higher-level abstractions and
composability.

The core, though, is the primitives for the partition-match-transfer
of both data and operations. One can imagine some hacks for
co-locating bolts with the input splits they need to process in HDFS,
or aligning their placement with cassandra's data partitions, but
there needs to be a general solution. Will be interesting to see how
far StateSpout and TransactionalSpout get us.

Finally, these primitives must be general enough to support the
operation of the system itself. A typical example is nodes that build
up state as they process data, such as the hadoop name node, job
tracker, and the storm trending topics example. It should be possible
to build up and manage that state data in the exact same way as any
other data -- if the node blows up, it should be possible to
reassemble it the same way we would reassemble some other part of the
pipeline.

In this very Lispy view, the combination of data and operation that
defines a stateful bolt is itself data, which one might optionally
want to have various distribution and persistence guarantees.

The Hadoop name node, secondary name node, and job tracker are nice
examples to think about when making it "easy" to build up stateful,
distributed bolts from simple primitives.
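The inverted flow -- shipping instructions to the data rather than data to the operations -- can be sketched abstractly. Everything here is illustrative: `local_store` stands in for HDFS blocks or a local database, and the operation registry stands in for the bolt's deployed code:

```python
# Partition-match-transfer of operations instead of data: a node receives
# a small instruction message, matches it against its locally stored
# partition and its registered operations, and runs the work in place.
def make_instruction(op_name, partition_id):
    # small message: which operation to run, on which local partition
    return {"op": op_name, "partition": partition_id}

def execute_locally(instruction, local_store, operations):
    # only the instruction crossed the network; the data never moved
    data = local_store[instruction["partition"]]
    return operations[instruction["op"]](data)

# usage with toy data
local_store = {"split-7": [3, 1, 2]}        # data already on this node
operations = {"sum": sum, "sort": sorted}   # operations known ahead of time
result = execute_locally(make_instruction("sum", "split-7"),
                         local_store, operations)
```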

kovas boguta

Dec 29, 2011, 6:58:15 PM
to storm...@googlegroups.com, Frederico Meinberg
(resending previous message with fixed line breaks)

Brian O'Neill

Dec 29, 2011, 10:00:57 PM
to storm...@googlegroups.com, Frederico Meinberg

Great breakdown of the problem Kovas, and I agree completely.

I think it might be worth trying to put some pictures to this. If I get a chance, I'll try to get a presentation together capturing this. I think we should try to define the "data processing instructions" that would be sent to the nodes. The instructions would include two things: the operation to perform, and the metadata about the actual data to be processed (instead of the data tuples themselves). Matching then becomes a process of determining which node has both the operation and the ability to resolve the metadata into actual data tuples.

Is that the way you see it?

And I also agree with using another set of primitives to articulate the data partitioning in a generic way.   We were mulling this over a bit because some of our data resides on a SAN that is accessible from all nodes.   In this case, it doesn't matter which node picks up the instruction, but we still need to communicate which partition of the data to operate on.

We have a jam-packed January, but Taylor and I should have some time in Feb to work on this in earnest. If 0.7 is far enough along, we can probably run some experiments.

-brian

Nathan Marz

Dec 30, 2011, 6:53:34 AM
to storm...@googlegroups.com, Frederico Meinberg
A lot of interesting points being made here. Making Storm capable of doing very large Hadoop-like jobs was never a goal for the project, but now I'm starting to find that an interesting possibility. TransactionalSpout in 0.7.0 will be a good demonstration of using streams not just for data, but for metadata and coordination as well. If you're curious, check out the following classes in the transactional-spout branch:



suraj

Dec 30, 2011, 9:35:43 PM
to storm-user
I completely agree with Kovas. Great description Kovas!! Great work
Nathan!!

Cheers,
Suraj


kovas boguta

Dec 26, 2012, 7:13:53 PM
to storm...@googlegroups.com
Bumping this thread, in light of new developments. We now have the
tech to do something way, way cooler than hadoop and its spinoffs.

The principle should be: if it gets put in storage, it should be
organized. Otherwise you are just buffering the stream.

Suppose you have storm bolts that simply wrap a datomic database on
local storage.

You immediately get

1. Ability to reduce larger-than-memory datasets
2. Query the resulting dataset
2b. Create indices on the dataset
3. Ability to backup the state of your system to the likes of s3
4. Reuse UDFs across batch, incremental, and query-like computations

With more work, you can get

5. Distributed datalog (with restrictions certainly, but still more
powerful than what we get elsewhere)

Also note that there is a mechanical transformation from relational
tables to a datomic schema.

The main obstacles right now are
1. Datomic not yet optimized for bulk load
1b. Potentially large memory overhead per reducer
2. Lack of complete control over which covering indices are generated
by datomic.
3. Lack of a federation model for separate datomic databases