I did some more debugging. I have now removed all of the intermediate processing bolts and am writing each batch of tuples directly into the DB, but the same thing still happens: the State and StateUpdater objects stop being invoked after a few batches have been committed.
Has anyone seen a similar issue with Trident when using State and StateUpdater with the partitionPersist API?
/**
 * Trident spout that emits a fixed number of {@code Info} tuples for every batch.
 *
 * <p>Each batch consists of {@link #TUPLES_PER_BATCH} freshly constructed {@code Info}
 * objects, emitted in the single output field {@code "Info"}. The batch metadata type is
 * {@code InfoBatch}; a new, empty instance is created for every transaction.
 */
public class InputItemTridentSpout implements ITridentSpout<InfoBatch> {

    /** Number of tuples emitted for every batch. */
    private static final int TUPLES_PER_BATCH = 10;

    public InputItemTridentSpout() {
    }

    /**
     * Creates the coordinator for this spout.
     *
     * <p>The return type is {@code BatchCoordinator<InfoBatch>} so that it matches the
     * {@code ITridentSpout<InfoBatch>} type argument of the enclosing class.
     */
    @Override
    public BatchCoordinator<InfoBatch> getCoordinator(String s, Map map, TopologyContext topologyContext) {
        return new InputItemBatchCoordinator("InfoBatch-Coordinator");
    }

    /** Creates the emitter that produces the tuples of each batch. */
    @Override
    public Emitter<InfoBatch> getEmitter(String s, Map map, TopologyContext topologyContext) {
        return new InputItemBatchEmitter();
    }

    /** Returns {@code null}: this spout declares no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Declares the single output field, {@code "Info"}. */
    @Override
    public Fields getOutputFields() {
        return new Fields("Info");
    }

    /**
     * Implementation of a BatchCoordinator.
     *
     * <p>This coordinator keeps no state: it hands out a fresh {@code InfoBatch} as metadata
     * for every transaction and always reports that it is ready. It does not itself limit the
     * number of in-flight batches.
     * NOTE(review): an earlier comment said at most 10 batches should be running at any time;
     * presumably that limit is meant to come from topology configuration - confirm.
     */
    private static class InputItemBatchCoordinator implements BatchCoordinator<InfoBatch> {

        /**
         * @param name label for this coordinator; currently unused
         */
        public InputItemBatchCoordinator(String name) {
        }

        /** Returns new, empty batch metadata; {@code prevMetadata} is ignored. */
        @Override
        public InfoBatch initializeTransaction(long txid, InfoBatch prevMetadata) {
            return new InfoBatch();
        }

        @Override
        public void success(long txid) {
        }

        /**
         * Always ready: the emitter can synthesize a batch at any time, so there is never a
         * reason to hold a transaction back.
         */
        @Override
        public boolean isReady(long txid) {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /** Emitter that produces {@link #TUPLES_PER_BATCH} new {@code Info} tuples per batch. */
    private static class InputItemBatchEmitter implements Emitter<InfoBatch> {

        /**
         * Emits the tuples of one batch. Neither the transaction attempt nor the batch
         * metadata influences what is emitted.
         */
        @Override
        public void emitBatch(TransactionAttempt transactionAttempt, InfoBatch infoBatch, TridentCollector tridentCollector) {
            for (int i = 0; i < TUPLES_PER_BATCH; ++i) {
                tridentCollector.emit(new Values(new Info()));
            }
        }

        @Override
        public void success(TransactionAttempt transactionAttempt) {
        }

        @Override
        public void close() {
        }
    }
}
-----------MyState.java---------------------
public class MyState implements State {
private List<TridentTuple> tuples;
private DB db;
public MyState (Map conf) {
db= new DB();
}
@Override
public void beginCommit(Long txId) {
}
@Override
public void commit(Long txId) {
String txKey = "Trident-Transaction-Id" + txId
if (db.exists(txKey)) {
return; // do nothing
}
db.beginTx();
for (TridentTuple tuple: tuples) {
db.write((Info)tuple.getValue(0));
}
}
db.write(txKey);
db.endTx();
}
public void persistTuples(List<TridentTuple> tuples) {
this.tuples = tuples;
}
private void beginTransaction() {
db.beginTx();
}
public void commitTransaction() {
db.commitTx();
}
}
----------MyStateFactory.java-----------
/**
 * Factory that Trident uses to obtain the {@code MyState} instance for a partition.
 */
public class MyStateFactory implements StateFactory {

    /**
     * Builds a new {@code MyState} from the given configuration.
     *
     * <p>Only {@code conf} is passed on; the metrics context and the two integer arguments
     * are not used.
     *
     * @param conf           configuration map forwarded to the {@code MyState} constructor
     * @param metrics        metrics context (unused)
     * @param partitionIndex first integer argument of the factory callback (unused)
     * @param numPartitions  second integer argument of the factory callback (unused)
     * @return a newly created {@code MyState}
     */
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        final MyState state = new MyState(conf);
        return state;
    }
}
-----------MyStateUpdater.java----------
/**
 * State updater that passes each batch of tuples to {@code MyState}.
 */
public class MyStateUpdater extends BaseStateUpdater<MyState> {

    /**
     * Hands the batch over to the state by calling {@code persistTuples}.
     *
     * <p>Nothing is emitted downstream; the collector is not used.
     *
     * @param state     the state that receives the batch
     * @param tuples    tuples of the current batch
     * @param collector collector for downstream emission (unused)
     */
    @Override
    public void updateState(final MyState state, final List<TridentTuple> tuples, final TridentCollector collector) {
        state.persistTuples(tuples);
    }
}
-----------------------------------------------------------------