Storm + Hadoop Workflows

399 views
Skip to first unread message

P. Taylor Goetz

unread,
Nov 2, 2011, 10:37:59 PM11/2/11
to storm...@googlegroups.com
Nathan,

We're currently looking at using Storm in conjunction with Hadoop to create a platform that supports both real-time and batch processing of records.

We basically have two types of data providers: clients who submit data in batches, and clients who submit one record at a time (real time).

(for now we don't have to think of the "real-time" providers as DRPC clients -- they're just dropping off data in a "fire and forget" fashion)

Our Hadoop-based batch jobs basically just run a workflow over a flat file of individual records - standard Hadoop stuff.

Our Storm topologies will do basically the same thing, but one record at a time, in real time as the data becomes available.

My question is this:

What approaches/tools/best practices would you recommend that would allow us to articulate the workflow/rules such that they can be re-used in both Hadoop Map/Reduce jobs as well as Storm topologies?

The easy answer seems to be to just encapsulate the workflow/rules into libraries that can be shared by the M/R jobs and the Storm topologies.

However, that model starts to fall apart when we start considering M/R tools like cascading, pig, etc., since we would have to maintain identical business logic in to places (i.e. a M/R DSL + Storm topology).

In your experience, have you run into a similar situation? And if so, what approaches did/did not work for you?

Thanks,

-Taylor

Ashley Brown

unread,
Nov 3, 2011, 4:07:07 AM11/3/11
to storm...@googlegroups.com
Hi Taylor,

I hope you don't mind me sharing some information about our architecture, even though your query was addressed at Nathan.


In your experience, have you run into a similar situation? And if so, what approaches did/did not work for you?

We have a similar situation, in that we are generally streamed logs, but they can also be submitted in batch. We're currently building out our architecture at the moment, but the design is as follows:
  • Storm handles our real-time processing, aggregations and database writes.
  • The same data is (will be) sent to Hadoop for batch processing, after which the database is (will be) patched with the MR results.
  • At the same time we (will) compare the real-time results and batch results and flag any deviations, comparing relevant time periods to error logs.
  • For batch customers, we can run with Hadoop only.
We're happy to replicate our business logic so Storm and Hadoop can operate as redundant, separate checks of each other. If we have a problem, we want to flag it right away.

You should also bear in mind that Storm provides 'at least once' semantics, not 'exactly once' so you might want to run your real-time data in batch as a check.

A

--
Dr Ashley Brown
Chief Architect

e: ash...@spider.io
a: SpiderCrunch Limited, 353 The Strand, WC2R 0HS
w: http://spider.io/

nathanmarz

unread,
Nov 3, 2011, 5:52:05 PM11/3/11
to storm-user
Answers inline.

