Testing Storm Topologies (in Clojure)

» 17 Dec 2011

"Storm":https://github.com/nathanmarz/storm is a very exciting framework for real-time data processing. It comes with all sorts of features that are useful for incremental map reduce, distributed RPC, streaming joins, and all manner of other neat tricks. If you are not already familiar with Storm, it is well documented on the "Storm wiki":https://github.com/nathanmarz/storm/wiki .

At "NabeWise":http://nabewise.com, we are in the process of creating and rolling out a new system that builds on top of Storm. Storm's Clojure DSL is very good and allows us to write normal Clojure code that we can then tie up into topologies. This system will enable a large chunk of our feature set and will touch much of our data, so testing that the functionality works as expected is extremely important to us.

By using Clojure, we can test much of our system without thinking about Storm at all. This was critical while we were writing core code, before we had even decided on Storm. The functions that end up running our bolts are tested in the usual ways, without dependency on or knowledge of their place in a topology. We still want to be able to test the behavior of our entire topology, or some part of it, to ensure that things work as expected across the entire system. This testing will eventually include test.generative style specs and tests designed to simulate failures.

Luckily, Storm ships with a ton of testing features that are available through Clojure (and currently only through Clojure, though this is liable to change). You can find these goodies in "src/clj/backtype/storm/testing.clj":https://github.com/nathanmarz/storm/blob/master/src/clj/backtype/storm/testing.clj. These tools are pretty well exercised in "test/clj/backtype/storm/integration_test.clj":https://github.com/nathanmarz/storm/blob/master/test/clj/backtype/storm/integration_test.clj . We will look at the most important ones here.

h4. with-local-cluster

This macro starts up a local cluster and keeps it around for the duration of execution of the expressions it contains. You use it like:
  (with-local-cluster [cluster]
    (submit-local-topology (:nimbus cluster)
                           "example"
                           your-topology
                           {TOPOLOGY-DEBUG true})
    (Thread/sleep 1000))
  
This should be used when you mostly just need a cluster and are not using most of the other testing functionality. We use this for a few of our basic DRPC tests.

h4. with-simulated-time-local-cluster

This macro is exactly like @with-local-cluster@, but sets up time simulation as well. The simulated time is used by functions like @complete-topology@ when time could have some impact on the results coming out of the topology.

h4. complete-topology

This is where things start getting interesting. @complete-topology@ will take your topology, cluster, and configuration, mock out the spouts you specify, run the topology until it is idle and all tuples from the spouts have been either acked or failed, and return all the tuples that have been emitted from all the topology components. It does this by requiring all spouts to be FixedTupleSpouts (either in the actual topology or as a result of mocking). Mocking spouts is very simple: just specify a map from spout id to a vector of tuples to emit (e.g. @{"spout" [["first tuple"] ["second tuple"] ["etc"]]}@). Simulated time also comes into play here, as every 100 ms of wall clock time looks to the cluster like 10 seconds. This has the effect of making timeout failures materialize much faster. You can write tests with this like:
  (with-simulated-time-local-cluster [cluster]
    (let [ results (complete-topology cluster
                                      your-topology
                                      :mock-sources 
                                      {"spout" [["first"]
                                                 ["second"]
                                                 ["third"]]}) ]
      (is (ms= [["first transformed"] ["second transformed"]]
               (read-tuples results "final-bolt")))))
  
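The @ms=@ in that assertion compares its arguments as multi-sets, so ordering is ignored but duplicate tuples still count. Conceptually, it does something like the following sketch (@multiset=@ is a hypothetical stand-in here, not Storm's actual implementation):

```clojure
;; Compare collections as multi-sets: reduce each one to a map of
;; element -> occurrence count, then compare the maps.
(defn multiset= [& colls]
  (apply = (map frequencies colls)))

(multiset= [["first"] ["second"]]
           [["second"] ["first"]]) ;; => true (order is ignored)

(multiset= [["first"]]
           [["first"] ["first"]])  ;; => false (counts differ)
```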
All the tuples that are emitted from any bolt or spout can be found by calling @read-tuples@ on the results set with the id of the bolt or spout of interest. Storm also comes with the testing helper @ms=@, which behaves like normal @=@ except that it converts all arguments into multi-sets first. This prevents tests from depending on tuple ordering (which is neither guaranteed nor expected).

As cool as @complete-topology@ is, it is not perfect for every scenario. FixedTupleSpouts do not declare output fields, so you can't use them when you have a field grouping on a bolt straight off of a spout. (*Correction*: Nathan Marz pointed out that FixedTupleSpouts will use the same output fields as the spout they replace.) You also give up some control over the timing (simulated or otherwise) of tuple dispatch, so scenarios like the RollingTopWords example in "storm-starter":https://github.com/nathanmarz/storm-starter, which only emits tuples after a certain amount of time has passed between successive tuples, will not be predictably testable using @complete-topology@ alone. This is where simple testing seems to end. I'm including the next macro for completeness and because I think it could be useful for general testing with some wrapping.

h4. with-tracked-cluster

This is where things start to get squirrelly. This macro creates a cluster that can support a tracked topology (which must be created with @mk-tracked-topology@). In your topology, you most likely want to mock out spouts with FeederSpouts constructed with @feeder-spout@. The power of the tracked topology is that you can feed tuples directly in through the feeder spout and wait until the cluster is idle after those tuples have been emitted by the spouts. Currently, this seems to be used mainly to check acking behavior in the core of Storm. It seems like, with AckTracker, it would be possible to create a @tracked-wait-for-ack@ type function that could be used to feed in tuples and wait until they are fully processed.
This would open up testing with simulated time for things like RollingTopWords.

h3. Testing Style

The first thing I like to do with my tests is keep them as quiet as possible. Storm, even with @TOPOLOGY-DEBUG@ turned off, is very chatty. When there are failures in your tests, you often have to sift through a ton of Storm noise (thunder?) to find them. Clojure Contrib's logging and Log4j in general are surprisingly hard to shut up, but tucking the following code into a utility namespace does a pretty good job of keeping things peaceful and quiet.
  (ns whatever.util
    (:use [clojure.contrib.logging])
    (:import [org.apache.log4j Logger]))
  
  
  (defn set-log-level [level]
    ;; Quiet the particularly noisy ZooKeeper connection logger...
    (.. (Logger/getLogger 
          "org.apache.zookeeper.server.NIOServerCnxn")
      (setLevel level))
    ;; ...then set the level on the root Log4j logger.
    (.. (impl-get-log "") getLogger getParent
      (setLevel level)))
  
  (defmacro with-quiet-logs [& body]
    `(let [ old-level# (.. (impl-get-log "") getLogger 
                           getParent getLevel) ]
       (set-log-level org.apache.log4j.Level/OFF)
       (let [ ret# (do ~@body) ]
         (set-log-level old-level#)
         ret#)))
  
For testing the results of a topology, I like to create a function that takes the input tuples and computes the expected result in the simplest way possible. It then compares that result to what comes out the end of the topology. For sanity, I usually ensure that this predicate holds for the empty case. As an example, here is how I would test the word-count topology in storm-starter:
  (defn- word-count-p
    [input output]
    (is (=
          (frequencies
            (reduce
              (fn [acc sentence]
                (concat acc (.split (first sentence) " ")))
              []
              input))
          ; works because last tuple emitted wins
          (reduce
            (fn [m [word n]]
              (assoc m word n))
            {}
            output))))
  
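Because the expected-result side of @word-count-p@ is plain Clojure, it is easy to sanity-check at the REPL before wiring it into a topology test (the input tuples here are just illustrative):

```clojure
;; Split each single-field sentence tuple into words, then count
;; occurrences of each word with frequencies.
(frequencies
  (reduce
    (fn [acc sentence]
      (concat acc (.split (first sentence) " ")))
    []
    [["little brown dog"] ["petted the dog"]]))
;; => {"little" 1, "brown" 1, "dog" 2, "petted" 1, "the" 1}
```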
  (deftest test-word-count
    (with-quiet-logs
      (with-simulated-time-local-cluster [cluster :supervisors 4]
        (let [ topology (mk-topology)
               results (complete-topology 
                         cluster
                         topology
                         :mock-sources 
                         {"1" [["little brown dog"]
                               ["petted the dog"]
                               ["petted a badger"]]
                          "2" [["cat jumped over the door"]
                               ["hello world"]]}
                         :storm-conf {TOPOLOGY-DEBUG true
                                      TOPOLOGY-WORKERS 2}) ]
          ; test initial case
          (word-count-p [] [])
          ; test after run
          (word-count-p
            (concat (read-tuples results "1") 
              (read-tuples results "2"))
            (read-tuples results "4"))))))
  
h3. Conclusion

This is my current thinking about testing Storm topologies. I'm working on some tests that incorporate more control over ordering/timing, as well as hooking a topology test into test.generative or something of that sort, so that I can test how a large number of unpredictable inputs affects the system as a whole. "Part 2":http://www.pixelmachine.org/2011/12/21/Testing-Storm-Topologies-Part-2.html is now available.