About CoordinatedBolt

1,030 views
Skip to first unread message

Jimmy Jim

unread,
Dec 29, 2011, 4:03:14 AM12/29/11
to storm-user
Hi,Nathan Marz
In the Storm-Wiki,I find this description in the chapter "Distributed
RPC".
"Under the hood, CoordinatedBolt is used to detect when a given bolt
has received all of the tuples for any given request id.
CoordinatedBolt makes use of direct streams to manage this
coordination."
What confused me is if the bolt has received all of the tuples,how can
guarantee the low latency? IMO,the tuples is a kind of stream,does all
the tuples means the stream is broken?
Forgive my pool English .
Thanks for your great job.It's cool.

Jimmy Jim

unread,
Dec 29, 2011, 8:20:49 AM12/29/11
to storm-user
I run the ReachTopology ,but still can't understand when the
"finishedId" method is called.
IMO,a typical case is stream A join with stream B on one field.If
stream A is faster than stream B,how can storm guarantee join
successfully.
So to other cases,1 join 1, 1 join n,n join m.
When the speed of two stream is very different,how the Storm handle ?
Forgive my pool English.Thank you

songhe yang

unread,
Dec 29, 2011, 11:04:49 AM12/29/11
to storm-user
Add a more question at here about "PartialUniquer also implements the
FinishedCallback interface, which tells the LinearDRPCTopologyBuilder
that it wants to be notified when it has received all of the tuples
directed towards it for any given request id. "

How does LinearDRPCTopologyBuilder know that bolt "PartialUniquer" has
received all of the tuples directed towards it ?

Ben Hughes

unread,
Dec 29, 2011, 7:43:31 PM12/29/11
to storm-user
I had also been wondering about the magic of CoordinatedBolt, so I sat
down and ran through the source code for it and
LinearDRPCTopologyBuilder. I added annotations to key lines and I am
working on cleaning those up and posting them soon. For now, I will
attempt to explain how it functions in general.

Summary (To Answer Your Questions):

1) How does it guarantee low latency? It efficiently tracks the
tuples generated for a specific DRPC request. DRPC requests are
linear and finite and in general operate as super parallel
implementations of normal methods that you could implement
sequentially using normal database reads/writes. This isn't the
infinite computation aspect of storm.

2) How do you work with joins? DRPC is for when you may be joining or
working with tuples that come from a finite source as determined by
the arguments to the DRPC function. The join will work as long as
there are a finite number of tuples generated and the slowest tuple
producer can still complete in less time than the timeout (which I
think is a minute). For true streaming joins, you don't want DRPC,
but just a regular storm topology that potentially emits tuples out
into a database or other stream that could be consumed via an external
client.

3) How does PartialUniquer know that it has received all tuples
directed towards it? CoordinatedBolt tracks the number of upstream
tasks that could possibly directly send it tuples, these upstream
tasks, when they have processed all the tuples directed towards them
for a given request, tell the task how many tuples they sent it, so it
is then just a matter of waiting until all tasks that can send tuples
to you have told you how many they have actually sent (which could be
0) and also waiting until the number of tuples you actually received
matches the number you were told to expect. Hooking these bolts
together causes completion status to cascade through the topology for
a request. The topology knows to call PartialUniquer's finishedId
because it knows when it has received all tuples it could receive and
it knows that PartialUniquer implements FinishedCallback.


Explanation of how it works:

First off, the reach topology created by LinearDRPCTopologyBuilder
actually looks like: https://skitch.com/benhughes/gwhfn/fs-4649424364673260365.dot.png
. The blue lines with the long stream name are direct streams used by
CoordinatedBolt to pass along coordination info. Each CoordinatedBolt
task has a map containing tracking info for each DRPC request_id that
it has seen. This tracking info includes the number of tuples
received from upstream tasks, the number of upstream tasks that have
reported in over the direct stream, the number of tuples upstream
tasks have told the task that it sent to them, and a map of downstream
task id to the count of tuples sent to that task.

