Testing Storm Topologies (in Clojure)

» 17 Dec 2011

Storm is a very exciting framework for
real-time data processing. It comes with all sorts of features that are
useful for incremental map reduce, distributed RPC, streaming joins, and all
manner of other neat tricks. If you are not already familiar with Storm, it
is well documented on the Storm wiki.

At NabeWise, we are in the process of creating and
rolling out a new system that builds on top of Storm. Storm’s Clojure DSL is
really very good and allows us to write normal Clojure code that we can then
tie up into topologies. This system will enable a large chunk of our feature
set and will touch much of our data. Testing that the functionality works as
expected is extremely important to us.

By using Clojure, we can test much of our system without thinking about Storm
at all. This was critical while we were writing core code before we had even
decided on using Storm. The functions that end up running our bolts are
tested in the usual ways without dependency or knowledge of their place in a
topology. We still want to be able to test the behavior of our entire
topology or some part of it to ensure that things still work as expected
across the entire system. This testing will eventually include
test.generative style specs and tests designed to simulate failures.

Luckily, Storm ships with a ton of testing features that are available through
Clojure (and currently only through Clojure, though this is liable to change).
You can find these goodies in the backtype.storm.testing namespace.
These tools are pretty well exercised in Storm's own test suite.
We will look into the most important ones here.

with-local-cluster

This macro starts up a local cluster and keeps it around for the duration of
execution of the expressions it contains. You use it like:

(with-local-cluster [cluster]
    ;; `topology` is assumed to be built elsewhere, e.g. with the Clojure DSL
    (submit-local-topology (:nimbus cluster)
                           "test"
                           {TOPOLOGY-DEBUG true}
                           topology)
    (Thread/sleep 1000))

This should be used when you mostly just need a cluster and are not using most
of the other testing functionality. We use this for a few of our basic DRPC
tests.

with-simulated-time-local-cluster

This macro is exactly like before, but sets up time simulation as well. The
simulated time is used in functions like complete-topology when time could
have some impact on the results coming out of the topology.

complete-topology

This is where things start getting interesting. complete-topology will take
in your topology, cluster, and configuration, mock out the spouts you specify,
run the topology until it is idle and all tuples from the spouts have been
either acked or failed, and return all the tuples that have been emitted from
all the topology components. It does this by requiring all spouts to be
FixedTupleSpout-s (either in the actual topology or as a result of mocking).

Mocking spouts is simple: pass the :mock-sources keyword argument with a map
from spout id to a vector of tuples to emit
(e.g. {"spout" [["first tuple"] ["second tuple"] ["etc"]]}).

Simulated time also comes into play here, as every 100 ms of wall clock time
will look to the cluster like 10 seconds. This has the effect of causing
timeout failures to materialize much faster.
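When you drive a cluster by hand instead of through complete-topology, the simulated clock can also be advanced explicitly. A minimal sketch, assuming the advance-cluster-time function from backtype.storm.testing (the exact signature may vary across Storm versions):

```clojure
;; Sketch only: assumes advance-cluster-time from backtype.storm.testing.
(with-simulated-time-local-cluster [cluster]
  ;; ... submit a topology and feed it some tuples here ...
  ;; Jump the cluster clock forward 40 simulated seconds; any tuple
  ;; pending longer than TOPOLOGY-MESSAGE-TIMEOUT-SECS should be failed.
  (advance-cluster-time cluster 40))
```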

You can write tests with this like:

(with-simulated-time-local-cluster [cluster]
    (let [ results (complete-topology cluster
                                      topology ; built elsewhere, e.g. (mk-topology)
                                      :mock-sources
                                      {"spout" [["first"]
                                                ["second"]]}) ]
      (is (ms= [["first transformed"] ["second transformed"]]
               (read-tuples results "final-bolt")))))

All the tuples that are emitted from any bolt or spout can be found by calling
read-tuples on the results set with the id of the bolt or spout of interest.
Storm also comes with the testing helper ms= which behaves like normal =
except that it converts all arguments into multi-sets first. This prevents
tests from depending on ordering (which is not guaranteed or expected).
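The idea behind ms= is easy to reproduce in plain Clojure if you want the same ordering-insensitive comparison outside of Storm's helpers (this multiset= is a stand-in defined here for illustration, not part of Storm):

```clojure
;; A stand-in for ms=-style comparison: turn each collection into a
;; multiset (frequency map) before comparing, so ordering is ignored
;; but duplicate counts still matter.
(defn multiset= [& colls]
  (apply = (map frequencies colls)))

(multiset= [["a"] ["b"] ["a"]]
           [["b"] ["a"] ["a"]]) ; => true

(multiset= [["a"] ["b"]]
           [["a"] ["a"] ["b"]]) ; => false
```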

As cool as complete-topology is, it is not perfect for every scenario.
FixedTupleSpout-s do not declare output fields, so you can’t use them
when you use a field grouping to a bolt straight off of a spout.

(Correction: Nathan Marz pointed out that FixedTupleSpouts will use the same
output fields as the spout they replace.) You also give up some control over
timing (simulated or otherwise) with the dispatch of your tuples, so
scenarios like the RollingTopWords example in storm-starter, which only emits
tuples after a certain amount of time has passed between successive tuples,
will not be predictably testable using complete-topology alone.

This is where simple testing seems to end. I’m including the next macro for
completeness and because I think it could be potentially useful for general
testing with some wrapping.

with-tracked-cluster

This is where things start to get squirrelly. This creates a cluster that can
support a tracked-topology (which must be created with
mk-tracked-topology). In your topology, you most likely want to mock out
spouts with FeederSpout-s constructed with feeder-spout. The power of the
tracked topology is that you can feed tuples directly in through the feeder
spout and wait until the cluster is idle after having those tuples emitted by
the spouts. Currently, this seems to be mainly used to check the behavior of
acking in the core of Storm.
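A rough sketch of how these pieces might fit together. The names come from backtype.storm.testing, but mk-topology-with-feeder is a hypothetical helper that wires the feeder spout into your topology, and exact signatures may differ across Storm versions:

```clojure
;; Hedged sketch: assumes with-tracked-cluster, feeder-spout,
;; mk-tracked-topology, and tracked-wait from backtype.storm.testing.
(with-tracked-cluster [cluster]
  (let [ ;; a mockable spout that we can push tuples into directly
         feeder  (feeder-spout ["sentence"])
         ;; mk-topology-with-feeder is a hypothetical helper that builds
         ;; your topology around the feeder spout
         tracked (mk-tracked-topology cluster (mk-topology-with-feeder feeder)) ]
    (submit-local-topology (:nimbus cluster) "test" {} (:topology tracked))
    (.feed feeder ["hello world"])
    ;; block until the cluster has gone idle after one spout tuple
    (tracked-wait tracked 1)))
```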

It seems like with AckTracker, it would be possible to create a
tracked-wait-for-ack type function that could be used to feed in tuples and
wait until they are fully processed. This would open up testing with
simulated time for things like RollingTopWords.

Testing Style

The first thing I like to do with my tests is to keep them as quiet as
possible. Storm, even with TOPOLOGY-DEBUG turned off, is very chatty. When
there are failures in your tests, you often have to sift through a ton of
storm noise (thunder?) to find them. Clojure Contrib's logger and Log4j in
general are surprisingly hard to shut up, but tucking the following code into
a utility namespace does a pretty good job of keeping things peaceful and
quiet:
(ns whatever.util
    (:use [clojure.contrib.logging])
    (:import [org.apache.log4j Logger]))

  (defn set-log-level [level]
    (.. (Logger/getRootLogger)
        (setLevel level))
    (.. (impl-get-log "") getLogger getParent
        (setLevel level)))

  (defmacro with-quiet-logs [& body]
    `(let [ old-level# (.. (impl-get-log "") getLogger
                           getParent getLevel) ]
       (set-log-level org.apache.log4j.Level/OFF)
       (let [ ret# (do ~@body) ]
         (set-log-level old-level#)
         ret#)))

For testing the results of a topology, I like to create a function that takes
the input tuples and computes the expected result in the simplest way
possible. It then compares that result to what comes out the end of the
topology. For sanity, I usually ensure that this predicate holds for the
empty case.

As an example, here is how I would test the word-count topology in
storm-starter:
(defn- word-count-p
    [input output]
    (is (= (frequencies
             (reduce (fn [acc sentence]
                       (concat acc (.split (first sentence) " ")))
                     []
                     input))
           ; works because last tuple emitted wins
           (reduce (fn [m [word n]]
                     (assoc m word n))
                   {}
                   output))))

  (deftest test-word-count
      (with-simulated-time-local-cluster [cluster :supervisors 4]
        (let [ topology (mk-topology)
               results (complete-topology cluster
                         topology
                         :mock-sources
                         {"1" [["little brown dog"]
                               ["petted the dog"]
                               ["petted a badger"]]
                          "2" [["cat jumped over the door"]
                               ["hello world"]]}
                         :storm-conf {TOPOLOGY-DEBUG true
                                      TOPOLOGY-WORKERS 2}) ]
          ; test initial case
          (word-count-p [] [])
          ; test after run
          (word-count-p
            (concat (read-tuples results "1")
                    (read-tuples results "2"))
            (read-tuples results "4")))))


This is my current thinking about testing Storm topologies. I’m working on
some tests that incorporate more control over ordering/timing, as well as
hooking a topology test into test.generative or something of that sort, so
that I can test how a large number of unpredictable inputs will affect the
system as a whole.

Part 2 is now available.