Testing the logic of Storm topologies

stone

Dec 13, 2011, 1:51:35 AM
to storm...@googlegroups.com
Hi ALL,

I am implementing Storm topologies. My current testing approach is the following:

  • set up a queue the spout depends on
  • for each unit test:
      • push the test data into the queue
      • start the topology under test
      • wait long enough to be sure the topology has finished processing the items in the queue, then shut down the topology
      • compare the results in the DB with expectations

The problem is that with many unit tests, repeatedly starting the topology, waiting for it to finish, and shutting it down is slow. Is there some way to avoid putting data in the queue and inject data into the spout directly? How can I know that the topology under test has finished its work? (Maybe add a counter to the topology to track how many items from the queue have been processed?)

Can anyone share their experience testing Storm topologies?

Thanks in advance.

Stone
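
One way to replace the fixed wait with an explicit completion signal, along the lines of the counter idea in the question, is a latch that the last processing step counts down once per item. The sketch below uses only java.util.concurrent. `ProcessedLatch` and its methods are hypothetical names, not part of Storm, and the sketch assumes the topology runs in the same JVM as the test, so that both can see the same latch object.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not a Storm class): lets a test block until N items are processed. */
public class ProcessedLatch {
    private final CountDownLatch latch;

    public ProcessedLatch(int expectedItems) {
        this.latch = new CountDownLatch(expectedItems);
    }

    /** Called by the final processing step once per finished item. */
    public void itemProcessed() {
        latch.countDown();
    }

    /** Returns true if all expected items finished before the timeout elapsed. */
    public boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ProcessedLatch done = new ProcessedLatch(3);
        // Stand-in for the topology: a worker thread that "processes" three items.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 3; i++) {
            worker.submit(done::itemProcessed);
        }
        boolean finished = done.awaitAll(5, TimeUnit.SECONDS);
        worker.shutdown();
        System.out.println("all items processed: " + finished);
    }
}
```

The timeout only bounds the worst case; a test that finishes early returns as soon as the last item is counted down, which is the main gain over sleeping for a fixed interval.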



Nathan Marz

Dec 13, 2011, 8:02:30 PM
to storm...@googlegroups.com
Storm actually has some awesome built-in stuff for doing testing, although it's only available in Clojure right now (Storm's unit tests use this stuff heavily). Basically I can do this:

(complete-topology local-cluster topology :mock-sources {"spout1" [["a" 1] ["b" 2]]})

This will mock out "spout1" with the tuples provided, and then wait for the topology to ack both of those spout tuples. complete-topology then returns an object that you can query for the tuples emitted by any component in the topology. 

I've opened up an issue to expose the testing facilities here: https://github.com/nathanmarz/storm/issues/72

In the meantime, here's the implementation of complete-topology: https://github.com/nathanmarz/storm/blob/master/src/clj/backtype/storm/testing.clj#L302

--
Twitter: @nathanmarz
http://nathanmarz.com

Dave

Dec 15, 2011, 7:49:05 PM
to storm-user
Very timely discussion as I prepare to start coding my first Storm
topology. I've been asking myself these very same questions. I'd like
to be able to unit test the individual spouts and bolts by themselves
then wire everything together for integration testing. It seems that
if you could mock out a collector somehow to capture the tuple output
that you should be able to drive a bolt standalone using jUnit or
TestNG. Am I missing something? Has anyone tried to mock out a
Collector yet and could share some code?

Thanks. I'm really excited to get going on this finally.

Dave

Nathan Marz

Dec 16, 2011, 3:54:13 AM
to storm...@googlegroups.com
It should be really easy for you to make a mock OutputCollector for testing individual spouts/bolts. For bolts, you just need to override backtype.storm.task.OutputCollector and implement the two remaining methods where you can capture the output. Sounds like a useful piece of code to open source ;)

Dave

Dec 16, 2011, 12:43:53 PM
to storm-user
Seemed like it should be easy. The problem I'm having is creating the
Tuple that I want to send into the execute() method of the bolt. It's
proving difficult to figure out how to create the TopologyContext when
I'm creating the Tuple. Any hints for how to mock out a
TopologyContext, so I can create a Tuple? It keeps throwing an NPE:

java.lang.NullPointerException
    at backtype.storm.task.TopologyContext.getComponentCommon(TopologyContext.java:286)
    at backtype.storm.task.TopologyContext.getComponentOutputFields(TopologyContext.java:211)
    at backtype.storm.tuple.Tuple.<init>(Tuple.java:41)
    at backtype.storm.tuple.Tuple.<init>(Tuple.java:51)
    at com.company.data.etl.storm.SoftwareDiscriminatorBoltTests.SoftwareDiscriminatorBoltEmitsToCorrectOutputStream(SoftwareDiscriminatorBoltTests.java:68)

Thanks,

Dave

Dave

Dec 16, 2011, 12:49:35 PM
to storm-user
Never mind my last question about mocking out a TopologyContext. A
better solution is to just create a MockTuple that doesn't need all
that other stuff. Sorry for the trouble.

Dave


Danny

Feb 9, 2012, 12:16:43 PM
to storm-user
Can you attach your MockTuple class? It would really help me out.
I'm having the same issues.

Ted Dunning

Feb 9, 2012, 2:50:07 PM
to storm...@googlegroups.com, storm-user
It is hard to mock the tuple directly because there isn't a clean factory sort of structure. Take a look at the tests in my storm-counts project on github for an example that uses jmockit.

Another issue that comes up is that you need to mock the output collector, and for lots of bolts you need to mock out things like System.nanoTime(). All of that is quite doable with jmockit, and the results are pretty easy to read as well.

Sent from my iPhone
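
An alternative to mocking System.nanoTime() is to have the bolt's timing logic read the clock through an injected supplier, which a test can replace with a hand-controlled value. This is a different technique from the jmockit approach described in this message, sketched here in plain Java. `RateWindow` and its methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical timing logic that a bolt could delegate to (not a Storm class). */
public class RateWindow {
    private final LongSupplier nanoClock;
    private final long windowNanos;
    private long windowStart;
    private long count;

    public RateWindow(LongSupplier nanoClock, long windowNanos) {
        this.nanoClock = nanoClock;
        this.windowNanos = windowNanos;
        this.windowStart = nanoClock.getAsLong();
    }

    /**
     * Records one event. Returns the count of the window that just closed
     * when the event falls into a new window, otherwise -1.
     */
    public long record() {
        long now = nanoClock.getAsLong();
        long closedCount = -1;
        if (now - windowStart >= windowNanos) {
            closedCount = count;
            count = 0;
            windowStart = now;
        }
        count++;
        return closedCount;
    }

    public static void main(String[] args) {
        AtomicLong fakeTime = new AtomicLong(0);
        RateWindow window = new RateWindow(fakeTime::get, 1_000);
        window.record();       // first event at t = 0
        window.record();       // second event at t = 0
        fakeTime.set(1_000);   // advance the fake clock by one full window
        System.out.println("closed window count: " + window.record());
    }
}
```

Production code would pass `System::nanoTime` as the supplier, so only the test sees the fake clock.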

Dave Kincaid

Feb 9, 2012, 4:54:02 PM
to storm-user
We have actually been using EasyMock to mock out a Tuple when we need
one in a test. Here's what we've got. The message parameter is just a
JSON string that we pass from Bolt to Bolt.

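// Imports assume EasyMock 3.0 or later, where class mocking lives in org.easymock.EasyMock.
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StormTestUtils {
    // Builds a replayed mock Tuple whose first field is the given message.
    public static Tuple mockTuple(String message) {
        Tuple tuple = createMock(Tuple.class);
        expect(tuple.getString(0)).andReturn(message).atLeastOnce();
        expect(tuple.getValues()).andReturn(new Values(message)).atLeastOnce();
        expect(tuple.getSourceComponent()).andReturn("abc").anyTimes();
        replay(tuple);
        return tuple;
    }
}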

then a unit test will look something like this:

    MockOutputCollector outputCollector = new MockOutputCollector();
    MyTestBolt boltUnderTest = new MyTestBolt();
    boltUnderTest.prepare(null, null, outputCollector);
    Tuple tuple = StormTestUtils.mockTuple(entity);
    boltUnderTest.execute(tuple);
    assertEquals((String) outputCollector.tuple.get(0), expectedEntity);
    verify(lookup);

The MockOutputCollector is a class we created that extends
OutputCollector, overrides emit, ack, and fail, and captures what the
bolt emits and whether it has called ack or fail.

We are able to very successfully test our bolts in isolation using
this method.
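
The capture pattern described above can be sketched without Storm on the classpath. In the sketch below, `Collector`, `RecordingCollector`, and `UppercaseBolt` are hypothetical stand-ins, not Storm classes. In a real test the recording class would extend backtype.storm.task.OutputCollector as described in this thread, and the bolt would receive a Tuple rather than a plain String.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecordingCollectorSketch {
    /** Stand-in for the collector a bolt writes to (NOT Storm's OutputCollector). */
    interface Collector {
        void emit(List<Object> values);
        void ack(Object input);
        void fail(Object input);
    }

    /** Captures everything the bolt does so a test can assert on it afterwards. */
    static class RecordingCollector implements Collector {
        final List<List<Object>> emitted = new ArrayList<>();
        final List<Object> acked = new ArrayList<>();
        final List<Object> failed = new ArrayList<>();

        @Override public void emit(List<Object> values) { emitted.add(values); }
        @Override public void ack(Object input) { acked.add(input); }
        @Override public void fail(Object input) { failed.add(input); }
    }

    /** Hypothetical bolt logic: upper-cases a message and fails on null input. */
    static class UppercaseBolt {
        private final Collector collector;

        UppercaseBolt(Collector collector) {
            this.collector = collector;
        }

        void execute(String message) {
            if (message == null) {
                collector.fail(message);
                return;
            }
            collector.emit(Arrays.<Object>asList(message.toUpperCase()));
            collector.ack(message);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        UppercaseBolt bolt = new UppercaseBolt(collector);
        bolt.execute("hello");
        System.out.println(collector.emitted + " acked=" + collector.acked.size());
    }
}
```

Keeping separate lists for emitted, acked, and failed inputs lets a test check not only what a bolt produced but also that it acknowledged or failed each input as intended.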

Dave Kincaid

Feb 9, 2012, 4:55:44 PM
to storm-user
Oops. You should ignore that "verify(lookup)" on the end. It's
checking a different mock that we had to use in the test that I copied
that from

Ted Dunning

Feb 9, 2012, 4:56:22 PM
to storm...@googlegroups.com
My needs were a bit different since I needed to emulate various different timing effects.  But the result is pretty similar.

Danny

Feb 9, 2012, 5:04:48 PM
to storm-user
Dave,
I actually ended up doing something similar. I couldn't figure out
how EasyMock worked so I just did this:
https://gist.github.com/1782187

But I also had to create another custom class called
MockTopologyContext (extending TopologyContext), which simply overrides
getComponentOutputFields:

    public Fields getComponentOutputFields(String componentId, String streamId) {
        return new Fields("");
    }

Doubtful if my method is the "best" way but it seems to work