import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Values;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;
public class TestWordSpout extends BaseRichSpout {

    public static final Logger LOG = Logger.getLogger(TestWordSpout.class);

    boolean _isDistributed;
    SpoutOutputCollector _collector;

    // Messages waiting to be emitted by nextTuple().
    private transient LinkedBlockingQueue<MessageWrapper> messageQueue;
    // Maps message ids to wrappers so ack()/fail() can release the producer.
    private transient ConcurrentHashMap<Long, MessageWrapper> id2wrapperMap;

    /**
     * Couples a message with a latch so the producing thread can block
     * until the corresponding tuple is acked or failed.
     */
    public static final class MessageWrapper {
        final byte[] message;
        final CountDownLatch latch;
        volatile boolean success = false;
        final long messageId;

        public MessageWrapper(final byte[] message, final long messageId) {
            this.message = message;
            this.messageId = messageId;
            this.latch = new CountDownLatch(1);
        }
    }
    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public boolean isDistributed() {
        return _isDistributed;
    }
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
        id2wrapperMap = new ConcurrentHashMap<Long, MessageWrapper>();
        messageQueue = new LinkedBlockingQueue<MessageWrapper>();

        // Simulated producer: enqueues one message at a time, then blocks on
        // the wrapper's latch until ack() or fail() releases it. This means
        // at most one message is in flight at any moment.
        final Thread producer = new Thread() {
            private final byte[] word = new byte[1024]; // dummy 1 KB payload

            public void run() {
                while (true) {
                    final long messageId = MessageId.generateId();
                    final MessageWrapper wrapper = new MessageWrapper(word, messageId);
                    id2wrapperMap.put(messageId, wrapper);
                    messageQueue.offer(wrapper);
                    try {
                        // Wait until the tuple tree is acked or failed.
                        wrapper.latch.await();
                    } catch (final InterruptedException ie) {
                        LOG.error("Interrupted while waiting for ack/fail", ie);
                        Thread.currentThread().interrupt();
                        return; // stop producing instead of spinning on the interrupt
                    }
                    if (!wrapper.success) {
                        LOG.warn("Message " + messageId + " was failed by the topology");
                    }
                }
            }
        };
        producer.setDaemon(true); // don't keep the worker JVM alive on shutdown
        producer.start();
    }
    public void close() {
    }

    public void nextTuple() {
        // Must not block: poll() returns null immediately when the queue is empty.
        final MessageWrapper wrapper = messageQueue.poll();
        if (wrapper == null) {
            return;
        }
        // Emit with the message id so ack()/fail() get it back from Storm.
        _collector.emit(new Values(wrapper.message), wrapper.messageId);
    }
    public void ack(Object msgId) {
        if (msgId instanceof Long) {
            final long id = (Long) msgId;
            final MessageWrapper wrapper = id2wrapperMap.remove(id);
            if (wrapper == null) {
                LOG.warn("Received ack for unknown or already handled message id " + id);
                return;
            }
            wrapper.success = true;
            wrapper.latch.countDown(); // release the waiting producer
        } else {
            LOG.warn(String.format("don't know how to ack(%s: %s)",
                    msgId.getClass().getName(), msgId));
        }
    }
    public void fail(Object msgId) {
        if (msgId instanceof Long) {
            final long id = (Long) msgId;
            final MessageWrapper wrapper = id2wrapperMap.remove(id);
            if (wrapper == null) {
                LOG.warn("Received fail for unknown or already handled message id " + id);
                return;
            }
            wrapper.success = false;
            wrapper.latch.countDown(); // release the waiting producer
        } else {
            LOG.warn(String.format("don't know how to fail(%s: %s)",
                    msgId.getClass().getName(), msgId));
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
This spout simulates a blocking message source: the producer thread enqueues one message at a time and then waits on a latch until the topology either acks or fails the corresponding tuple. The bolt used for testing is deliberately simple; it does nothing but ack every tuple it receives.
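A minimal sketch of such a bolt might look like the following (the class name AckerTestBolt is illustrative, assuming the same backtype.storm API as the spout above):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class AckerTestBolt extends BaseRichBolt {

    private OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        // Acking completes the tuple tree, which triggers the spout's ack()
        // and releases the producer thread blocked on the latch.
        _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink and declares no output stream.
    }
}

Wired into a topology (component names are illustrative):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-spout", new TestWordSpout(), 1);
builder.setBolt("acker-bolt", new AckerTestBolt(), 1).shuffleGrouping("word-spout");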