real-time/offline pattern

Greg Fodor

Sep 22, 2011, 3:54:43 AM
to storm-user
Hey Nathan, you've posted a few times about the pattern you use for
real-time systems, where a real-time system (presumably Storm)
provides intra-day, non-idempotent analysis of streams of data whose
errors are cleaned up periodically by an offline batch system like
Hadoop. I was wondering if you could talk about this pattern in more
detail now that Storm is released.

It seems the major piece of plumbing for this type of thing is a way
to manage the switchover of attributing a specific data point from
the real-time subsystem to the offline one, particularly in carefully
avoiding double counting. The canonical example, I would guess, is
some roll-up counter for word counts seen in a stream. Here's how I
imagine it works.

The batch routine runs on Hadoop every couple of hours to compute the
full word count. Now, you want to snap in this new output while still
allowing Storm to march forward and continue to report accurate
counts for the window of time past the period covered by the new
batch output.

The way I imagine you'd do this is that within your Storm bolts you'd
have an array of caches (backed persistently somehow), each
corresponding to some, say, 5-minute window of counting seen by
Storm. The trick here is that the caches need to be combinable under
operations that are commutative and associative (like addition),
similar to Hadoop combiners. For example, a simple cache for a word
counter would be the count of each word seen within each interval.

As tuples with word occurrences are processed in the stream, the
count for the word in the current interval is incremented. The
earlier cached counts are combined to report forward the aggregate
count for that word over the slice of time after the last known drop
of the batch data, and those aggregations are combined with the batch
data downstream somehow to give the full count to the user. So, if
the last batch covered things up until noon, the bolt generating the
counts reports counts summed across the cached counts for all time
windows between noon and the current time. These counts reflect the
word counts from noon until now, and the batch data can be accessed
downstream to add in the sum from before noon.

When a new batch is loaded, the Storm cluster is somehow made aware
of the last 5-minute time slice now fully computed offline and
available for aggregation. For example, the cutoff may now be 3 PM
instead of noon. This bit of state should probably live in ZooKeeper,
so that the downstream process merging the Storm-generated count
stream with the batch output learns about it atomically as well, if
that process lies outside the topology. This state change causes the
bolts to start merging only the cached counts corresponding to
aggregations after that point in time, and the old caches from before
that point can be dropped from RAM or disk. This makes it clear why
splitting the caches into time windows is necessary: otherwise there
would be no way to discard the aggregations that eventually overlap
with data loaded from the batch system.
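
Concretely, I'm picturing something like the following inside a bolt
(just a sketch, all names made up):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Per-window counters: each 5-minute window gets its own map, reads
// sum only the windows at or after the batch cutoff, and windows the
// batch layer has already covered get dropped.
class WindowedWordCounts {
    private static final long WINDOW_MS = 5 * 60 * 1000;
    // window start time (ms) -> (word -> count), sorted by window start
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

    void increment(String word, long timestampMs) {
        long windowStart = (timestampMs / WINDOW_MS) * WINDOW_MS;
        windows.computeIfAbsent(windowStart, w -> new HashMap<>())
               .merge(word, 1L, Long::sum);
    }

    // Count for a word over all windows at or after the cutoff, i.e.
    // the slice of time not yet covered by the batch output.
    long countSince(String word, long cutoffMs) {
        long total = 0;
        for (Map<String, Long> counts : windows.tailMap(cutoffMs, true).values()) {
            total += counts.getOrDefault(word, 0L);
        }
        return total;
    }

    // Called when a new batch drop lands (cutoff read from ZooKeeper);
    // everything before the new cutoff is now covered by batch data.
    void advanceCutoff(long newCutoffMs) {
        windows.headMap(newCutoffMs, false).clear();
    }
}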

Is this on the right track? Any tricky cases I'm missing here?

nathanmarz

Sep 22, 2011, 1:56:43 PM
to storm-user
You have the right idea. Let me describe how we do it, which I think
is simpler than what you're thinking.

Consider the simple case of counting. In that case, the realtime layer
can essentially ignore the batch layer and update databases ad
infinitum. For example, our realtime layer does batch atomic
increments into Cassandra for clicks over time.

The batch layer performs the same computations (perhaps with
additional analysis only possible in batch), and emits its results
into ElephantDB. It also emits a "data up to date since this time"
value that is used by the *application* layer, not the realtime layer.

Our application, when resolving a query, knows the cutoff time
(because the batch layer told it): results before the cutoff come
from EDB, and results after it come from Cassandra. So it splits up
the query appropriately between those two databases.
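
Something like this, to sketch the application-side split (the store
interfaces here are made up for illustration, not the real ElephantDB
or Cassandra client APIs):

// Hours below the cutoff are served by the batch views, hours at or
// above it by the realtime layer.
interface HourlyCounts { long clicks(String url, long fromHour, long toHour); }

class SplitQuery {
    final HourlyCounts elephantDb;  // batch views
    final HourlyCounts cassandra;   // realtime increments
    final long cutoffHour;          // "data up to date since" value from the batch layer

    SplitQuery(HourlyCounts edb, HourlyCounts cass, long cutoffHour) {
        this.elephantDb = edb; this.cassandra = cass; this.cutoffHour = cutoffHour;
    }

    long clicks(String url, long fromHour, long toHour) {
        long total = 0;
        if (fromHour < cutoffHour)   // portion of the range covered by batch
            total += elephantDb.clicks(url, fromHour, Math.min(toHour, cutoffHour - 1));
        if (toHour >= cutoffHour)    // remainder served by the realtime layer
            total += cassandra.clicks(url, Math.max(fromHour, cutoffHour), toHour);
        return total;
    }
}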

Now, the next step is to flush data you no longer need from Cassandra.
You can do this by running two Cassandra clusters and rotating between
them as the batch layer performs its updates. A rotate will clear and
reset one of the clusters.

Time tends to be the recurring dimension along which you merge the
results of the batch and realtime layers, which I guess isn't
surprising.

You can extend this pattern to involve more coordination between batch
and realtime layers for more complex kinds of queries.

The reason this architecture, with its seemingly many moving parts,
works is that everything in the realtime layer is transient. The
batch workflow, the authoritative source of data in the system, is
actually rather simple.

-Nathan

Greg Fodor

Sep 22, 2011, 7:39:59 PM
to storm-user
This is starting to make sense; a few more questions, though :)

If I'm understanding you right, when you say you 'reset' the
Cassandra nodes, you're effectively resetting the counters? What I
think I'm missing is how you make up for the chunk of time between
when the batch processing system started and finished, while the
realtime system was still running. The batch output is missing that
time, and if you reset the Cassandra nodes after the batch data
becomes available, you've lost the data emitted by the realtime
system while the batch system was running.

The other bit I'm missing, if I'm wrong about the rotation being a
hard reset, is how you query the Cassandra cluster for counts since
the cutoff time if you've just been incrementing. That was kind of
what I was getting at with my time-interval caches: by just
incrementing a single counter you lose the ability to ask "what was
the count since this cutoff time?" I'm definitely missing something
obvious here. (I've never used Cassandra.)

nathanmarz

Sep 22, 2011, 8:39:26 PM
to storm-user
By reset, I mean clear all the data for the nodes (with an rm -rf) and
restart the servers.

We run two Cassandra clusters in order to have high availability of
realtime data. We write to both, read from the longest-running one,
and reset the long-running cluster only when the newer cluster has
enough data to cover the data missing from the batch layer.
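
As a rough sketch of that rotation rule (the names here are invented,
not our actual code):

// Writes go to both clusters, reads to the older one. The older
// cluster is wiped only once the newer one holds data reaching back
// past the latest batch cutoff, so the gap is always covered.
interface Cluster {
    void increment(String key, long hour, long delta);
    long clicks(String key, long fromHour, long toHour);
    long oldestHour();   // earliest hour this cluster has data for
    void reset();        // rm -rf the data dirs and restart the servers
}

class ClusterRotation {
    private Cluster older;   // longest-running, currently serving reads
    private Cluster newer;   // started later, still filling in

    ClusterRotation(Cluster a, Cluster b) { this.older = a; this.newer = b; }

    void write(String key, long hour, long delta) {
        older.increment(key, hour, delta);
        newer.increment(key, hour, delta);
    }

    long read(String key, long fromHour, long toHour) {
        return older.clicks(key, fromHour, toHour);
    }

    // Called after a batch run whose output is complete up to batchCutoffHour.
    void maybeRotate(long batchCutoffHour) {
        if (newer.oldestHour() <= batchCutoffHour) {
            older.reset();
            Cluster tmp = older; older = newer; newer = tmp;
        }
    }
}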

I wish there were databases that had this kind of expiration built-in
(i.e., a true reset and not just using tombstones/compaction), but
this technique works well enough.

-Nathan

Greg Fodor

Sep 23, 2011, 2:38:35 PM
to storm-user
So are you then just recording each tuple into Cassandra and rolling
up the counts at query time? I'm still not seeing how you are able to
take an arbitrary time slice and ask Cassandra for the count seen
during that slice without recording the counts as a histogram over
time. My assumption is you are writing *counts* to Cassandra, so once
a new batch is available, how do you know which part of that count
was added before or after the time at which the new batch was
computed?

Sorry if I am being a bit dense here. I think this design pattern is
pretty powerful and I really want to make sure I understand it.

nathanmarz

Sep 23, 2011, 6:35:48 PM
to storm-user
Our application rolls up click stats per hour, so in Cassandra we map
a key of {url, hour} to the number of clicks. In ElephantDB we store
the same thing, but we also roll up 24-hour periods. The 24-hour
rollup lets us do faster queries over larger time ranges. Since
everything is split up by time already, it's easy to figure out what
you need to query from ElephantDB and what you need to query from
Cassandra.
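
To sketch how a read splits across the two stores (the key formats
and client calls are made up here; hour values are hours-since-epoch):

// ElephantDB holds {url, hour} and {url, day} rollup keys; Cassandra
// holds {url, hour} keys written by the realtime layer.
interface KvStore { long get(String key); }   // returns 0 if the key is absent

class UrlClickReader {
    final KvStore elephantDb;
    final KvStore cassandra;
    final long cutoffHour;   // batch data is complete for hours below this

    UrlClickReader(KvStore edb, KvStore cass, long cutoffHour) {
        this.elephantDb = edb; this.cassandra = cass; this.cutoffHour = cutoffHour;
    }

    long clicks(String url, long fromHour, long toHour) {
        long total = 0;
        long batchEnd = Math.min(toHour + 1, cutoffHour);
        // Batch side: use {url, day} rollups for whole days in range,
        // and hourly keys for the stragglers.
        for (long h = fromHour; h < batchEnd; ) {
            if (h % 24 == 0 && h + 24 <= batchEnd) {
                total += elephantDb.get(url + "|day:" + (h / 24));
                h += 24;
            } else {
                total += elephantDb.get(url + "|hour:" + h);
                h += 1;
            }
        }
        // Realtime side: hourly keys from the cutoff onward.
        for (long h = Math.max(fromHour, cutoffHour); h <= toHour; h++) {
            total += cassandra.get(url + "|hour:" + h);
        }
        return total;
    }
}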

If you were doing global counts that aren't split up by time, then I
recommend storing in batch the count up until the hour for which you
have complete data, and then storing counts by hour in the realtime
database. You can then construct the global count by doing a get on
EDB and a multiget on Cassandra, and combining them to get the
answer.
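
Roughly like this (the client interfaces are invented for the sake of
the example):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One get against the batch view, one multiget against the realtime
// hourly counters, summed together for the global count.
interface BatchCount    { long globalCountUpTo(long cutoffHour); }           // single EDB get
interface RealtimeCount { Map<Long, Long> countsByHour(List<Long> hours); }  // Cassandra multiget

class GlobalCount {
    final BatchCount edb;
    final RealtimeCount cassandra;

    GlobalCount(BatchCount edb, RealtimeCount cassandra) {
        this.edb = edb; this.cassandra = cassandra;
    }

    long count(long cutoffHour, long nowHour) {
        long total = edb.globalCountUpTo(cutoffHour);   // everything before the cutoff
        List<Long> hours = new ArrayList<>();
        for (long h = cutoffHour; h <= nowHour; h++) hours.add(h);
        for (long c : cassandra.countsByHour(hours).values()) total += c;
        return total;
    }
}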

Again, this seems like a lot of moving parts, but you get things like
auto-correction when something goes wrong in the realtime layer, a
safety net against mistakes, not having to worry about the CAP theorem
that much, etc. And since the batch layer is so simple (relatively),
this architecture is very robust.

-Nathan

Greg Fodor

Sep 25, 2011, 1:55:27 AM
to storm...@googlegroups.com
This clears it up. Thanks for the clear explanation!