Dynamic Topology Changes

Trevor Smith

Nov 17, 2011, 3:00:36 PM
to storm...@googlegroups.com
Hello,

We're looking into using Storm at Knewton. I am currently putting
together proofs of concept with it, and so far I am very impressed.
Great work.

One of our needs is the ability to dynamically edit a topology from a
given bolt. A trivial example use case is: bolt A counts the number of
tuples it has received and emits the tuple count to bolt B. Once bolt
A receives the 1,000th tuple it stops emitting to bolt B and instead
emits all tuples to bolt C. It appears this can currently be handled
by predefining the topology, but what are your thoughts on extending
Nimbus to allow requests such as "addFieldGrouping"? Our current needs
only involve adding/deleting connections between spouts/bolts, not
adding any new application code to the cluster. Is this something
that coincides with the general direction that Storm is heading? We
would be interested in contributing to Storm and adding these
features, but wanted to hear thoughts from the authors first.
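
A minimal sketch of the predefined-topology approach, written in plain Java rather than against the Storm API. Both connections exist from the start and bolt A only chooses which named stream to emit on. `SwitchingCounter`, `to-b`, and `to-c` are hypothetical names, and the sketch assumes the tuple that reaches the threshold is the first one routed to C.

```java
/** Plain-Java model of bolt A's routing decision; not Storm API. */
class SwitchingCounter {
    static final String TO_B = "to-b"; // hypothetical stream consumed by bolt B
    static final String TO_C = "to-c"; // hypothetical stream consumed by bolt C

    private final long threshold;
    private long seen = 0;

    SwitchingCounter(long threshold) {
        this.threshold = threshold;
    }

    /** Counts one incoming tuple and returns the stream it should be emitted on. */
    String streamForNextTuple() {
        seen++;
        // Assumption: the tuple that reaches the threshold is the first one sent to C.
        return seen < threshold ? TO_B : TO_C;
    }

    /** Number of tuples counted so far. */
    long seen() {
        return seen;
    }
}
```

With a threshold of 1,000, the first 999 calls return `to-b` and every later call returns `to-c`. In a real topology both streams would be declared up front, with B subscribed to one and C to the other.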

Thank you.

Trevor

Trevor Summers Smith

Nov 18, 2011, 8:50:02 AM
to storm-user

Nathan Marz

Nov 19, 2011, 2:16:47 PM
to storm...@googlegroups.com
Changing running topologies on the fly is currently not a goal of Storm. This is actually a really complex thing to do. 

Currently, to update a topology you need to kill the current topology and start a brand new one. The problem with this is that it can take a few minutes. There's a future feature for Storm called "swapping", in which Storm will be able to start the updated form of the topology, and atomically swap execution between the old topology and the new topology with minimal downtime on processing. This may or may not satisfy what you're asking for.

I'd like to learn more about the specifics of your problem that's leading you to the conclusion that you need to dynamically modify the topology at runtime. The better I understand the problems you're trying to solve, the better I'll be able to suggest alternative approaches or be convinced that Storm itself needs changes :)

-Nathan



--
Twitter: @nathanmarz
http://nathanmarz.com

Trevor Smith

Nov 20, 2011, 1:22:01 PM
to storm...@googlegroups.com
Nathan,

Thanks for your reply. The problems:

1) I want to have a given bolt decide in its emit method which bolt or
bolts to emit to from a given "group" of bolts, i.e., not a broadcast.
Assume that all possible consuming bolts will be taking a tuple of the
same type.

2) Ideally I would like to be able to dynamically spin up extra
consuming bolts at low cost. At the moment it seems this would
require a topology restart. The "swapping" feature you described would
probably solve this. I am interested in hearing more about the
swapping feature.

Disclaimer: I have only spent a few hours looking into the source of
Storm. I can see solving problem one by:

1) Adding a field to the tuple being emitted by the bolt. The decision
to interact or not interact with the tuple is then up to consuming
bolts. Cons: lots of unnecessary traffic.

2) Add one output stream for each consumer. Cons: one stream for each
bolt seems a bit excessive (however I am not that familiar with Storm
so perhaps that is fine). Topology definition becomes more complex.

3) Use "directGrouping". Have the emitter bolt keep a mapping between
component and task ids. Choose to send to a given bolt by sending out
to all task ids for the given component. This seems like the best
solution.
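
Option 3 can be sketched as follows, in plain Java rather than against the Storm API. The emitter keeps a map from component id to that component's task ids and expands the chosen components into the task ids to send to. `DirectRouter`, the string component ids, and the task numbers are all hypothetical; in a real bolt the mapping would come from the topology and each target would receive a direct emit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the emitter's bookkeeping for option 3; not Storm API. */
class DirectRouter {
    // component id -> task ids running that component (values supplied by the caller)
    private final Map<String, List<Integer>> tasksByComponent = new HashMap<>();

    void register(String componentId, List<Integer> taskIds) {
        tasksByComponent.put(componentId, new ArrayList<>(taskIds));
    }

    /** Returns every task id of each chosen component, in the order the components were given. */
    List<Integer> targetTasks(List<String> chosenComponents) {
        List<Integer> targets = new ArrayList<>();
        for (String componentId : chosenComponents) {
            List<Integer> tasks = tasksByComponent.get(componentId);
            if (tasks == null) {
                throw new IllegalArgumentException("unknown component: " + componentId);
            }
            targets.addAll(tasks);
        }
        return targets;
    }
}
```

The per-tuple decision then reduces to picking component ids. Unlike option 1, only the chosen consumers receive traffic.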

Thoughts? Thanks.

Trevor

Nathan Marz

Nov 21, 2011, 2:22:29 AM
to storm...@googlegroups.com
Hey Trevor,

Streams are pretty cheap so you should feel free to have many output streams from a single spout or bolt. 

By understanding the problem you're trying to solve, I don't mean specific implementation things you want to do. What I mean is what's the data you're dealing with, and what kind of realtime computations are you doing with that data. For example, you may want to take Twitter data and compute trending topics on it. Starting from the problem, we can work through the computational primitives you need to solve that problem. It's possible that we'll find that you're approaching the problem with Storm in the completely wrong way, or it's possible that we'll find that Storm needs more features to solve your particular problem. Though my gut tells me that you don't actually need dynamic topologies to accomplish your task. 

The beauty of Storm is that it's such a simple set of primitives, and I intend to be very careful to keep it simple and avoid unnecessary feature creep. To accomplish this, I need to understand the fundamental data challenges you're trying to solve. 

-Nathan

Trevor Smith

Nov 29, 2011, 9:37:25 AM
to storm...@googlegroups.com
Nathan,

Understood re: feature creep.

The problem is: we need to train a large number of different models that all consume tuple type A and output tuple type B. Each model will be implemented with a variable number of bolts, but the consuming type of the first bolt, and the output type of the last bolt will be the same for each model. Each model may be arbitrarily complex.

Let's take your trending topics example: each "model" will take in a tweet. Each model will output a top ten list of trending topics. A given model will have various sub-models that it will need to use in its predictions. Continuing the Twitter example, one sub-model type is the calculation of "reach". Models will share sub-models (e.g., both model A and model B can use sub-model C for their calculation of reach). For a given tweet topic, a model will want to dynamically choose which sub-model it uses for a given sub-model type. So, Model A might use sub-model 1 for its calculation of reach for "Kanye West", and sub-model 2 for "Knewton". Model B might use some of these same sub-models, or it might not even have a notion of "reach" in its calculation.

A given model will have the application code necessary to define it, as well as certain parameters necessary to instantiate it. There will be a handful of different application codes, but 1,000s of different instantiated models.

Does the need for bolts to participate in multiple flows break the intended use case for Storm? Or is there a workaround that you've run into for cases like this, which might typically be implemented as a federation of custom pub-sub interfaces?

Does that make sense? Thanks.

Trevor

Nathan Marz

Dec 1, 2011, 7:16:05 PM
to storm...@googlegroups.com
I don't understand your question completely, honestly. It's rather complex and the devil is undoubtedly in the details. But I'll answer as best I can.

So it sounds like you have a few different "kinds" of models, but you instantiate many of each type with different parameters? Can you just have a series of bolts for each kind of model, and have each of those sub-topologies handle every possible type of that model? (so just share the computation for models of the same type). 

I don't see any conflict with the sub-model stuff you were talking about. You can either implement the sub-model as a library and use it within each bolt that needs it, or you can model it as a sub-topology and join against the output stream whenever you want to use it from another model.
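
A rough sketch of the first suggestion, in plain Java rather than against the Storm API. One handler serves every instance of a model kind by looking up per-instance parameters, and the shared sub-model is an ordinary library call. `ReachLibrary`, `ModelKindHandler`, and the single `weight` parameter are hypothetical stand-ins for the real model code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shared sub-model, used as an ordinary library call. */
class ReachLibrary {
    static double reach(int followers, double weight) {
        return followers * weight;
    }
}

/** Plain-Java model of one bolt serving every instance of one model kind; not Storm API. */
class ModelKindHandler {
    // model id -> the parameter that distinguishes this instance (hypothetical: one weight)
    private final Map<String, Double> weightByModel = new HashMap<>();

    void addModel(String modelId, double weight) {
        weightByModel.put(modelId, weight);
    }

    /** Scores one input for every registered instance through the same code path. */
    Map<String, Double> scoreAll(int followers) {
        Map<String, Double> scores = new HashMap<>();
        for (Map.Entry<String, Double> entry : weightByModel.entrySet()) {
            scores.put(entry.getKey(), ReachLibrary.reach(followers, entry.getValue()));
        }
        return scores;
    }
}
```

Adding a model instance here is a map update rather than a topology change, which is the point of sharing the computation across models of the same kind.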

Hope that helps,
Nathan

Apostolis Xekoukoulotakis

Feb 2, 2012, 4:02:29 AM
to storm...@googlegroups.com
I am opening this old thread to give an example in which I want dynamic topologies. I am making a graph processing tool. I need to split the graph and assign the parts to multiple threads/bolts. Then I would like to have a location-resolution bolt that tells each thread where a part of the graph lies. 

The only way it can be done now is to create streams between all possible bolts, since it is impossible to know in advance which thread will hold which part of the graph.

Ben Hughes

Feb 2, 2012, 1:32:29 PM
to storm-user
I don't think this is a case of needing to dynamically reconfigure the
topology. If the graph is or could be spread out among one type of
bolt, then you could use a field grouping or some sort of custom
stream grouping to handle the partitioning. To actually walk the
graph in a potentially quite inefficient way, you could have a stream
off the graph bolts that circles back to them until an exit condition
is met and a tuple can be emitted on a separate stream.
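
A minimal sketch of that idea, in plain Java rather than against the Storm API. Node ids are hashed to a task index in the way a fields grouping keeps equal keys on the same task, and the walk loops until an exit condition holds. `PartitionedWalk` is a hypothetical name, and the sketch assumes each node has at most one successor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a partitioned graph walk; not Storm API. */
class PartitionedWalk {
    // One "node -> next node" map per task; each task holds only its own part of the graph.
    private final List<Map<String, String>> partitions = new ArrayList<>();

    PartitionedWalk(int taskCount) {
        for (int i = 0; i < taskCount; i++) {
            partitions.add(new HashMap<>());
        }
    }

    /** Mimics a fields grouping: the same node id always maps to the same task index. */
    int taskFor(String nodeId) {
        return Math.floorMod(nodeId.hashCode(), partitions.size());
    }

    void addEdge(String from, String to) {
        partitions.get(taskFor(from)).put(from, to);
    }

    /** Follows edges until a node has no successor or maxHops is reached (the exit condition). */
    List<String> walk(String start, int maxHops) {
        List<String> path = new ArrayList<>();
        String current = start;
        path.add(current);
        for (int hop = 0; hop < maxHops; hop++) {
            // Each iteration stands in for one tuple looping back to the task that owns `current`.
            String next = partitions.get(taskFor(current)).get(current);
            if (next == null) {
                break;
            }
            current = next;
            path.add(current);
        }
        return path;
    }
}
```

Every hop in `walk` would be a network round trip in a real topology, which is where the inefficiency mentioned above comes from.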

Ben

Apostolis Xekoukoulotakis

Feb 2, 2012, 8:46:59 PM
to storm...@googlegroups.com
What you describe is a good way to do it. But your way requires that the whole graph be in memory, and it leaves no way to dynamically optimize the partitioning. I am thinking of keeping only the parts of the graph that are in use in memory, but maybe this is not possible if the graph is highly interconnected.
 

--

Sincerely yours, 
     Apostolis Xekoukoulotakis

Ted Dunning

Feb 2, 2012, 8:49:49 PM
to storm...@googlegroups.com
What you really want for this is BSP (bulk synchronous parallel), as with Giraph. Storm doesn't do that very well.

It is plausible that you could make Storm do it well, but it won't be as clear as having Storm do what it does well or having Giraph do what it does well.

Nathan Marz

Feb 3, 2012, 9:00:43 PM
to storm...@googlegroups.com
The state spout (planned feature) will be applicable to this kind of processing. The state spout implementation has gotten delayed due to higher priority things (like transactional topologies), but it is in the pipeline. You can learn more about the state spout towards the end of this presentation: http://www.infoq.com/presentations/Storm

Apostolis Xekoukoulotakis

Feb 3, 2012, 9:41:46 PM
to storm...@googlegroups.com
The state spout could do the trick indeed.