On Nov 2, 7:37 pm, "P. Taylor Goetz" <ptgo...@gmail.com> wrote:
> Nathan,
>
> We're currently looking at using Storm in conjunction with Hadoop to create
> a platform that supports both real-time and batch processing of records.
>
> We basically have two types of data providers: clients who submit data in
> batches, and clients who submit one record at a time (real time).
>
> (for now we don't have to think of the "real-time" providers as DRPC
> clients -- they're just dropping off data in a "fire and forget" fashion)
>
> Our Hadoop-based batch jobs basically just run a workflow over a flat file
> of individual records - standard Hadoop stuff.
>
> Our Storm topologies will do basically the same thing, but one record at a
> time, in real time as the data becomes available.
>
> My question is this:
>
> What approaches/tools/best practices would you recommend that would allow
> us to articulate the workflow/rules such that they can be re-used in both
> Hadoop Map/Reduce jobs as well as Storm topologies?
>
> The easy answer seems to be to just encapsulate the workflow/rules into
> libraries that can be shared by the M/R jobs and the Storm topologies.

This is exactly what I do.

>
> However, that model starts to fall apart when we start considering M/R
> tools like cascading, pig, etc., since we would have to maintain identical
> business logic in to places (i.e. a M/R DSL + Storm topology).
>
> In your experience, have you run into a similar situation? And if so, what
> approaches did/did not work for you?

I have the same situation. One important thing to recognize is that
there are fundamental differences between batch and realtime
computation. Echoing what Ashley said, with batch processing it's easy
to get idempotence, whereas when dealing with incremental algorithms
and at-least-once semantics it's harder to achieve idempotence.

The differing semantics between batch and stream processing means that
the tools for batch processing don't have a clear mapping to stream
processing. For example, joins as defined in batch processing don't
make sense when dealing with infinite data streams.

Additionally, you don't always do the same thing in batch as in
realtime. It's common to use an approximation algorithm for the
realtime portion and the exact algorithm for the batch portion, and
the batch portion corrects the realtime portion over time.

Long term I definitely think a higher level abstraction over batch and
realtime will emerge. It's something I think about a lot, but it's not
clear yet how to provide the conciseness that you want while still
providing the flexibility to make approximations, CAP tradeoffs, etc.
Eventually we'll figure that stuff out -- we just need more experience
hammering against these problems using these new approaches.

>
> Thanks,
>
> -Taylor

Thomas Bernhardt

unread,
Nov 3, 2011, 6:03:44 PM11/3/11
to storm...@googlegroups.com
We at Esper provide EPL, a declarative language for event processing. Event processing may be the good generalization, since whether events come from batch or from real-time the EPL execution engine produces the same result for the same declarative rules, even possibly when considering data over time. It is not a simple route even with EPL and as Nathan points out, but perhaps you will find it useful. I'm the founder of the Esper open source project, http://esper.codehaus.org

Best regards,
Tom

From: nathanmarz <natha...@gmail.com>
To: storm-user <storm...@googlegroups.com>
Sent: Thursday, November 3, 2011 5:52 PM
Subject: [storm-user] Re: Storm + Hadoop Workflows

Martin Scholl

unread,
Nov 3, 2011, 6:23:35 PM11/3/11
to storm...@googlegroups.com
Hi all,

On Nov 3, 2011, at 10:52 PM, nathanmarz wrote:
> Long term I definitely think a higher level abstraction over batch and
> realtime will emerge. It's something I think about a lot, but it's not
> clear yet how to provide the conciseness that you want while still
> providing the flexibility to make approximations, CAP tradeoffs, etc.
> Eventually we'll figure that stuff out -- we just need more experience
> hammering against these problems using these new approaches.

When it comes to things like this, I am a big believe in co-invention. It would be great if you could write your thoughts down, or if you could describe the problems and issues you see and regard as open in a little more detail — in order for others to join in.

I hope this makes sense to you, and I would love to join (not necessarily implying and can be of any help though).


Martin

Nathan Stults

unread,
Nov 3, 2011, 9:27:02 PM11/3/11
to storm-user
tldr : Maybe a good place to start would be Cascading for Storm?

This question was asked the other day on the Cascading group as well:
http://groups.google.com/group/cascading-user/browse_thread/thread/8f80ca8a81cd6038

First, I am a Cascading novice, and of course a Storm novice, so I may
be way off base in my thinking, but I think it is definitely worth a
discussion, as it feels like these higher level abstractions are
imminent, on the tip of the tongue, ready to be teased out.

Cascading itself might be a good model or starting point as it acts as
a kind of compiler for stream transformations where the assembly
language or AST is an object graph of data processing operations.

As you all are probably aware, on the edge version of Cascading Hadoop
has been completely decoupled from the core (see the link above), so
it seems to be standing ready to be adapted to different stream
processing platforms such as Storm. Of course some of the current (and
central) building blocks prominent in Cascading, such as joins,
sorting and maybe buffers as they currently exist, just aren't
appropriate for real time use as has been discussed.

But Cascading also has sub-assemblies which allows for composition of
lower level pipe components and could serve as a form of polymorphism
allowing higher level outcomes or algorithms to be described and
configured in a declarative way but be implemented behind the scenes
as a pair of sub-assemblies composed of batch primitives on the one
hand and stream primitives (which obviously don't exist in Cascading
yet) on the other. The batch implementation may be an exact version of
an algorithm, and the stream version an approximation, as appropriate.

Above Cascading (as it currently exists anyway) a new kind of
"Planner" or could interpret the higher level system definition and
emit equivalent stream/batch sub-assemblies based on the specified
target. These would then be further interpreted by an appropriate
platform specific Cascading planner to create parallel map/reduce jobs
or topologies. But that would be down the road a bit.

I don't think it is necessary to envision a general purpose, one size
fits all higher level abstraction, which might be very difficult to
achieve via thought experiment. I think a good first step might be to
start work on Cascading, or Cascading style real time stream
processing primitives that can be composed and used to automate the
construction of topologies in the same way Cascading currently does
for batch jobs. There are a lot of differences, but also a lot of
overlap.

This would be useful just on its own (at least in my imagination), but
it would also provide a basis for physically experimenting with higher
level abstractions that can be compiled automatically to either
paradigm and over time the higher level abstraction(s) applicable to a
wide range of applications might naturally emerge.

nathanmarz

unread,
Nov 7, 2011, 12:15:11 AM11/7/11
to storm-user
I do think that the Cascading model of pipes and filters is the right
start for a higher level abstraction. However, as you noted many of
its concepts don't map cleanly to realtime processing, such as joins
and aggregations. This is an indication that while we can take
inspiration from Cascading for a higher level abstraction on top of
Storm, it would be a mistake to try to implement a planner for
Cascading on top of Storm. Realtime computation is fundamentally
different than batch computation, so it would be better to start fresh
and not restrict yourself.

There's a lot of things a higher level abstraction needs to
incorporate that Cascading has no conception of:

1. Interaction with external databases
2. Automatic batching of requests to external databases
3. Checkpointing
4. Different kinds of streaming joins (single joins, windowed joins)
5. Different kinds of streaming aggregations (windowed aggregations,
total aggregations using external state)

-Nathan


On Nov 3, 5:27 pm, Nathan Stults <plasticliz...@gmail.com> wrote:
> tldr : Maybe a good place to start would be Cascading for Storm?
>
> This question was asked the other day on the Cascading group as well:http://groups.google.com/group/cascading-user/browse_thread/thread/8f...

nathanmarz

unread,
Nov 7, 2011, 12:20:48 AM11/7/11
to storm-user
I wrote a blog post about the batch/realtime approach that explains
how I think about data processing in general:
http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html

Here's what I see as the "roadmap" for the "ultimate" abstraction over
batch/realtime:

1. General-purpose batch-writable, random-read databases for any kind
of data model (search, document db, key -> large set, etc.).
ElephantDB taken to the next level.
2. Higher level abstraction over Storm
3. Then, the higher level abstraction over batch and realtime

I really only have a fuzzy notion of what that ultimate abstraction
will look like. But we have a lot of work until it will be feasible to
build it. I see building higher level abstractions on top of Storm the
best place to start attacking that problem, and I wrote my thoughts on
that on another post in this thread.

-Nathan

Nathan Stults

unread,
Nov 8, 2011, 1:26:33 AM11/8/11
to storm-user
#3 obviously follows #2, but is #2+ dependent on #1?

What in your opinion would be a good medium for experimenting with #2?
A storm-contrib project or equivalent, individual personal projects,
or was this a roadmap for projects you were envisioning you would
build as official pieces of the storm ecosystem in the future?

Nathan Marz

unread,
Nov 9, 2011, 1:32:48 AM11/9/11
to storm...@googlegroups.com
#1 only affects the batch computation side of things, which is completely independent of Storm. So #3 is dependent on #1 and #2, but #2 can be done on its own.

The main thing that needs to be done for #2 is experimentation. I want to encourage as many people as possible to build DSLs for Storm, and then we can learn what works, what doesn't, and what's missing. And I think there's room for many kinds of DSL's -- distributed RPC DSL's, DSL's for different languages (python, ruby, etc.), SQL-like DSL, etc.

I'll probably end up creating a Storm contrib ( http://groups.google.com/group/storm-user/browse_thread/thread/4a9a6ef3b53733fe ), but whether outside contributors want to host their project there is up to them.

-Nathan

--
Twitter: @nathanmarz
http://nathanmarz.com

Reply all
Reply to author
Forward
0 new messages