Today, Storm's killer application is stream processing. But even in
Nathan's release presentation, that's only one of the three use cases.
So setting aside specific applications, what does Storm offer, and how
does that compare with other solutions? The way I see it, it provides
several fundamental primitives:
- algorithmically distributing units of work
- making sure they get done
- composing these distributed, fault-tolerant units of work
These capabilities are also present in Hadoop and Actors, but there
they are, in the parlance of our times, complected with other things.
They are thus less like primitives and more like features -- and
harder to build upon.
In actors, for instance, the units of work are complected with the
destination of the work, much like the worker/queue situation in
Nathan's presentation. In Hadoop, the map-reduce algorithm is
complected with fault-tolerance and distribution.
Thinking of Storm as a solution to the distributed computation problem
opens the door to more general applications, in particular the
often-mentioned question of bridging Storm and Hadoop.
As a thought experiment, let's take a look at Hadoop's remaining core
capabilities and think about how libraries or extensions of Storm
could replicate them:
1. Efficient batch computation via appropriate buffering, compression,
and file formats
An individual Hadoop node achieves high throughput by processing
records in batches. Storm can be extended to support batched
abstractions, like Clojure's chunked seqs, that are transparent to the
programmer. This gets you higher-throughput IO and provides an
opportunity for compression. It could be implemented in the Storm
core or in user libraries. Efficient file formats like Avro can be
used by bolts directly without going through the whole Hadoop
framework.
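As a very rough sketch of what such a library-level batching bolt could look like -- not a real Storm feature; the class name, batch size, and reliability handling below are all made up, and the pre-Apache backtype.storm Java API is assumed:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch of a batching bolt: buffer incoming records and emit them
    // downstream as one "chunk" tuple, trading a little latency for
    // throughput and giving downstream bolts a natural unit to compress or
    // write as a block. A real version would also flush on a timeout or
    // tick tuple so a slow stream doesn't strand a partial batch.
    public class BatchingBolt extends BaseRichBolt {
        private static final int BATCH_SIZE = 1000;   // illustrative knob
        private OutputCollector collector;
        private List<Object> batch;
        private List<Tuple> anchors;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.batch = new ArrayList<Object>();
            this.anchors = new ArrayList<Tuple>();
        }

        @Override
        public void execute(Tuple tuple) {
            batch.add(tuple.getValue(0));
            anchors.add(tuple);
            if (batch.size() >= BATCH_SIZE) {
                // Emit the chunk anchored to every input tuple it contains, so
                // a downstream failure replays all of them; then ack the inputs.
                collector.emit(anchors, new Values(new ArrayList<Object>(batch)));
                for (Tuple t : anchors) collector.ack(t);
                batch.clear();
                anchors.clear();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("chunk"));
        }
    }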
2. Efficient utilization of HDFS by leveraging data locality
If bolts are to process data residing in HDFS, we can use Storm's
stream grouping to associate file splits with bolts running on the
corresponding node. The input HDFS file is broken up into a stream of
messages whose message id is the file split; these get sent to the
appropriate bolt, which reads the data using off-the-shelf Hadoop
input formats, writes the result to HDFS, and sends the resulting
splits to the next bolts in the topology. In fact, you could break the
process into three bolts on the same machine: one that reads the file,
one that does the computation, and one that writes another file --
thus separating the core logic from the IO and letting you do both
batch and realtime computation with the same core bolt. This can be
done at the library level.
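Here is a minimal wiring sketch of that read/compute/write split, again against the backtype.storm API. HdfsSplitSpout, SplitReaderBolt, CoreLogicBolt, and HdfsWriterBolt are hypothetical components, and the "host" field grouping is only a crude stand-in for real locality-aware placement:

    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    // Hypothetical wiring for the split-reader / computation / writer pipeline.
    // HdfsSplitSpout would emit one tuple per file split, tagged with a "host"
    // field so that fieldsGrouping keeps each split's work pinned to a single
    // reader task; true co-location with the HDFS blocks would still need a
    // locality-aware grouping or scheduler.
    public class BatchOnStorm {
        public static TopologyBuilder build() {
            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("splits", new HdfsSplitSpout(), 1);

            // Reads a split with an off-the-shelf Hadoop input format, emits records.
            builder.setBolt("read", new SplitReaderBolt(), 8)
                   .fieldsGrouping("splits", new Fields("host"));

            // Pure computation: the same bolt could sit in a realtime topology.
            builder.setBolt("compute", new CoreLogicBolt(), 8)
                   .shuffleGrouping("read");

            // Writes results back to HDFS and emits the new splits downstream.
            builder.setBolt("write", new HdfsWriterBolt(), 8)
                   .shuffleGrouping("compute");

            return builder;
        }
    }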
3. Larger-than-RAM datasets via constant serialization and the
spill-to-disk sorting step
The crux of the issue is that the Hadoop reduce step requires seeing
the whole dataset, grouped and sorted by key, at once, which entails
swapping between disk and RAM -- you can't just group values in memory
as in the current Storm design pattern. Perhaps a partial solution is
to reuse the appropriate Hadoop classes inside a bolt wrapper, which
would outsource that part of the computation and eventually return the
sorted tuple stream.
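To make the spill-to-disk idea concrete, here is a toy, plain-Java sketch of the kind of external grouping such a wrapper would have to provide -- a stand-in for the real Hadoop sort/spill classes, not a use of them; all names and thresholds are made up:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Toy external grouper: buffer "key\tvalue" lines, spill sorted runs to
    // disk when the buffer fills, then stream a k-way merge so equal keys
    // come out contiguously without the dataset ever fitting in RAM.
    public class SpillingGrouper {
        private static final int SPILL_THRESHOLD = 100000; // illustrative knob
        private final List<String> buffer = new ArrayList<String>();
        private final List<File> runs = new ArrayList<File>();

        public void add(String key, String value) throws IOException {
            buffer.add(key + "\t" + value);
            if (buffer.size() >= SPILL_THRESHOLD) spill();
        }

        private void spill() throws IOException {
            Collections.sort(buffer);
            File run = File.createTempFile("storm-spill", ".run");
            PrintWriter out = new PrintWriter(new FileWriter(run));
            for (String line : buffer) out.println(line);
            out.close();
            runs.add(run);
            buffer.clear();
        }

        // Holds a reader plus its current line so the heap can order the runs.
        private static class Cursor {
            final BufferedReader reader;
            String line;
            Cursor(File f) throws IOException {
                reader = new BufferedReader(new FileReader(f));
                line = reader.readLine();
            }
            void advance() throws IOException { line = reader.readLine(); }
        }

        public interface LineSink { void accept(String line) throws IOException; }

        // Streams lines in sorted key order; the caller groups runs of equal keys.
        public void mergeSorted(LineSink sink) throws IOException {
            if (!buffer.isEmpty()) spill();
            PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>(
                Math.max(1, runs.size()),
                new Comparator<Cursor>() {
                    public int compare(Cursor a, Cursor b) { return a.line.compareTo(b.line); }
                });
            for (File run : runs) {
                Cursor c = new Cursor(run);
                if (c.line != null) heap.add(c);
            }
            while (!heap.isEmpty()) {
                Cursor c = heap.poll();
                sink.accept(c.line);
                c.advance();
                if (c.line != null) heap.add(c);
            }
        }
    }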
Of course, the obvious question is why bother replicating what already
works in Hadoop. Several reasons come to mind:
1. Reducing code duplication and unnecessary complexity. Consider
processing log files. You have a ton of them sitting around, and also
have them streaming in. It would be nice to turn a simple knob to get
batch-processing behavior to catch up on historical data, then switch
to the realtime stream.
2. Supporting different tradeoffs than Hadoop, for example interactive
manipulation of medium-sized data that fits in RAM, a la Spark. This
is basically DRPC taken to its logical extreme (a rough sketch of the
DRPC side follows this list).
3. Supporting algorithms in addition to map-reduce.
4. Mixing map-reduce with a more complex sequence of operations
involving databases and other services. Currently this is a huge mess,
with code getting crammed into the mappers/reducers, the
GenericOptionsParser/ToolRunner, external programs triggered by
pre/post hooks, Oozie, etc. What you really want is an arbitrary chain
of distributed computations with the same characteristics and same
language as the rest of your system.
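For reference, this is roughly what the DRPC flavor of reason 2 looks like from the caller's side, sketched against the old backtype.storm DRPC API. The host, port, function name, and argument are placeholders, and the server side would be a topology (e.g. built with LinearDRPCTopologyBuilder) holding the in-RAM dataset:

    import backtype.storm.utils.DRPCClient;

    // A synchronous, interactive query against a running topology: the argument
    // is pushed through the topology's bolts and the joined result comes back
    // to the caller. Host, port, function name, and argument are placeholders.
    public class InteractiveQuery {
        public static void main(String[] args) throws Exception {
            DRPCClient client = new DRPCClient("drpc-host", 3772);
            String result = client.execute("my-query", "some-argument");
            System.out.println(result);
        }
    }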
At the end of the day one wants to build data systems out of
composable abstractions, and I'm hopeful Storm can improve the
situation for applications old and new. Happy hacking!
I couldn't agree more. A few of us got a chance to talk this concept over at the Cassandra Conference in NYC a few weeks back. After that conversation, we came back and started noodling on life w/o Hadoop.
That's especially true in our case, where the majority of our data is stored in Cassandra and the problem of data locality is easier to solve, enabling us to move the processing to the pertinent data. In a few weeks' time, we intend to look at bridging that gap, introducing Cassandra's concept of data locality to Storm's processing layer. With that in place, we believe we can use the simplicity of Storm for all of our processing and slowly migrate existing Map/Reduce jobs to Storm.
As part of that experiment, we hope to race Hadoop and Storm for HDFS loads into Cassandra, as well as extracts from Cassandra to HDFS, and Cassandra-to-Cassandra transforms.
Our goal is exactly what you state: a single system to solve both real-time and batch processing through the use of primitives (instead of storage-specific constructs). Right now, as a stopgap measure, we're articulating ETL "primitives" as Ruby code that can deploy to Hadoop. We then hope to use that same Ruby code on Storm.
A little experiment...
http://brianoneill.blogspot.com/2011/12/hadoopmapreduce-on-cassandra-using-ruby.html
I love the thoughts. Please keep us all informed if you make any progress on this.
As another extension... we are also looking at Cascading; perhaps there is a way to bridge the worlds with its pipes-and-filters metaphor / primitives.
http://www.cascading.org/
If nothing else, tuples in and tuples out seems like a good place to start. ;)
cheers,
-brian
--
Brian ONeill
Architect, Iron Mountain (www.ironmountain.com)
mobile:215.588.6024
blog: http://weblogs.java.net/blog/boneill42/
blog: http://brianoneill.blogspot.com/
Thanks for bringing up Cassandra. That's actually pretty important,
too. First of all, it's a reminder that HDFS is not the only game in
the distributed-storage town. And secondly, vanilla Hadoop MR does not
do a great job with non-HDFS datasets like Cassandra, Riak, etc. It
would be interesting to know whether it's easier to bend Storm to your
will than to hack Hadoop.
Actually, I think this points to the Achilles' heel of Hadoop MR.
Having flat, unchanging files that you can just blast through is nice
if you can get them. But much of the most valuable data in the world
sits in some kind of database, because it needs to be randomly
accessed, changed, and tied together with other ongoing computations.
So to process it in Hadoop, you first have to run another
(potentially huge) batch computation to ETL it, at which point you've
lost data locality, introduced complexity, and resigned yourself to
computing with stale data.
In the future, this problem will get worse. The more complex your data
processing, the more data sources you need to take into account, the
more services you need to push and pull data from, the harder it will
be to generate that Hadoop-friendly, self-contained flat file. At some
point you'll need Storm just to ETL the Hadoop cluster :) Might as
well use it for the actual computation itself.
Anyway, thanks for the comments!
Storm provides the primitives to solve the coordination problem in a
principled way. Instead of a tightly coupled set of complicated
daemons, let Storm provide a uniform paradigm for communication and
failover between the components. Each Hadoop daemon is replaced with a
bolt or spout carrying a fraction of the custom logic.
This makes the system simpler, more robust, and more transparent. But
the big win is that it opens the door to composability.
With a good library of bolts, you can span the distributed computation
problem space. Each individual bolt just needs to provide a simple
service -- a write-ahead log, a B+ tree, and so on. Suddenly, giving
data pipelines (for example) Flume-like or HBase-like capabilities
happens at the programming-language level rather than at the systems
level, which in turn opens the door to higher-level abstractions and
composability.
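As a sketch of one such "simple service", here is what a write-ahead-log bolt could look like (backtype.storm API assumed; the log path and the recovery/dedup story are left as placeholders):

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Map;

    // A "simple service" bolt: a write-ahead log. Each record is appended and
    // synced to a local log before it is passed downstream and acked, so a
    // replayed tuple can be recovered from (or deduplicated against) the log.
    public class WriteAheadLogBolt extends BaseRichBolt {
        private OutputCollector collector;
        private FileOutputStream log;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            try {
                // One append-only log file per task; the path is a placeholder.
                this.log = new FileOutputStream("wal-" + context.getThisTaskId() + ".log", true);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void execute(Tuple tuple) {
            try {
                log.write((tuple.getString(0) + "\n").getBytes("UTF-8"));
                log.getFD().sync();                                      // durable before ack
                collector.emit(tuple, new Values(tuple.getString(0)));   // anchored
                collector.ack(tuple);
            } catch (IOException e) {
                collector.fail(tuple);                                   // let the spout replay it
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("record"));
        }
    }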
At the core, though, are the primitives for the partition-match-transfer
of both data and operations. One can imagine hacks for co-locating
bolts with the input splits they need to process in HDFS, or for
aligning their placement with Cassandra's data partitions, but there
needs to be a general solution. It will be interesting to see how far
StateSpout and TransactionalSpout get us.
Finally, these primitives must be general enough to support the
operation of the system itself. A typical example is nodes that build
up state as they process data, such as the Hadoop name node, the job
tracker, and the Storm trending-topics example. It should be possible
to build up and manage that state in exactly the same way as any other
data -- if the node blows up, it should be possible to reassemble it
the same way we would reassemble any other part of the pipeline.
In this very Lispy view, the combination of data and operation that
defines a stateful bolt is itself data, to which one might optionally
want to attach various distribution and persistence guarantees.
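A tiny sketch of that idea (backtype.storm API; the recovery path is only hinted at in a comment): a counting bolt whose state changes are themselves emitted as tuples, so its state can be rebuilt by replaying its own output stream like any other data.

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.HashMap;
    import java.util.Map;

    // "State is just more data": a counting bolt emits every state change as a
    // tuple on its output stream. If the task dies, its counts can be rebuilt
    // by replaying that stream (or a snapshot of it) through a fresh instance,
    // exactly like any other part of the pipeline.
    public class StreamBackedCountBolt extends BaseRichBolt {
        private OutputCollector collector;
        private Map<String, Long> counts;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.counts = new HashMap<String, Long>();
            // On restart, a replay of the "word"/"count" stream could be fed back
            // in here to reconstruct the map before live traffic resumes.
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            long count = counts.containsKey(word) ? counts.get(word) + 1 : 1L;
            counts.put(word, count);
            // The state update itself goes onto the stream.
            collector.emit(tuple, new Values(word, count));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }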
The Hadoop name node, secondary name node, and job tracker are nice
examples to think about when considering how to make it "easy" to
build up stateful, distributed bolts from a few simple primitives.