Trident StateUpdater not getting invoked after committing a few batches

Atul Dambalkar

Nov 20, 2013, 2:13:53 AM
to storm...@googlegroups.com
Hi,

I have the following Trident topology, which uses a StateFactory, State and StateUpdater along with "partitionPersist". It looks like my State object gets called only a few times and then simply stops working: no tuples are forwarded to it for subsequent batches. As I am using ITridentSpout, I do see "success(TransactionAttempt transactionAttempt)" being called in my Trident Emitter, as well as "success(long txId)" being called in my Trident BatchCoordinator. Has anyone else faced a similar issue?

The TridentSpout is emitting data, and it reaches the very last aggregating Bolt; after that, the Trident batch is not forwarded to the State and StateUpdater.

Any clues? 

-Atul


Atul Dambalkar

Nov 20, 2013, 4:02:43 AM
to storm...@googlegroups.com
Did some more debugging. I have now removed all the intermediate processing Bolts and write the batch of tuples directly into the DB, but the same thing happens: the State and StateUpdater objects don't get invoked after committing a few batches.

Has anyone seen a similar issue with Trident when using State and StateUpdater with the partitionPersist API?

Here is my code, in case it helps:

-----------InputItemTridentSpout.java--------------------

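import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;

public class InputItemTridentSpout implements ITridentSpout<InfoBatch> {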

    public InputItemTridentSpout() {
    }

    @Override
    public BatchCoordinator<InfoBatch> getCoordinator(String s, Map map, TopologyContext topologyContext) {
        return new InputItemBatchCoordinator("InfoBatch-Coordinator");
    }

    @Override
    public Emitter<InfoBatch> getEmitter(String s, Map map, TopologyContext topologyContext) {
        return new InputItemBatchEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("Info");
    }

    /**
     * Implementation of a BatchCoordinator. Make sure there are only 10 running batches at any given point in time.
     */
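    private class InputItemBatchCoordinator implements BatchCoordinator<InfoBatch> {

        // Name passed in by getCoordinator(); the field declaration was missing.
        private final String name;

        public InputItemBatchCoordinator(String name) {
            this.name = name;
        }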

        @Override
        public InfoBatch initializeTransaction(long txid, InfoBatch prevMetadata) {
            return new InfoBatch();
        }

        @Override
        public void success(long txid) {
        }

        @Override
        public boolean isReady(long txid) {
            // Body was left empty in the post; returning true here only so the class compiles.
            return true;
        }

        @Override
        public void close() {
        }
    }

    private class InputItemBatchEmitter implements Emitter<InfoBatch> {

        @Override
        public void emitBatch(TransactionAttempt transactionAttempt, InfoBatch infoBatch, TridentCollector tridentCollector) {
            for (int i = 0; i < 10; ++i) {
                tridentCollector.emit(new Values(new Info()));
            }
        }

        @Override
        public void success(TransactionAttempt transactionAttempt) {
        }

        @Override
        public void close() {
        }
    }

}

-----------MyState.java---------------------
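import java.util.List;
import java.util.Map;

import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;

public class MyState implements State {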

    private List<TridentTuple> tuples;
    private DB db;

    public MyState (Map conf) {
        db= new DB(); 
    }

    @Override
    public void beginCommit(Long txId) {
    }

    @Override
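    public void commit(Long txId) {
        String txKey = "Trident-Transaction-Id" + txId;

        // Skip batches whose txId has already been recorded (replayed batch).
        if (db.exists(txKey)) {
            return; // do nothing
        }
        db.beginTx();

        // Write the tuples buffered by persistTuples() for this batch.
        for (TridentTuple tuple : tuples) {
            db.write((Info) tuple.getValue(0));
        }
        // Record the txId so that a replay of this batch is detected above.
        db.write(txKey);

        db.endTx();
    }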

    public void persistTuples(List<TridentTuple> tuples) {
        this.tuples = tuples;
    }

    private void beginTransaction() {
       db.beginTx();
    }

    public void commitTransaction() {
        db.commitTx();
    }

}
----------MyStateFactory.java-----------
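import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

public class MyStateFactory implements StateFactory {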

    @Override
    public State makeState(Map map, IMetricsContext iMetricsContext, int i, int i2) {
        return new MyState(map);
    }
}
-----------MyStateUpdater.java----------
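import java.util.List;

import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.tuple.TridentTuple;

public class MyStateUpdater extends BaseStateUpdater<MyState> {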

    @Override
    public void updateState(MyState myState, List<TridentTuple> tuples, TridentCollector collector) {
       myState.persistTuples(tuples);
    }

}
-----------------------------------------------------------------
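
For anyone trying to reproduce this outside a cluster, here is a minimal sketch of the buffer-then-commit pattern used by MyState above, written in plain Java with no Storm dependency. It assumes that, for each batch, Trident calls beginCommit, then updateState, then commit on the state; the driver method runBatch only imitates that order. InMemoryDb, BufferingState and runBatch are hypothetical stand-ins invented for this illustration, not Storm classes and not the DB class from the post. Counting the updateState calls is one possible way to check whether the updater stops being invoked; it is not a diagnosis of the problem described in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CommitOrderSketch {

    /** Hypothetical in-memory stand-in for the DB used by MyState. */
    static class InMemoryDb {
        final Set<String> txKeys = new HashSet<String>();
        final List<String> rows = new ArrayList<String>();
    }

    /** Same shape as MyState (buffer in updateState, write in commit), without Storm types. */
    static class BufferingState {
        private final InMemoryDb db;
        private List<String> pending = new ArrayList<String>();
        int updateCalls = 0; // counts how often updateState was invoked

        BufferingState(InMemoryDb db) {
            this.db = db;
        }

        void beginCommit(long txId) {
            // Start each batch with an empty buffer so stale tuples are never re-written.
            pending = new ArrayList<String>();
        }

        void updateState(List<String> tuples) {
            updateCalls++;
            pending = new ArrayList<String>(tuples);
        }

        void commit(long txId) {
            String txKey = "Trident-Transaction-Id" + txId;
            // Set.add returns false when the key is already present, i.e. a replayed batch.
            if (!db.txKeys.add(txKey)) {
                return;
            }
            db.rows.addAll(pending);
        }
    }

    /** Imitates the assumed per-batch call order: beginCommit, updateState, commit. */
    static void runBatch(BufferingState state, long txId, List<String> tuples) {
        state.beginCommit(txId);
        state.updateState(tuples);
        state.commit(txId);
    }

    public static void main(String[] args) {
        InMemoryDb db = new InMemoryDb();
        BufferingState state = new BufferingState(db);
        runBatch(state, 1L, Arrays.asList("a", "b"));
        runBatch(state, 1L, Arrays.asList("a", "b")); // replay of txId 1 is skipped in commit
        runBatch(state, 2L, Arrays.asList("c"));
        System.out.println("rows=" + db.rows + " updateCalls=" + state.updateCalls);
    }
}
```

Running main prints rows=[a, b, c] updateCalls=3: the replayed batch reaches updateState but is not written a second time. Adding a similar counter or log line to the real updateState and commit would show which of the two stops being called.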

pa198924

Dec 12, 2013, 1:35:54 AM
to storm...@googlegroups.com
I am also facing the same issue. Can you please tell me how you resolved it?