I had also been wondering about the magic of CoordinatedBolt, so I sat
down and read through its source code and that of
LinearDRPCTopologyBuilder. I annotated the key lines and am cleaning
those annotations up so I can post them soon. For now, I will try to
explain how it works in general.
Summary (To Answer Your Questions):
1) How does it guarantee low latency? It efficiently tracks the
tuples generated for a specific DRPC request, so each step knows the
moment it has received every tuple it will ever get for that request
and can pass its result on immediately instead of waiting. DRPC
requests are linear and finite. In general they work as highly
parallel implementations of ordinary functions that you could also
implement sequentially with normal database reads/writes. This isn't
the infinite-stream computation side of Storm.
2) How do you work with joins? DRPC is for cases where you are
joining or otherwise working with tuples that come from a finite
source determined by the arguments to the DRPC function. The join
will work as long as a finite number of tuples is generated and the
slowest tuple producer still completes before the request timeout
(which I think is a minute). For true streaming joins you don't want
DRPC. Use a regular Storm topology instead, one that perhaps emits
tuples into a database or another stream that an external client can
consume.
3) How does PartialUniquer know that it has received all tuples
directed towards it? CoordinatedBolt tracks the number of upstream
tasks that could possibly send it tuples directly. When each of those
upstream tasks has processed all the tuples directed to it for a
given request, it tells the task how many tuples it sent it. So it is
just a matter of waiting until every task that can send you tuples
has told you how many it actually sent (which could be 0), and then
waiting until the number of tuples you actually received matches the
total you were told to expect. Hooking these bolts together makes
completion status cascade through the topology for each request. The
wrapping CoordinatedBolt knows to call PartialUniquer's finishedId
because it can tell when it has received every tuple it could
receive, and it knows that PartialUniquer implements FinishedCallback.
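To make answer 1 concrete, here is the reach computation (the example
topology discussed below) written as an ordinary sequential function.
This is a minimal Python sketch: the lookup tables and their contents
are made up, and a real DRPC topology spreads these steps over
parallel bolts rather than running them in one loop. As its name
suggests, PartialUniquer handles the de-duplication step for a share
of the followers.

```python
# Hypothetical in-memory lookup tables standing in for real databases.
TWEETERS_DB = {"example.com/post": ["alice", "bob"]}
FOLLOWERS_DB = {
    "alice": ["carol", "dave"],
    "bob": ["dave", "erin", "frank"],
}


def reach(url, tweeters_db, followers_db):
    """Sequential reach: how many distinct people saw a URL via tweets."""
    followers = set()
    # Step 1: everyone who tweeted the URL.
    for tweeter in tweeters_db.get(url, []):
        # Step 2: that tweeter's followers; step 3: the set de-duplicates them.
        followers.update(followers_db.get(tweeter, []))
    # Step 4: count the distinct followers.
    return len(followers)


print(reach("example.com/post", TWEETERS_DB, FOLLOWERS_DB))  # 4
```

Each step is a lookup that can be fanned out, which is what makes the
parallel version worthwhile when there are many tweeters.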
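The conditions from answer 2 show up clearly in a small per-request
join. This is a hypothetical Python sketch, not Storm code:
FiniteJoin, add_left/add_right, the injectable clock, and the
60-second default (taken from my recollection of the timeout) are all
made up. It buffers both sides by request id and only produces output
from finished_id, i.e. once the coordination machinery says no more
tuples can arrive. A request that overruns the timeout yields nothing,
because the DRPC client would already have given up. Each side keeps
one value per key, which is a simplifying assumption.

```python
import time
from collections import defaultdict


class FiniteJoin:
    """Hypothetical join that can only finish because its inputs are finite."""

    def __init__(self, timeout_secs=60.0, clock=time.monotonic):
        self.timeout_secs = timeout_secs
        self.clock = clock
        self.started = {}                  # request_id -> first-seen time
        self.left = defaultdict(dict)      # request_id -> {key: value}
        self.right = defaultdict(dict)

    def _touch(self, request_id):
        self.started.setdefault(request_id, self.clock())

    def add_left(self, request_id, key, value):
        self._touch(request_id)
        self.left[request_id][key] = value

    def add_right(self, request_id, key, value):
        self._touch(request_id)
        self.right[request_id][key] = value

    def finished_id(self, request_id):
        """Called once no more tuples for request_id can arrive.

        Returns the joined rows, or None if the request overran the timeout.
        """
        started = self.started.pop(request_id, self.clock())
        left = self.left.pop(request_id, {})
        right = self.right.pop(request_id, {})
        if self.clock() - started > self.timeout_secs:
            return None  # too slow: the client has already timed out
        # Inner join on keys present on both sides.
        return sorted((k, left[k], right[k]) for k in left.keys() & right.keys())
```

Without a finished_id signal there would be no safe moment to emit the
join, which is why an unbounded stream needs a different design.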
Explanation of how it works:
First off, the reach topology created by LinearDRPCTopologyBuilder
actually looks like this:
https://skitch.com/benhughes/gwhfn/fs-4649424364673260365.dot.png
The blue lines with the long stream name are direct streams that
CoordinatedBolt uses to pass along coordination info. Each
CoordinatedBolt task has a map containing tracking info for each DRPC
request_id it has seen. This tracking info includes the number of
tuples received from upstream tasks, the number of upstream tasks
that have reported in over the direct stream, the number of tuples
the upstream tasks say they sent to this task, and a map from
downstream task id to the count of tuples sent to that task.
A CoordinatedBolt can receive three types of tuples. The first is a
report from an upstream task giving the number of tuples it sent to
this task for the request identified by the id. That number is added
to the count of tuples the bolt expects to receive for the request.
To be considered finished, the CoordinatedBolt task must receive this
report from every immediately upstream task and must then have
received that many regular tuples. The second is a special stream
that every task can receive but that only the very last step
subscribes to: the stream of generated request ids from
PrepareRequest. For the final step to be considered complete, it must
also have received the id from this stream. The third type is the
regular tuple that your code emits and consumes. These are passed
straight to the delegate (the bolt you specified in addBolt on the
builder). The delegation also introduces a special OutputCollector
that increments the tracked count of received tuples when a tuple is
acked or failed, and that maintains the map of how many tuples were
emitted to which tasks.
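The bookkeeping described in the last two paragraphs can be modeled
as a per-request map plus one handler per kind of input. This is a
hypothetical Python sketch: TrackingInfo, TaskState and the on_*
method names are my own, and Storm's actual Java classes differ.
Regular tuples themselves are not modeled; on_ack_or_fail and on_emit
stand in for the hooks in the wrapping OutputCollector.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class TrackingInfo:
    """Per-request bookkeeping for one task (field names are illustrative)."""
    received: int = 0        # regular tuples acked or failed so far
    reports: int = 0         # upstream tasks that have reported in
    expected: int = 0        # total tuples those tasks say they sent us
    emitted: dict = field(default_factory=lambda: defaultdict(int))  # task -> count
    got_request_id: bool = False  # only matters for the final step


class TaskState:
    """Hypothetical model of one coordinated task's message handling."""

    def __init__(self, num_upstream, is_final_step=False):
        self.num_upstream = num_upstream
        self.is_final_step = is_final_step
        self.tracked = defaultdict(TrackingInfo)  # request_id -> TrackingInfo

    def on_count_report(self, request_id, count):
        # Type 1: an upstream task reports how many tuples it sent us.
        info = self.tracked[request_id]
        info.reports += 1
        info.expected += count

    def on_request_id(self, request_id):
        # Type 2: the PrepareRequest id stream (only the final step subscribes).
        self.tracked[request_id].got_request_id = True

    def on_ack_or_fail(self, request_id):
        # Type 3 tuples go to the delegate; the wrapping collector counts
        # each one as received when the delegate acks or fails it.
        self.tracked[request_id].received += 1

    def on_emit(self, request_id, downstream_task):
        # The wrapping collector also records where each emitted tuple went.
        self.tracked[request_id].emitted[downstream_task] += 1

    def is_complete(self, request_id):
        info = self.tracked[request_id]
        if self.is_final_step and not info.got_request_id:
            return False
        return info.reports == self.num_upstream and info.received == info.expected
```

For example, a final-step task with two upstream tasks that reported
1 and 0 tuples, and that has acked one tuple, is still incomplete
until its request id arrives on the id stream.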
Every CoordinatedBolt task checks whether it is complete after each
ack/fail call from the delegate and after receiving any of the
bookkeeping tuples. Once a CoordinatedBolt task finds that it is
complete, it tells each downstream task how many tuples it emitted to
that task. If the delegate bolt implements FinishedCallback,
finishedId(request_id) is called on it.
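Putting the pieces together, the cascade can be simulated for one
request through a reach-like chain: two follower-lookup tasks feed two
PartialUniquer tasks, which feed one aggregator. This is a
hypothetical, single-threaded Python sketch, not Storm's
implementation. It tracks a single request instead of a map keyed by
request id, leaves out the PrepareRequest id stream, delivers messages
in FIFO order, and routes followers by name length as a stand-in for a
fields grouping; the follower data is made up. One ordering choice is
deliberate: the sketch runs the finished_id hook before sending counts
downstream, so any tuples the hook emits (here, each PartialUniquer's
partial count) are included in the report.

```python
from collections import defaultdict, deque

FOLLOWERS = {"alice": ["carol", "dave"], "bob": ["dave", "erin", "frank"]}


class Task:
    """One coordinated task tracking a single request (illustrative only)."""

    def __init__(self, delegate, num_upstream, downstream):
        self.delegate = delegate          # has execute(value) -> [(task, value)]
        self.num_upstream = num_upstream  # tasks that may send to us directly
        self.downstream = downstream      # every task we may send to directly
        self.reports = 0                  # upstream reports received so far
        self.expected = 0                 # tuples those reports promised
        self.received = 0                 # regular tuples processed (acked)
        self.sent = defaultdict(int)      # downstream task -> tuples sent
        self.done = False


def run(tasks, messages):
    """Deliver (dest, kind, payload) messages in FIFO order until none remain."""
    queue = deque(messages)

    def emit(task, pairs):
        for target, value in pairs:
            task.sent[target] += 1        # the wrapping collector's count
            queue.append((target, "tuple", value))

    while queue:
        dest, kind, payload = queue.popleft()
        task = tasks[dest]
        if kind == "tuple":               # regular tuple: hand it to the delegate
            emit(task, task.delegate.execute(payload))
            task.received += 1
        else:                             # "count": an upstream task's report
            task.reports += 1
            task.expected += payload
        if (not task.done and task.reports == task.num_upstream
                and task.received == task.expected):
            task.done = True
            hook = getattr(task.delegate, "finished_id", None)
            if hook is not None:          # FinishedCallback-style hook
                emit(task, hook())
            for target in task.downstream:  # report even when the count is 0
                queue.append((target, "count", task.sent[target]))


class FollowerLookup:
    def execute(self, tweeter):
        # Route each follower to u0 or u1 by name length (grouping stand-in).
        return [(f"u{len(f) % 2}", f) for f in FOLLOWERS[tweeter]]


class PartialUniquer:
    def __init__(self):
        self.seen = set()

    def execute(self, follower):
        self.seen.add(follower)
        return []

    def finished_id(self):
        return [("agg", len(self.seen))]  # safe: no more followers can arrive


class SumCounts:
    def __init__(self):
        self.total, self.result = 0, None

    def execute(self, partial_count):
        self.total += partial_count
        return []

    def finished_id(self):
        self.result = self.total          # every partial count is in
        return []


agg = SumCounts()
tasks = {
    "f0": Task(FollowerLookup(), 1, ["u0", "u1"]),
    "f1": Task(FollowerLookup(), 1, ["u0", "u1"]),
    "u0": Task(PartialUniquer(), 2, ["agg"]),
    "u1": Task(PartialUniquer(), 2, ["agg"]),
    "agg": Task(agg, 2, []),
}
# The request source sends one tweeter to each lookup task, then reports 1 each.
run(tasks, [("f0", "tuple", "alice"), ("f1", "tuple", "bob"),
            ("f0", "count", 1), ("f1", "count", 1)])
print(agg.result)  # 4
```

No task ever waits on a timer: each one finishes exactly when its
upstream reports and its received count line up, and that completion
triggers the reports the next stage is waiting for.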
This is the long, in-depth version, though there are some other
corner cases and implementation details I have left out.
Cheers,
Ben