A CoordinatedBolt can receive three types of tuples. The first is a
report from an upstream task on the number of tuples that it sent to
this task for a request identified by the id. This is added to update
the number of tuples that the bolt expects to receive for this
request. To be considered finished the CoordinatedBolt task must
receive this report from all immediately upstream tasks and then have
received that amount of regular tuples. The second is a special one
that all can receive but that only the very final step is subscribed
to. This is the stream of generated request ids from PrepareRequest.
For the final step to be considered complete, it must also have
received an id off of this stream. The third type is the regular
tuple that your code emits and consumes. These are immediately sent
to the delegate (the bolt that you specified in addBolt in the
builder). The delegation process also introduces a special
OutputCollector that updates the tracked count of received tuples when
a tuple is acked or failed and also maintains the map of how many
tuples were emitted to which tasks.

The CoordinatedBolt tasks all check whether they are complete after
every call to ack/fail from the delegate and after receiving any of
the book keeping tuples. Once a coordinated bolt task finds itself to
be complete, it reports how many tuples it emitted to each downstream
task. If the delegate bolt implements FinishedCallback,
finishedId(request_id) is called on it.

This is kind of the long and in depth version, though there some other
corner cases and implementation details.

Cheers,

Ben

Nathan Marz

unread,
Dec 29, 2011, 11:48:20 PM12/29/11
to storm...@googlegroups.com
Wow... great detective work! What are you using to visualize the topology?

Just want to point out the underlying primitives that are used by CoordinatedBolt:

1) When you call the "emit" method on OutputCollector, it returns a list of the task ids the tuple was sent to. This is how CoordinatedBolt keeps track of how many tuples were sent where.

2) CoordinatedBolt sends the tuple counts to the receiving counts using a direct stream. Tuples are sent to direct streams using the "emitDirect" method whose first argument is the task id to send the tuple to.

3) CoordinatedBolt gets the task ids of consuming bolts by querying the TopologyContext.



2011/12/29 Ben Hughes <schl...@gmail.com>



--
Twitter: @nathanmarz
http://nathanmarz.com

Ben Hughes

unread,
Dec 30, 2011, 11:22:34 AM12/30/11
to storm-user
For the visualization I'm using storm.test.visualization/visualize-
topology from https://github.com/schleyfox/storm-test on the example
reach topology in storm-starter.

Cheers,

Ben

On Dec 29, 11:48 pm, Nathan Marz <nathan.m...@gmail.com> wrote:
> Wow... great detective work! What are you using to visualize the topology?
>
> Just want to point out the underlying primitives that are used by
> CoordinatedBolt:
>
> 1) When you call the "emit" method on OutputCollector, it returns a list of
> the task ids the tuple was sent to. This is how CoordinatedBolt keeps track
> of how many tuples were sent where.
>
> 2) CoordinatedBolt sends the tuple counts to the receiving counts using a
> direct stream. Tuples are sent to direct streams using the "emitDirect"
> method whose first argument is the task id to send the tuple to.
>
> 3) CoordinatedBolt gets the task ids of consuming bolts by querying the
> TopologyContext.
>
> 2011/12/29 Ben Hughes <schley...@gmail.com>

Ben Hughes

unread,
Jan 3, 2012, 4:41:05 PM1/3/12
to storm-user
Ok, here's a post on CoordinatedBolt:
http://www.pixelmachine.org/storm/2012/01/03/How-CoordinatedBolt-Works.html

And here is the relevant annotated source code:
http://www.pixelmachine.org/code/StormCoordinatedBolt.html

Cheers,

Ben

On Dec 30 2011, 11:22 am, Ben Hughes <schley...@gmail.com> wrote:
> For the visualization I'm using storm.test.visualization/visualize-
> topology fromhttps://github.com/schleyfox/storm-teston the example
Reply all
Reply to author
Forward
0 new messages