Testing Storm Topologies Part 2

21 Dec 2011

Previously, I wrote about testing Storm topologies using the built-in Clojure testing utilities. You should read Part 1 to understand what Storm gives you by default, which should be enough to test many of the topologies you may want to build. This post digs into more advanced testing scenarios, using the RollingTopWords topology from storm-starter as an example. I’ve forked that project to write tests for the provided examples.

But first, a brief digression.

Why using Clojure to test your Java topologies is not so bad

Currently, the testing facilities in Storm are only exposed in Clojure, though this seems likely to change in 0.6.2. Even if you write nearly everything in Java, I think Clojure offers a lot of value as the testing environment. You’ve already paid the price for the Clojure runtime by using Storm, so you might as well get your money’s worth out of it. Clojure macros and persistent data structures turn out to be really helpful when writing tests. In normal usage, mutable data structures shared between threads can be a good fit if you are careful about thread safety and locking. Tests benefit from different constraints, though. Especially when testing a system like Storm, you often want to capture the state at a given time, perform some operation, and then check that the state changed in the way you expected. While this can be accomplished with careful bookkeeping and setup, it’s almost pathetically easy when you can hold the old state and the new state side by side and compare them. Clojure is also significantly terser than Java, so you can experiment with new tests with less typing.

Learning Clojure isn’t exceptionally difficult, especially if you have had some exposure to functional programming (Ruby counts). I read a book on it a month ago and have an acceptable grasp of it. The amount you need to know to write tests in it is pretty small. For the most part, you can just call Java from it like so:

(Klass. arg1) ; new Klass(arg1)

(Klass/staticMethod) ; Klass.staticMethod()

(.method obj arg1 arg2) ; obj.method(arg1, arg2)

In any case, I personally like using Clojure to test topologies, no matter what language they were originally written in.

Dances with RollingTopWords

RollingTopWords is a pretty cool example that takes in a stream of words and continuously reports the top three words from the last ten minutes. A counter bolt (“count” in the topology) keeps a circular buffer of buckets of word counts. In the default configuration, there are 60 buckets covering 10 minutes of data, so the current bucket gets swapped out every 10 seconds. When a word comes in, that word’s count in the current bucket is incremented, and the bolt emits the word along with its total count across all buckets. A worker thread runs in the background to handle clearing and swapping the buckets. The word and its count are then consumed by the “rank” bolt, which updates its internal top 3 words and then, if it hasn’t sent out an update in the last 2 seconds, emits its current top 3. These partial rankings are consumed by a single “merge” bolt, which combines the rankings from every “rank” task into the global top 3 words. It, too, emits the rankings only if it hasn’t sent out an update in the last 2 seconds.
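The circular buffer of buckets can be sketched in Java, the language the storm-starter bolts are written in. This is a minimal sketch, not the storm-starter source: the class and method names (`RollingCounter`, `increment`, `total`, `rotate`) are hypothetical, and it assumes the background worker thread calls `rotate()` once per bucket period (10 seconds in the default configuration).

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical rolling counter: one long[] of bucket counts per word, used as a circular buffer. */
class RollingCounter {
    private final int numBuckets;
    private final Map<String, long[]> counts = new HashMap<>();
    private int current = 0; // slot currently receiving increments

    RollingCounter(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    /** Bumps the word in the current bucket and returns its total across all buckets. */
    synchronized long increment(String word) {
        counts.computeIfAbsent(word, w -> new long[numBuckets])[current]++;
        return total(word);
    }

    /** Sum of the word's counts over every bucket still in the window. */
    synchronized long total(String word) {
        long[] buckets = counts.get(word);
        return buckets == null ? 0 : sum(buckets);
    }

    /** Assumed to be called once per bucket period: move to the next slot and clear it. */
    synchronized void rotate() {
        current = (current + 1) % numBuckets;
        Iterator<long[]> it = counts.values().iterator();
        while (it.hasNext()) {
            long[] buckets = it.next();
            buckets[current] = 0;   // the slot being reused held the oldest counts
            if (sum(buckets) == 0) {
                it.remove();        // forget words with nothing left in the window
            }
        }
    }

    private static long sum(long[] buckets) {
        long total = 0;
        for (long b : buckets) {
            total += b;
        }
        return total;
    }
}
```

Because `rotate()` zeroes the slot it moves into, a count written into a bucket is dropped on the 60th rotation after it was written, which is about 10 minutes later with 10-second buckets.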

This topology’s behavior depends heavily on time, which makes it harder to test than topologies that are simply a pure function of their input. To write the test for RollingTopWords, I first had to make a few changes to the source code to allow time simulation. Storm comes with the utilities backtype.storm.utils.Time and backtype.storm.utils.Utils, which support simulated time. Anywhere you would normally call System.currentTimeMillis(), call Time.currentTimeMillis() instead, and anywhere you would call Thread.sleep(ms), call Utils.sleep(ms). When you are not simulating time, these methods fall back on the normal ones. The timing dependence also makes complete-topology more or less useless for getting interesting results, because it runs all of the mock input through the topology in one go, leaving no chance to advance time in between. Instead, I use a capturing-topology from my own storm-test library. It is basically an incremental, incomplete complete-topology.
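The Time/Utils swap works because every time lookup goes through one indirection that a test can take over. The sketch below imitates that pattern with a hypothetical `SimClock` class; it is not Storm's `Time` implementation, and its method names are assumptions chosen for illustration. The hypothetical `Buckets.currentBucket` helper then reads the injectable clock instead of `System.currentTimeMillis()`, assuming the bucket index is `(seconds / secondsPerBucket) % numBuckets`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical clock indirection in the spirit of backtype.storm.utils.Time (not its actual code). */
final class SimClock {
    private static volatile boolean simulating = false;
    private static final AtomicLong simulatedMillis = new AtomicLong();

    private SimClock() {}

    static void startSimulating() {
        simulatedMillis.set(0);
        simulating = true;
    }

    static void stopSimulating() {
        simulating = false;
    }

    /** Simulated time while simulating, otherwise the real wall clock. */
    static long currentTimeMillis() {
        return simulating ? simulatedMillis.get() : System.currentTimeMillis();
    }

    /** Moves simulated time forward; only a test should call this. */
    static void advance(long millis) {
        simulatedMillis.addAndGet(millis);
    }

    /** Real sleep normally; under simulation, waits until the test has advanced the clock far enough. */
    static void sleep(long millis) throws InterruptedException {
        if (!simulating) {
            Thread.sleep(millis);
            return;
        }
        long wakeAt = simulatedMillis.get() + millis;
        while (simulating && simulatedMillis.get() < wakeAt) {
            Thread.sleep(1); // brief real-time poll while waiting for advance()
        }
    }
}

/** Hypothetical bucket lookup that reads the injectable clock instead of System.currentTimeMillis(). */
final class Buckets {
    private Buckets() {}

    static int currentBucket(int numBuckets, int secondsPerBucket) {
        long seconds = SimClock.currentTimeMillis() / 1000;
        return (int) ((seconds / secondsPerBucket) % numBuckets);
    }
}
```

Under simulation, the clock moves only when a test calls `advance()`, which is what lets a test step across bucket boundaries deterministically.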

Testing is now a matter of ensuring two things:

  1. Word counts are tabulated for a time period and then rotated.
  2. Ranks are actually calculated and emitted correctly.

The first is especially time-sensitive, since a bucket is current for only 10 (simulated) seconds. The capturing-topology helpers wait-for-capture and feed-spout-and-wait! both depend on simulate-wait, which takes at least 10 simulated seconds (and up to TIMEOUT seconds, in increments of 10). advance-cluster-time from backtype.storm.testing also requires care: by default, it advances the simulated time only one second at a time, which is slow in real time. If you jack up the increment past 30 (with default settings), which seems reasonable if you’re trying to jump 10 minutes into the future, your cluster will start restarting itself because of missed heartbeats. In this example, any increment greater than 10 will also confuse the worker thread that handles bucket cleanup, producing weird results. Time stands still while simulating, so although it is complicated, you can be fairly precise in your control.
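The warning about increments larger than 10 comes down to arithmetic. Assuming advance-cluster-time moves the clock forward by min(remaining, increment) seconds per step (a model of its behavior, written here as hypothetical Java helpers rather than Storm code), the sketch below lists the steps and counts how many 10-second bucket boundaries a single step can jump over. With an increment of 9, no step crosses more than one boundary, so the worker thread presumably gets a chance to rotate at each one; with 11, a single step can skip past two.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of splitting a simulated-time advance into fixed-size steps. */
final class AdvanceModel {
    private AdvanceModel() {}

    /** Step sizes used to advance totalSecs, assuming each step is min(remaining, increment). */
    static List<Integer> steps(int totalSecs, int incrementSecs) {
        if (incrementSecs <= 0) {
            throw new IllegalArgumentException("increment must be positive");
        }
        List<Integer> out = new ArrayList<>();
        int left = totalSecs;
        while (left > 0) {
            int step = Math.min(left, incrementSecs);
            out.add(step);
            left -= step;
        }
        return out;
    }

    /** Largest number of bucket boundaries crossed by any single step, starting at startSecs. */
    static int maxBoundariesPerStep(int startSecs, int totalSecs, int incrementSecs, int bucketSecs) {
        int now = startSecs;
        int worst = 0;
        for (int step : steps(totalSecs, incrementSecs)) {
            int crossed = (now + step) / bucketSecs - now / bucketSecs;
            worst = Math.max(worst, crossed);
            now += step;
        }
        return worst;
    }
}
```

For example, `steps(50, 9)` yields 9, 9, 9, 9, 9, 5, the same split the test's 50-second advance from 20s to 70s would use under this model.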

To test the first, the boilerplate looks like:

(deftest test-rolling-count-objects
    (with-simulated-time-local-cluster [cluster]
      (with-capturing-topology [ capture
                                 :mock-sources ["word"]
                                 :storm-conf {TOPOLOGY-DEBUG true} ]

At this point, the simulated time is 10s.

It’s time to test the single bucket functionality by feeding in a bunch of words and making sure the count is as we expect.

        (feed-spout! capture "word" ["the"])
        (feed-spout! capture "word" ["the"])
        (feed-spout! capture "word" ["the"])
        (feed-spout-and-wait! capture "word" ["the"])
        (is (= ["the" 4]
               (last (read-current-tuples capture "count"))))

The time is now 20s because of the wait after the four tuples are fed in.

Next, we advance time so we can test the case where multiple buckets are in play.

        (advance-cluster-time cluster 50 9)

Time is now 70s, advanced in increments of 9 to let the worker thread do its business and avoid nasty timeouts.

        (feed-spout! capture "word" ["the"])
        (feed-spout-and-wait! capture "word" ["the"])
        (is (= ["the" 6]
               (last (read-current-tuples capture "count"))))

Time is now 80s. Let’s advance the cluster so the first bucket is now a long-lost memory, but the second bucket we wrote to is still in play. To check that, we pump another word in and check the counts coming out.

        (advance-cluster-time cluster 540 9)
        (feed-spout-and-wait! capture "word" ["the"])
        (is (= ["the" 3]
               (last (read-current-tuples capture "count"))))
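The expected counts of 4, 6 and 3 follow from bucket arithmetic. The hypothetical `TimelineCheck` model below assumes 60 buckets of 10 seconds and treats a batch of words as still counted while it is fewer than 60 bucket periods old. It ignores the real bolt's threading and only reproduces the numbers the assertions expect.

```java
/** Hypothetical arithmetic model of the test timeline (60 buckets x 10 s); not Storm code. */
final class TimelineCheck {
    static final int BUCKET_SECS = 10;
    static final int NUM_BUCKETS = 60;

    private TimelineCheck() {}

    /** A batch of words counted at one simulated second. */
    static final class Write {
        final int atSecs;
        final int count;

        Write(int atSecs, int count) {
            this.atSecs = atSecs;
            this.count = count;
        }
    }

    /** Total still counted at nowSecs, assuming a batch ages out once it is NUM_BUCKETS periods old. */
    static int liveCount(int nowSecs, Write... writes) {
        int nowPeriod = nowSecs / BUCKET_SECS;
        int total = 0;
        for (Write w : writes) {
            int age = nowPeriod - w.atSecs / BUCKET_SECS; // bucket periods since the batch was counted
            if (age < NUM_BUCKETS) {
                total += w.count;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Write first = new Write(10, 4);   // four words fed at 10s
        Write second = new Write(70, 2);  // two more at 70s
        Write third = new Write(620, 1);  // one more after the 540s advance
        System.out.println(liveCount(10, first));                 // 4
        System.out.println(liveCount(70, first, second));         // 6
        System.out.println(liveCount(620, first, second, third)); // 3
    }
}
```

The first batch lands in bucket period 1 (or 2, if it is counted at 20s) and the final read happens in period 62, so that batch is 60 or more periods old and has aged out, while the batch fed around 70s is only 54 or 55 periods old.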

And that’s that. Over 10 minutes of fake time simulated in under 10 seconds of real time. The only thing left in this test is to close it out in true Lisp fashion:

        )))


The test for the rankings that come out of the system is similar, but much simpler: as long as there are at least 2 seconds between each ranking-producing tuple and less than 10 minutes of total simulated test time, things pretty much just work. The feed-spout-and-wait! calls give at least 10 seconds of spacing, which works out perfectly. The details of that test can be seen in test/storm/starter/test/jvm/RollingTopWords.clj.
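The 2-second throttle on the “rank” and “merge” bolts is why the 10-second spacing from feed-spout-and-wait! works out: every update arrives long after the previous emit. Below is a minimal sketch of that logic with hypothetical names (`ThrottledRanker`, `update`, `top`, `merge`) and timestamps passed in explicitly so it is easy to drive from a test. It assumes each rank task receives a word's running total from the count bolt, and it is not the storm-starter implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical sketch of "rank"/"merge" logic: keep a top N, emit at most once per interval. */
final class ThrottledRanker {
    private final int topN;
    private final long minIntervalMillis;
    private final Map<String, Long> counts = new HashMap<>();
    private boolean hasEmitted = false;
    private long lastEmitMillis;

    ThrottledRanker(int topN, long minIntervalMillis) {
        this.topN = topN;
        this.minIntervalMillis = minIntervalMillis;
    }

    /** Records a word's latest total; returns the top N only if the interval has passed since the last emit. */
    Optional<List<String>> update(String word, long total, long nowMillis) {
        counts.put(word, total);
        counts.keySet().retainAll(top(counts, topN)); // keep only the current top N
        if (hasEmitted && nowMillis - lastEmitMillis < minIntervalMillis) {
            return Optional.empty();
        }
        hasEmitted = true;
        lastEmitMillis = nowMillis;
        return Optional.of(top(counts, topN));
    }

    /** Words with the highest counts, ties broken alphabetically so the order is deterministic. */
    static List<String> top(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** "merge"-style step: combine partial rankings (word -> total) into a global top n. */
    static List<String> merge(List<Map<String, Long>> partials, int n) {
        Map<String, Long> combined = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, total) -> combined.merge(word, total, Long::max));
        }
        return top(combined, n);
    }
}
```

Feeding updates at 0, 10,000 and 20,000 ms produces an emit every time, while an update arriving 1,000 ms after the last emit is suppressed.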


I released storm-test version 0.1.0 today. It’s installable using the standard lein/clojars magic as [storm-test "0.1.0"]. In addition to the capturing-topology that this blog post demonstrated, it also has the quiet logs functionality and a visualizer for topologies that could be helpful on certain hairier setups.

I should probably plug my company, NabeWise, since it’s the reason I get my hands dirty with all of this data processing. We’re doing really exciting things with Clojure, Node.js, Ruby, and geographic data.