Some questions on tracking

530 views
Skip to first unread message

Dominic Heutelbeck

unread,
Jul 31, 2017, 10:14:13 AM7/31/17
to Axon Framework Users
Hello, all.

I am working on deploying tracking processors in my project. I am using MongoDB as a persistence backend, and to provide a Token store, I have added the respective pull request https://github.com/AxonFramework/AxonFramework/pull/295 to my project to be able to use tracking at all in this case.

Then what I did is to configure the token store and to activate tracking processors in my axon configuration class (using spring-boot).


    @Bean
    public MongoTokenStore tokenStore(
            org.axonframework.config.Configuration config
            ) {
        org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate mongoTemplate = new org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate(
                mongoClient, database, tokenCollection);
        MongoTokenStore tokenStore = new MongoTokenStore(mongoTemplate,
                config.serializer());
        return tokenStore;
    }

    @Autowired
    public void configure(
            EventHandlingConfiguration config
    ) {
        config.usingTrackingProcessors();
    }


Over all, tracking seems to work. When I shut down the application, delete a projection and the matching token from the token store (both in Mongo), it is rebuilt on startup. Fine.

Now to the questions:

1. I read, that I would be able to reset a tracker. How would I do this? TrackingEventProcessor only offers start() pause() shutDown, The TokenStore can sonly store tokens, not delete them. 

More concretely if I would want to create an in-ram projection of some aggregates:

@Slf4j
@Component
@ProcessingGroup(AccountProjectorInRam.PROCESSING_GROUP)
@RequiredArgsConstructor
public class AccountProjectorInRam {

    public static final String PROCESSING_GROUP = "inRamTest";

    @PostConstruct
    public void init() {

        // I want to reset the tracking here!
        
    }

    @EventHandler
    public void on(
            EventOne event
    ) throws UnknownCredentialsType {
        log.info("event: {}", event);

    }

    @EventHandler
    public void on(
            EventTwo event
    ) {
        log.info("event: {}", event);
    }

}

Is the @PostConstruct the right place? What do I need to inject and how to reset? I suspect I am just looking in the wrong places for this.

2. Extrapolating a bit, how does the replay behave in the case of horizontally scaling identical projections into multiple micro-services. By default the tokens are stored under the same name. So when I deploy Instace A of the projection, the replay runs, and a token is stored. Starting instance B would not get any replay when using the default naming. So basically on startup of a new instance I would have to generate a unique processing group Id?


3. About the Mongo TokenStore:

Upon shutting down the application I get the following exception for every processing group. I suspect I am doing something stupid with my Mongo configuration?


java.lang.IllegalStateException: state should be: open
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:82) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:411) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.operation.FindAndUpdateOperation.execute(FindAndUpdateOperation.java:331) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.Mongo.execute(Mongo.java:845) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.Mongo$2.execute(Mongo.java:828) ~[mongo-java-driver-3.4.2.jar:na]
at com.mongodb.MongoCollectionImpl.findOneAndUpdate(MongoCollectionImpl.java:435) ~[mongo-java-driver-3.4.2.jar:na]
at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.loadOrInsertTokenEntry(MongoTokenStore.java:152) ~[classes/:na]
at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.fetchToken(MongoTokenStore.java:83) ~[classes/:na]
at org.axonframework.eventhandling.tokenstore.TokenStore.extendClaim(TokenStore.java:71) ~[axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$processBatch$5(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:44) ~[axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:217) [axon-core-3.0.5.jar:3.0.5]
at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:187) [axon-core-3.0.5.jar:3.0.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]


Thank you very much.

Best regards,
Dominic

Allard Buijze

unread,
Aug 3, 2017, 7:55:21 AM8/3/17
to Axon Framework Users
Hello Dominic,

let me try to answer your questions, one at a time:

1. Resetting a token is currently not supported by an API. You can reset a TrackingEventProcessor by stopping it (API call), then removing the token from the token store (no API call available, yet) and then starting the processor again.
If your intent is to store the data in-memory, you will probably also want to store the token in memory. This way, a TrackingEventProcessor will automatically restart when the application starts.

2. If you're scaling services, and you're configuring them to store data in the same database, they will also compete for tracking tokens. Currently, only a single token per TrackingEventProcessor is supported, so only 1 node can be actively consuming events at any time. This will change in 3.1, where multiple tokens are supported.
If your services each have a separate datastore, then they would probably also have a token store that is stored in that same database. In such a setup, it doesn't matter howmany services you have. Even if some services share a datastore, they will automatically start competing for these tokens.

3. This is an issue that will be resolved in Axon 3.0.6. Spring will trigger a stop for each of the processors, but it won't wait for the processors to shut down. It may happen that the Mongo clients have been disconnected while the EventProcessor is still processing its last event. In such case, these exceptions occur.

Hope this helps.
Cheers,

Allard

