How CoordinatedBolt Works » 03 Jan 2012
In which I don’t mention Clojure at all
Storm comes with a neat implementation of a common DRPC pattern: Linear DRPC, which handles the case where the computation is a linear sequence of steps. The ReachTopology in storm-starter is an example of a highly parallel Linear DRPC topology. The cool thing is that at any stage, for any request that comes through, you can emit as many tuples pertaining to that request as you want, and you can even specify operations that should run only once a step has seen every tuple it will ever get for that request. The coordination that allows for this magic is completely invisible to the user and is handled through CoordinatedBolt.
A question about how CoordinatedBolt works came up on the mailing list, so I decided to look at the source code to figure out how it operates. As part of the process, I annotated some source code for my own edification. Reading code is good, so check out the annotated code.
The first thing to understand is that LinearDRPCTopologyBuilder significantly changes your topology. This is what the ReachTopology actually looks like (click for fullsize):
You can see the structure of the ReachTopology encased in the framework of the Linear DRPC topology. The bolts that implement the computation are all wrapped in CoordinatedBolts, and direct streams have been added between all of the CoordinatedBolts. The final step in the ReachTopology gets an additional input stream from prepare-request, grouped on the request id, that is simply a stream of the ids of all the requests that have come in. There is also scaffolding, handled by JoinResult, that carries the information necessary to return the result to the proper DRPC client.
CoordinatedBolt adds a layer of tracking on top of another bolt, delegating to the underlying bolt for everything that isn't part of its own bookkeeping. Internally, each task keeps, for every request it has seen: the number of tuples received from the previous bolt (tracked by the OutputCollector when user code acks or fails a tuple, totaled across all tasks of the previous bolt), the number of tuples each previous task says it sent to this task, and the number of previous tasks that have reported how many tuples they sent. The reports from previous tasks arrive over the direct stream, and this task sends its own reports downstream only once it is considered "finished". In this way, the "finished" status asynchronously cascades down the topology.
Whether a task is "finished" for a request (and it is only ever on a per-request basis) depends on a few factors (in the code, this is the checkFinishId method). A task in the first bolt is finished once the single request tuple from prepare-request is acked or failed. A task in a middle bolt is finished once all the tasks of the previous step have reported the number of tuples they sent to this exact task (0 must still be reported if they sent none) and the number of tuples this task has received (i.e. acked or failed, not counting CoordinatedBolt's bookkeeping tuples) matches the number the previous step told it to expect. A task in the final bolt is finished when the conditions for a middle task are met AND it has received the id tuple from prepare-request. All of this accounting is keyed by the request id in field 0 of every tuple.
Once a task is finished, if the underlying bolt implements FinishedCallback, its finishedId callback is called with the request id. After that, the task iterates through all the tasks of the next step, sending each one, over the direct stream, the number of tuples it sent to that task for the request. The order matters because finishedId could (and usually would) emit more tuples, affecting the final count.
A task checks whether it is finished every time it receives a bookkeeping tuple and every time a tuple is acked or failed by the user-provided bolt.
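The accounting described above can be sketched as a small, Storm-free model. To be clear, this is not Storm's actual code: the class and method names below are my own invention, and only the "middle task" condition from checkFinishId is modeled.

```java
import java.util.HashMap;
import java.util.Map;

// A simplified model of CoordinatedBolt's per-request bookkeeping.
// Names (CoordinationTracker, TrackingInfo, ...) are illustrative, not Storm's.
public class CoordinationTracker {
    static class TrackingInfo {
        int receivedTuples = 0;  // tuples acked/failed by user code
        int expectedTuples = 0;  // sum of counts reported by upstream tasks
        int reportingTasks = 0;  // upstream tasks that have reported so far
    }

    private final Map<Object, TrackingInfo> requests = new HashMap<>();
    private final int numUpstreamTasks;

    public CoordinationTracker(int numUpstreamTasks) {
        this.numUpstreamTasks = numUpstreamTasks;
    }

    // Called when user code acks or fails a tuple for this request.
    public void recordReceived(Object requestId) {
        info(requestId).receivedTuples++;
    }

    // Called when an upstream task reports, over the direct stream,
    // how many tuples it sent to this task (possibly 0).
    public void reportCount(Object requestId, int count) {
        TrackingInfo t = info(requestId);
        t.expectedTuples += count;
        t.reportingTasks++;
    }

    // The "middle task" finish condition: every upstream task has reported,
    // and we've processed exactly as many tuples as they promised.
    public boolean isFinished(Object requestId) {
        TrackingInfo t = info(requestId);
        return t.reportingTasks == numUpstreamTasks
            && t.receivedTuples == t.expectedTuples;
    }

    private TrackingInfo info(Object requestId) {
        return requests.computeIfAbsent(requestId, k -> new TrackingInfo());
    }
}
```

Note that `isFinished` can flip to true only after the last upstream report arrives, which is why the finished status cascades downstream step by step.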
Once the topology completes the request, JoinResult puts the result together with the DRPC return info. ReturnResult handles the actual sending of the result back to the DRPC client that made the call.
The really cool part of all of this is that it is built entirely on top of normal Storm primitives. As Nathan said on the mailing list:
Just want to point out the underlying primitives that are used by CoordinatedBolt: 1) When you call the "emit" method on OutputCollector, it returns a list of the task ids the tuple was sent to. This is how CoordinatedBolt keeps track of how many tuples were sent where. 2) CoordinatedBolt sends the tuple counts to the receiving tasks using a direct stream. Tuples are sent to direct streams using the "emitDirect" method, whose first argument is the task id to send the tuple to. 3) CoordinatedBolt gets the task ids of consuming bolts by querying the TopologyContext.
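The first two primitives can be modeled in a few lines. This is a toy sketch, not Storm's API: the hash-based "grouping" and the class are mine; only the shape of `emit` returning target task ids mirrors OutputCollector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: emit returns the task ids a tuple was routed to, and the
// per-task totals accumulated from those return values are exactly what
// would later be reported downstream over the direct stream.
public class EmitCounting {
    private final Map<Integer, Integer> counts = new HashMap<>();
    private final int numDownstreamTasks;

    public EmitCounting(int numDownstreamTasks) {
        this.numDownstreamTasks = numDownstreamTasks;
    }

    // Stand-in for OutputCollector.emit: route the value to a task (here
    // by hash, like a fields grouping would) and return the target ids.
    public List<Integer> emit(Object value) {
        List<Integer> targets = new ArrayList<>();
        targets.add(Math.floorMod(value.hashCode(), numDownstreamTasks));
        // The bookkeeping: bump the send count for each target task.
        for (int taskId : targets) counts.merge(taskId, 1, Integer::sum);
        return targets;
    }

    // The number this task would report to taskId (primitive 2).
    public int sentTo(int taskId) {
        return counts.getOrDefault(taskId, 0);
    }
}
```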
Testing Storm Topologies Part 2 » 21 Dec 2011
Previously, I wrote about testing Storm topologies using the built-in Clojure testing utilities. You should read Part 1 to understand what Storm gives you by default; that should be enough to test many of the topologies you may want to build. This post digs into more advanced testing scenarios, using the RollingTopWords topology from storm-starter as an example. I've forked that project to write tests for the provided examples.
But first, a brief digression.
Why using Clojure to test your Java topologies is not so bad
Currently, the testing facilities in Storm are only exposed in Clojure, though this seems likely to change in 0.6.2. Even if you write nearly everything in Java, I think Clojure offers a lot of value as the testing environment. You've already paid the price for the Clojure runtime by using Storm, so you might as well get your money's worth out of it. Clojure macros and persistent data structures turn out to be really helpful when writing tests. In normal usage, mutable data structures shared between threads can be a good fit if you are careful with thread safety and locks, but tests benefit from different constraints. Especially when testing a system like Storm, you want to take the state at a given time, perform some operation, and then ensure that the state changed as expected. While this can be accomplished with careful bookkeeping and setup, it's almost pathetically easy when you can compare the old state with the new state side by side. Clojure is also significantly terser than Java, so you can experiment with new tests with less typing.
Learning Clojure isn’t exceptionally difficult, especially if you have had some exposure to functional programming (Ruby counts). I read a book on it a month ago and have an acceptable grasp on it. The amount that you need to know to write tests in it is pretty small. You can mostly just use Java in it like so:
In any case, I personally like using Clojure to test topologies, no matter what language they were originally written in.
Dances with RollingTopWords
RollingTopWords is a pretty cool example that takes in a stream of words and continuously reports the top three words over the last ten minutes. A counter bolt ("count" in the topology) uses a circular buffer of buckets of word counts. In the default configuration there are 60 buckets for 10 minutes of data, so the current bucket gets swapped out every 10 seconds. When a word comes in, its count in the current bucket is incremented, and the bolt emits the word's total count across all buckets. A worker thread runs in the background to handle the clearing and swapping of buckets. The word and its count are then consumed by the "rank" bolt, which updates its internal top three words and, if it hasn't sent an update in the last 2 seconds, emits its current top three. This is consumed by a single "merge" bolt that takes the partial rankings from each "rank" task and finds the global top three words. If it hasn't sent an update in the last 2 seconds, it emits the rankings.
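The circular buffer of buckets can be sketched in a few lines. This is a minimal model, not the actual storm-starter code; the class and method names are mine.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the "count" bolt's data structure: a ring of
// per-bucket word counts, where the oldest bucket ages out on rotation.
public class RollingCounter {
    private final Map<String, Integer>[] buckets;
    private int current = 0;

    @SuppressWarnings("unchecked")
    public RollingCounter(int numBuckets) {
        buckets = new Map[numBuckets];
        for (int i = 0; i < numBuckets; i++) buckets[i] = new HashMap<>();
    }

    // Increment the word's count in the current bucket and return its
    // total across all buckets (the value the bolt emits downstream).
    public int increment(String word) {
        buckets[current].merge(word, 1, Integer::sum);
        int total = 0;
        for (Map<String, Integer> b : buckets) {
            total += b.getOrDefault(word, 0);
        }
        return total;
    }

    // What the background thread does on each tick (every 10 seconds in
    // the default configuration): advance the ring and wipe the bucket
    // being reused, so its old contents drop out of the rolling window.
    public void rotate() {
        current = (current + 1) % buckets.length;
        buckets[current].clear();
    }
}
```

With 60 buckets and a rotation every 10 seconds, a count survives in the total for at most 10 minutes, which is the rolling window the topology advertises.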
This topology's behavior depends extensively on time, which makes it harder to test than topologies that are simply a pure function of their input. In writing the test for RollingTopWords, I first had to make a few changes to the source code to allow time simulation. Storm comes with the utilities backtype.storm.utils.Time and backtype.storm.utils.Utils for exactly this. Anywhere you would normally use System.currentTimeMillis(), use Time.currentTimeMillis(), and where you would use Thread.sleep(ms), use Utils.sleep(ms). When you are not simulating time, these methods fall back on the normal ones. The timing element also makes complete-topology mostly useless for getting any sort of interesting results, so I use a capturing-topology from my own storm-test library. It is basically an incremental, incomplete complete-topology.
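The idea behind those utilities can be sketched without Storm. This is my own toy model of simulated time, not Storm's implementation; only the concept (a hand-advanced clock that falls back to the real one) is the same.

```java
// Toy model of a simulatable clock: when simulation is on, "now" is a
// counter advanced by hand; otherwise it falls through to the real clock.
public class SimClock {
    private static boolean simulating = false;
    private static long simulatedMillis = 0;

    public static void startSimulating() {
        simulating = true;
        simulatedMillis = 0;
    }

    public static void stopSimulating() {
        simulating = false;
    }

    // Code under test calls this instead of System.currentTimeMillis().
    public static long currentTimeMillis() {
        return simulating ? simulatedMillis : System.currentTimeMillis();
    }

    // In simulation, "sleeping" is just advancing the counter, so ten
    // minutes of fake time costs no real time at all.
    public static void advance(long ms) {
        simulatedMillis += ms;
    }
}
```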
Testing is now a matter of ensuring two things:
- Word counts are tabulated for a time period and then rotated.
- Ranks are actually calculated and emitted correctly.
The first is especially time sensitive, since a bucket is current for all of 10 (simulated) seconds. The capturing-topology helpers wait-for-capture and feed-spout-and-wait! both depend on simulate-wait, which takes at minimum 10 simulated seconds (and up to TIMEOUT seconds, in increments of 10). advance-cluster-time from backtype.storm.testing also requires care: by default it only advances the simulated time one second at a time, which is slow in real time. If you jack the increment up past the heartbeat timeout (30 seconds by default), which seems reasonable if you're trying to jump 10 minutes into the future, your cluster will start restarting itself for lack of heartbeats. In this example, any increment greater than 10 will confuse the worker thread handling the cleanup, creating weird results. Time is stopped while simulating, so, while still complicated, you can be fairly precise in your control.
To test the first, the boilerplate looks like:
At this point, the time is now 10s.
It’s time to test the single bucket functionality by feeding in a bunch of words and making sure the count is as we expect.
The time is now 20s because of the wait after the four tuples are fed in.
We should advance time so we can test the case where multiple buckets are in play.
Time is now 70s, advanced in increments of 9 to let the worker thread do its business and avoid nasty timeouts.
Time is now 80s. Let’s advance the cluster so the first bucket is now a long lost memory, but the second bucket we wrote to is still in play. To check that, we pump another word in and check the counts coming out.
And that’s that. Over 10 minutes of fake time simulated in under 10 seconds of real time. The only thing left in this test is to close it out in true Lisp fashion:
The test for the rankings that come out of the system is similar, but much simpler: as long as there are at least 2 seconds between each ranking-producing tuple and less than 10 minutes of total simulated test time, things pretty much just work. The feed-spout-and-wait! calls give at least 10 seconds of spacing, which works out perfectly. The details of that test can be seen in test/storm/starter/test/jvm/RollingTopWords.clj.
I released storm-test version 0.1.0 today. It's installable using the standard lein/clojars magic as [storm-test "0.1.0"]. In addition to the capturing-topology that this blog post demonstrated, it also has the quiet logs functionality and a visualizer for topologies that could be helpful on certain hairier setups.
I should probably plug my company, NabeWise, as it is the reason I get to dirty my hands with all of this data processing. We're doing really exciting things with Clojure, Node.js, Ruby, and geographic data.
Testing Storm Topologies (in Clojure) » 17 Dec 2011

"Storm":https://github.com/nathanmarz/storm is a very exciting framework for real-time data processing. It comes with all sorts of features that are useful for incremental map reduce, distributed RPC, streaming joins, and all manner of other neat tricks. If you are not already familiar with Storm, it is well documented on the "Storm wiki":https://github.com/nathanmarz/storm/wiki.

At "NabeWise":http://nabewise.com, we are in the process of creating and rolling out a new system that builds on top of Storm. Storm's Clojure DSL is really very good and allows us to write normal Clojure code that we can then tie up into topologies. This system will enable a large chunk of our feature set and will touch much of our data, so testing that the functionality works as expected is extremely important to us. By using Clojure, we can test much of our system without thinking about Storm at all. This was critical while we were writing core code before we had even decided on Storm. The functions that end up running our bolts are tested in the usual ways, without dependency on or knowledge of their place in a topology.

We still want to be able to test the behavior of our entire topology, or some part of it, to ensure that things still work as expected across the entire system. This testing will eventually include test.generative style specs and tests designed to simulate failures. Luckily, Storm ships with a ton of testing features that are available through Clojure (and currently only through Clojure, though this is liable to change). You can find these goodies in "src/clj/backtype/storm/testing.clj":https://github.com/nathanmarz/storm/blob/master/src/clj/backtype/storm/testing.clj. These tools are pretty well exercised in "test/clj/backtype/storm/integration_test.clj":https://github.com/nathanmarz/storm/blob/master/test/clj/backtype/storm/integration_test.clj. We will look into the most important ones here.

h4. with-local-cluster

This macro starts up a local cluster and keeps it around for the duration of execution of the expressions it contains. You use it like:
Snippet: List the Gems Your App Needs » 06 Aug 2009

When you aren't careful, it is easy to slip gems into your app without properly accounting for them. Often it is simpler to just rely on system gems than to mess with @config.gem@, but this makes deployment more difficult and can make bringing a new development environment online take significant time and energy. To fix this later, you need some idea of the gems your app depends on. Put this snippet into the Rakefile below the boot line and run @rake test | grep GEM@:
Expiring Rails File Cache by Time » 13 Jul 2009

One of the major weaknesses of the Rails file cache store is that cached pages cannot be expired by time, which poses a problem for keeping caching as simple as possible. The solution I came up with stores cached content as a JSON file containing the content and the TTL. Expiration need only be set when @cache@ is called; @read_fragment@ should know nothing about expiration.
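The shape of that design can be sketched independently of Rails. This is an illustrative model under assumed names (TtlCache, write, read), not the Ruby implementation from the post: the payload is stored together with its expiry, and reads treat stale entries as misses without knowing anything about TTL policy.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the cache design: writes record content plus an expiry time;
// reads return null for missing or stale entries. The real post stores
// this as JSON files in the Rails file store.
public class TtlCache {
    private static class Entry {
        String content;
        long expiresAt;
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Expiration is set only at write time, mirroring how the TTL is
    // only specified when cache is called.
    public void write(String key, String content, long ttlMillis, long now) {
        Entry e = new Entry();
        e.content = content;
        e.expiresAt = now + ttlMillis;
        store.put(key, e);
    }

    // The reader needs no TTL knowledge beyond the stored expiry.
    public String read(String key, long now) {
        Entry e = store.get(key);
        if (e == null || now > e.expiresAt) return null;
        return e.content;
    }
}
```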
Common Rails Beginner Issues » 27 Jun 2009

I've recently become somewhat addicted to "Stack Overflow":http://stackoverflow.com, and I have noticed some areas of confusion with using Ruby on Rails. Convention over configuration and awesome magick are pretty foreign concepts in most of CS, so the confusion is quite understandable. The Rails community also has a love for demonstrating simple applications being created simply (Blog in 5 minutes, extreme!) and complex corner cases solved through advanced techniques; there isn't much in the way of middle ground. If you are planning on doing a lot of Rails and are okay with buying dead-tree books, you should go buy The Rails Way by Obie Fernandez at your soonest convenience. It is worth its (considerable) weight in gold.

The following are what I've observed to be among the hardest issues for people moving from idealized Rails applications to the realities of actual websites. I will mostly avoid the issues of separation between the levels of MVC and any of the philosophical opinions of DRY.

h3. Dependencies: How to make Rails find your classes

Starting out, Rails is amazing at automagically including everything you need to make the code run without any need for @require@ or @load@ lines. Things get a little more confusing when you write your own code and find that it is not being included where you expect. The key is that Rails uses *naming conventions* to handle code loading: class names are in CamelCase, and the files containing them are named using underscores.
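To make the convention concrete, here is the file-name-to-class-name mapping sketched in plain code. This is an illustration only (Rails itself implements this in Ruby, via ActiveSupport's underscore/camelize helpers); the class and method names here are hypothetical.

```java
// Sketch of the Rails naming convention: a file named user_profile.rb
// is expected to define a class named UserProfile.
public class RailsNaming {
    public static String classify(String fileName) {
        StringBuilder out = new StringBuilder();
        // Strip the extension, then capitalize each underscore-separated part.
        for (String part : fileName.replace(".rb", "").split("_")) {
            out.append(Character.toUpperCase(part.charAt(0)))
               .append(part.substring(1));
        }
        return out.toString();
    }
}
```

If your file name and class name disagree with this mapping, Rails' autoloading simply won't find the class, which is the usual cause of the confusion described above.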
Using Fluid For Convenient Rails Diagnostics » 02 May 2009

I recently got a MacBook Pro and have been quite impressed with it. I have also been doing a ton of work for "BigThink":http://bigthink.com. I'm a tabbed-browsing nut, and I discovered that if I ever wanted to get anything done, I had to limit my tabs to the point where they all still fit in the bar. This forces me to actually read or act upon the things I have thrown into tabs rather than just letting them simmer. Working on a large Rails project means I often need information from trendy websites like Lighthouse, Hoptoad, and New Relic; that's three more tabs towards my limit. Every time I closed those tabs to make more room for normal web browsing, something happened that made me check them again. Then I discovered that Fluid supports tabs and saves the tab session per application.

!http://img.skitch.com/20090502-bnw3ih2n5n2m42uqdhrtg7uau3.jpg!

The trick is to set up the SSB for one site (in my case New Relic) and then, on first run, open up tabs and go to the other diagnostic services (Hoptoad and Lighthouse for me). The end result is a full diagnostic panel that pops up whenever you click the icon.

!http://img.skitch.com/20090502-beixct3qumwu1k9bk9j3u4x1ek.jpg!
Integrating Ruby-Processing Into An Existing Project » 24 Jun 2008

"Processing":http://processing.org and "Ruby-Processing":http://github.com/jashkenas/ruby-processing/wikis are really awesome programs for visualizing things and making pretty doodads. Ruby-Processing is great because it uses the JRuby Java bridge to expose all of Processing's immense power to normal Ruby code. Since I use JRuby at work for data processing anyway, Processing seemed like a natural fit for braindead-easy drawing. When I went to hack it into my current project, I was sorely disappointed to discover that Java was squawking about SecurityExceptions and signer mismatches when run from @jruby@. First, how is code signing a first-class language feature? Second, this appears to be a "known issue":http://jira.codehaus.org/browse/JRUBY-2439. I really hate Java.

Out of frustration, I decided to hack at it and recreate the jar so it would be unsigned and happy. This seems to have worked, as I can now use Processing wherever I want in my application. If you want to use Ruby-Processing in your JRuby app, download "core.jar":http://github.com/schleyfox/ascends_viz/tree/master%2Flib%2Fcore.jar?raw=true and "ruby-processing":http://github.com/schleyfox/ascends_viz/tree/master%2Flib%2Fruby-processing.rb?raw=true and place them wherever your other lib files live. I created this jar by unzipping the normal Ruby-Processing core.jar, removing all metadata, and rebuilding it like:

bc. jar cf core.jar processing/core/*.class

This seems to work and is generating a "color bar":http://github.com/schleyfox/ascends_viz/tree/master/lib/co2_color_code.rb#L69-94 for me now.