Here is a simple approach that you can adopt-
Spout Implementation
-----------------------------------
In the declareOutputFields method of Spout implementation, declare two streams that the spout can emit -
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("StreamA", new Fields("fieldA"));
declarer.declareStream("StreamB", new Fields("fieldB"));
}
Now in the nextTuple method of the Spout you can emit tuples on both of these streams-
@Override
public void nextTuple() {
// your logic
// emit
collector.emit("StreamA", new Values("Tuple for Bolt A"));
collector.emit("StreamB", new Values("Tuple for Bolt B"));
}
In your topology class just link the bolts implementation (for example, BoltA and BoltB) with the Spout-
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException{
// create a topology builder
TopologyBuilder builder = new TopologyBuilder();
// set the spout
builder.setSpout("S1", new SpoutS1(), 1);
// add the bolts
builder.setBolt("A", new BoltA(), 1).shuffleGrouping("S1", "StreamA");
// add a new bolt that connects to same Spout ID but different stream ID
builder.setBolt("B", new BoltB(), 1).shuffleGrouping("S1", "StreamB");
// rest of the configuration
...
}
I haven't run this code but I hope that you will be able to understand the overall idea here. There are a number of implementations in the storm-contrib and storm-starter projects that you can go through-
- Anuj