I did some more debugging. I have now removed all of the intermediate processing bolts and am writing each batch of tuples directly into the DB, but the same thing happens: the State and StateUpdater objects stop being invoked after a few batches have been committed.
Has anyone seen a similar issue with Trident when using State and StateUpdater with the partitionPersist API?
/**
 * Trident spout whose batch metadata type is {@code InfoBatch}. It hands out an
 * {@code InputItemBatchCoordinator} and an {@code InputItemBatchEmitter} and
 * declares a single output field named "Info".
 */
public class InputItemTridentSpout implements ITridentSpout<InfoBatch> {
// Constructor body is not shown in this excerpt.
public InputItemTridentSpout() {
/**
 * Creates the batch coordinator for this spout.
 * The type argument is {@code InfoBatch} so that the signature agrees with
 * {@code ITridentSpout<InfoBatch>} and with {@code InputItemBatchCoordinator},
 * which implements {@code BatchCoordinator<InfoBatch>}.
 *
 * @return a new coordinator named "InfoBatch-Coordinator"
 */
public BatchCoordinator<InfoBatch> getCoordinator(String s, Map map, TopologyContext topologyContext) {
return new InputItemBatchCoordinator("InfoBatch-Coordinator");
/**
 * Creates the emitter that produces the tuples of each batch.
 *
 * @return a new {@code InputItemBatchEmitter}
 */
public Emitter<InfoBatch> getEmitter(String s, Map map, TopologyContext topologyContext) {
return new InputItemBatchEmitter();
/**
 * @return {@code null}; this spout supplies no component-specific configuration
 */
public Map<String, Object> getComponentConfiguration() {
return null;
/**
 * @return the output schema: one field called "Info"
 */
public Fields getOutputFields() {
return new Fields("Info");
/** Implementation of a BatchCoordinator. Makes sure there are only 10 running batches at any given point in time. */
// Coordinator side of the spout: produces the per-transaction InfoBatch metadata.
// The bodies of the constructor, success, isReady and close are not shown in this excerpt.
private class InputItemBatchCoordinator implements BatchCoordinator<InfoBatch> {
// Takes a name; what is done with it is not visible here.
public InputItemBatchCoordinator(String name) {
// Returns a brand-new InfoBatch for every transaction; neither txid nor
// prevMetadata is consulted in the visible code.
public InfoBatch initializeTransaction(long txid, InfoBatch prevMetadata) {
return new InfoBatch();
// Success callback for a transaction id (body not shown).
public void success(long txid) {
// Readiness check for a transaction id (body not shown).
// NOTE(review): presumably this is where the running-batch limit is enforced — confirm.
public boolean isReady(long txid) {
// Cleanup hook (body not shown).
public void close() {
// Emitter side of the spout: produces the tuples that make up each batch.
private class InputItemBatchEmitter implements Emitter<InfoBatch> {
// Emits exactly 10 single-value tuples per call, each wrapping a new Info();
// the transactionAttempt and infoBatch arguments are not used in the visible lines.
public void emitBatch(TransactionAttempt transactionAttempt, InfoBatch infoBatch, TridentCollector tridentCollector) {
for (int i = 0; i < 10; ++i) {
tridentCollector.emit(new Values(new Info()));
// Success callback for a transaction attempt (body not shown).
public void success(TransactionAttempt transactionAttempt) {
// Cleanup hook (body not shown).
public void close() {
/**
 * Trident {@code State} that keeps a reference to the tuples handed to
 * {@link #persistTuples(List)} and works against a {@code DB} handle.
 * Several method bodies are not shown in this excerpt.
 */
public class MyState implements State {
// Last list passed to persistTuples; nothing else in the visible code assigns it.
private List<TridentTuple> tuples;
private DB db;
/**
 * Creates the state with a fresh {@code DB} handle.
 *
 * @param conf configuration map; not read in the visible lines
 */
public MyState (Map conf) {
db= new DB();
// Called with the transaction id before a commit (body not shown).
public void beginCommit(Long txId) {
/**
 * Commits the stored tuples for the given transaction.
 * A key derived from the transaction id is checked first; if the DB already
 * has it, the method returns without doing anything further.
 *
 * @param txId transaction id used to build the lookup key
 */
public void commit(Long txId) {
// Statement terminator added; the original line lacked the trailing ';'.
String txKey = "Trident-Transaction-Id" + txId;
if (db.exists(txKey)) {
return; // do nothing
// Per-tuple work is not shown in this excerpt.
// NOTE(review): tuples is only set by persistTuples — confirm it cannot be null here.
for (TridentTuple tuple: tuples) {
/**
 * Stores the given list for a later {@link #commit(Long)}; the reference is
 * kept as-is, no copy is made.
 *
 * @param tuples tuples to remember
 */
public void persistTuples(List<TridentTuple> tuples) {
this.tuples = tuples;
// Helper whose body is not shown.
private void beginTransaction() {
// Helper whose body is not shown.
public void commitTransaction() {
// Factory that Trident uses to obtain the State instance.
public class MyStateFactory implements StateFactory {
// Builds a new MyState from the configuration map; the metrics context and the
// two int arguments are not used in the visible code.
public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
return new MyState(map);
public class MyStateUpdater extends BaseStateUpdater<MyState> {
public void updateState(MyState myState, List<TridentTuple> tuples, TridentCollector collector) {