How CoordinatedBolt Works

» 03 Jan 2012

In which I don’t mention Clojure at all

Storm comes with a neat implementation of a common DRPC pattern, Linear DRPC. This pattern handles the common case where the computation is a linear sequence of steps. The ReachTopology in storm-starter is an example of a highly parallel Linear DRPC topology. The cool thing is that at any stage, for any request that comes through, you can emit as many tuples pertaining to that request as you want, and you can even specify operations that should occur only once a step has seen every tuple it will ever get for that request. The coordination that allows for this magic is completely invisible to the user and is handled through CoordinatedBolt.

A question about how CoordinatedBolt works came up on the mailing list, so I decided to look at the source code to figure out how it operates. As part of the process, I annotated some source code for my own edification. Reading code is good, so check out the annotated code.

The first thing to understand is that LinearDRPCTopologyBuilder significantly changes your topology. This is what the ReachTopology actually looks like:


You can see the structure of the ReachTopology encased in the framework of the Linear DRPC topology. The bolts that implement the computation are all wrapped by CoordinatedBolts. Direct streams have been added between all of the CoordinatedBolts. The final step in the ReachTopology gets an additional input stream from prepare-request, grouped on the request id, that is simply a stream of the ids of all the requests that have come in. There is also scaffolding carrying the information necessary to return the result to the proper DRPC client, which is handled by JoinResult.

CoordinatedBolt adds a layer of tracking on top of another bolt. It delegates to the underlying bolt for everything that isn't part of CoordinatedBolt's own bookkeeping. Internally, each task keeps data, per request, on: the number of tuples received from the previous bolt (tracked by the OutputCollector when user code acks or fails a tuple, totaled across all tasks of the previous bolt); the number of tuples each previous task has sent to this task; and the number of previous tasks that have reported how many tuples they sent. The reports from previous tasks are received over the direct streams, and this task's own reports are sent downstream only once the task is considered "finished". In this way, the "finished" status cascades asynchronously down the topology.
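The per-request state can be sketched as a plain data structure. This is a toy model of the bookkeeping described above, not Storm's actual internal class; the field names are merely illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the per-request state a CoordinatedBolt task keeps.
// Field names are illustrative, not copied from Storm's source.
class RequestTracking {
    int reportCount = 0;        // how many upstream tasks have reported their sent counts
    int expectedTupleCount = 0; // sum of tuples upstream tasks say they sent us
    int receivedTuples = 0;     // tuples the user bolt has acked/failed so far

    // Counts of tuples this task has emitted to each downstream task,
    // updated from the task ids that emit() returns.
    Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
}

public class Tracking {
    public static void main(String[] args) {
        // State is kept separately for every request id seen.
        Map<Object, RequestTracking> tracked = new HashMap<>();
        Object requestId = "request-1";

        RequestTracking t = tracked.computeIfAbsent(requestId, k -> new RequestTracking());
        // Upstream task 3 reports it sent us 2 tuples for this request:
        t.reportCount++;
        t.expectedTupleCount += 2;
        // The user bolt acks both tuples as they arrive:
        t.receivedTuples += 2;
        System.out.println(t.expectedTupleCount == t.receivedTuples); // prints true
    }
}
```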

Whether a task is considered "finished" for a request (and it is only ever finished on a per-request basis) depends on a few factors (in the code, this is the checkFinishId method). A task in the first bolt is finished once the single request tuple from prepare-request is acked or failed. A task in a middle bolt is finished once every task in the previous step has reported the number of tuples it sent to this exact task (a task that sent none still has to report 0) and the number of tuples this task has acked or failed (not counting CoordinatedBolt's bookkeeping tuples) matches the number the previous step told it to expect. A task in the final bolt is finished when the conditions for a middle task are met AND it has received the id tuple from prepare-request. All of this is keyed by the request id in field 0 of every tuple.
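The middle-task condition can be written down directly. This is a simplified stand-in for the checkFinishId logic described above, not Storm's actual code:

```java
// Sketch of the "finished" test for a middle task, mirroring the rules
// above: every upstream task has reported (even a report of 0), and we
// have acked/failed exactly as many tuples as they claim to have sent.
public class FinishCheck {
    static boolean middleTaskFinished(int numPrevTasks, int reportCount,
                                      int expectedTupleCount, int receivedTuples) {
        return reportCount == numPrevTasks && expectedTupleCount == receivedTuples;
    }

    public static void main(String[] args) {
        // 4 upstream tasks; only 3 have reported so far -> not finished yet.
        System.out.println(middleTaskFinished(4, 3, 10, 10)); // false
        // All reports are in, but one tuple is still unacked -> not finished.
        System.out.println(middleTaskFinished(4, 4, 10, 9));  // false
        // All reports are in and the counts match -> finished.
        System.out.println(middleTaskFinished(4, 4, 10, 10)); // true
    }
}
```

A task in the final bolt would add one more conjunct for the id tuple from prepare-request.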

Once a task is finished, if the underlying bolt implements FinishedCallback, its finishedId method is called with the request id. After that, the task iterates through all the tasks in the next step, sending each one, over the direct stream, the count of tuples it sent to that task for the request. The order is important because finishedId could (and usually would) emit more tuples, affecting the final counts.

A task checks whether it is finished every time it receives a bookkeeping tuple and every time a tuple from the user-provided bolt is acked or failed.

Once the topology completes the request, JoinResult puts the result together with the DRPC return info. ReturnResults handles the actual sending of the result back to the DRPC client that made the call.

The really cool part of all of this is that it is built entirely on top of normal Storm primitives. As Nathan said on the mailing list:

Just want to point out the underlying primitives that are used by CoordinatedBolt:

1) When you call the "emit" method on OutputCollector, it returns a list of the task ids the tuple was sent to. This is how CoordinatedBolt keeps track of how many tuples were sent where.

2) CoordinatedBolt sends the tuple counts to the receiving tasks using a direct stream. Tuples are sent to direct streams using the "emitDirect" method, whose first argument is the task id to send the tuple to.

3) CoordinatedBolt gets the task ids of consuming bolts by querying the TopologyContext.
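Primitive (1) is the linchpin: because emit() reports where each tuple was routed, counting requires no extra communication at all. A toy model of that shape (this mimics the return value of OutputCollector.emit, not Storm itself; the routing function is made up):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of primitive (1): emit() returns the task ids a tuple was
// routed to, and the caller bumps a per-task counter for each one.
public class EmitCounting {
    static Map<Integer, Integer> taskEmittedTuples = new HashMap<>();

    // Pretend router: a fields grouping sends each tuple to one of
    // two downstream tasks (ids 4 and 5 are arbitrary).
    static List<Integer> emit(Object key) {
        int target = (key.hashCode() % 2 == 0) ? 4 : 5;
        return Arrays.asList(target);
    }

    public static void main(String[] args) {
        for (String key : new String[]{"a", "b", "c", "d"}) {
            for (int taskId : emit(key)) {
                taskEmittedTuples.merge(taskId, 1, Integer::sum);
            }
        }
        // Every emitted tuple is accounted for, per downstream task.
        int total = taskEmittedTuples.values().stream()
                .mapToInt(Integer::intValue).sum();
        System.out.println(total); // prints 4
    }
}
```

When the task finishes a request, these per-task totals are exactly what gets sent over the direct streams via emitDirect.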