Op ma 31 jul. 2017 om 16:14 schreef Dominic Heutelbeck <dominic.h...@gmail.com>:
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dominic Heutelbeck

unread,
Aug 3, 2017, 1:08:33 PM8/3/17
to Axon Framework Users
Hello, Allard.


Am Donnerstag, 3. August 2017 13:55:21 UTC+2 schrieb Allard Buijze:
Hello Dominic,

let me try to answer your questions, one at a time:

1. Resetting a token is currently not supported by an API. You can reset a TrackingEventProcessor by stopping it (API call), then removing the token from the token store (no API call available, yet) and then starting the processor again.
If your intent is to store the data in-memory, you will probably also want to store the token in memory. This way, a TrackingEventProcessor will automatically restart when the application starts.


Ok this explains my futile search. Thanks for clarifying this. So far I have not been able to figure out on how to assign different EventProcessors to individual projections.


2. If you're scaling services, and you're configuring them to store data in the same database, they will also compete for tracking tokens. Currently, only a single token per TrackingEventProcessor is supported, so only 1 node can be actively consuming events at any time. This will change in 3.1, where multiple tokens are supported.
If your services each have a separate datastore, then they would probably also have a token store that is stored in that same database. In such a setup, it doesn't matter howmany services you have. Even if some services share a datastore, they will automatically start competing for these tokens.


Ok, then it is exactly as I figured at the moment. So currently I would tend to replicate the DB instances as well. So when scaling out, each instance would get its own DB/Tokenstore to work with as well. 
 
3. This is an issue that will be resolved in Axon 3.0.6. Spring will trigger a stop for each of the processors, but it won't wait for the processors to shut down. It may happen that the Mongo clients have been disconnected while the EventProcessor is still processing its last event. In such case, these exceptions occur.

Also thanks for clarifying.  


Hope this helps.

Yes, thanks a lot.

Best regards,
Dominic 
Cheers,

Allard Buijze

unread,
Aug 4, 2017, 3:14:04 AM8/4/17
to Axon Framework Users
Hi Dominic,

regarding assigning EventProcessors to projections, the default behavior of Axon is to divide event handlers into Processing Groups. Each ProcessingGroup has a name. By default, each EventHandler class is assigned to the ProcessingGroup with the name of the package of the Event Handler. This can be overridden using the @ProcessingGroup annotation. Then, Axon will create a Processor (Subscribing by default) for each ProcessingGroup. The name of the Processor is the name of the ProcessingGroup. If the Processor is a Tracking one, the TrackingToken will be registered with the TokenStore under that name.

Hope this clarifies.

Allard

Op do 3 aug. 2017 om 19:08 schreef Dominic Heutelbeck <dominic.h...@gmail.com>:

Dominic Heutelbeck

unread,
Aug 4, 2017, 7:23:56 AM8/4/17
to Axon Framework Users


Am Freitag, 4. August 2017 09:14:04 UTC+2 schrieb Allard Buijze:
regarding assigning EventProcessors to projections, the default behavior of Axon is to divide event handlers into Processing Groups. Each ProcessingGroup has a name. By default, each EventHandler class is assigned to the ProcessingGroup with the name of the package of the Event Handler. This can be overridden using the @ProcessingGroup annotation. Then, Axon will create a Processor (Subscribing by default) for each ProcessingGroup. The name of the Processor is the name of the ProcessingGroup. If the Processor is a Tracking one, the TrackingToken will be registered with the TokenStore under that name.

This is clear to me. With regards to the assigning, I was wondering, on how to assign different TrackingProcessor implementations to different processing groups. So for example I want the MongoTrackingProcessor as a default. Easy to do by  defining the matching bean and doing config.usingTrackingProcessors(); But can I assign a processor using the InMemoryTokenStore  to a specific processing group, while keeping the one using mongo as a default ? 

Thanks,
Dominic

Allard Buijze

unread,
Aug 4, 2017, 8:19:06 AM8/4/17
to Axon Framework Users
If you want to create a processor with specific configuration for just one group, you can use the "org.axonframework.config.EventHandlingConfiguration#registerEventProcessor" method. The second parameter (an EventProcessorBuilder) is a function that must create the processor, based on the configuration, a given name (which is always equal to the first parameter, for this method), and list of event handlers.
In your case, the function would create a TrackingEventProcessor that is configured with the InMemoryTokenStore. In the (overall) configuration, you would configure the MongoTokenStore, so that other TrackingProcessors will use that one.

Cheers,

Allard

Op vr 4 aug. 2017 om 13:23 schreef Dominic Heutelbeck <dominic.h...@gmail.com>:
--

Dominic Heutelbeck

unread,
Aug 5, 2017, 8:55:49 AM8/5/17
to Axon Framework Users
Thank you, Allard. Will look into this.
Best regards,
Dominic
Reply all
Reply to author
Forward
0 new messages