Hello, all.
I am working on deploying tracking processors in my project. I am using MongoDB as the persistence backend. To get a token store, I have added the code from the pull request
https://github.com/AxonFramework/AxonFramework/pull/295 to my project, since I could not use tracking with Mongo otherwise.
I then configured the token store and activated tracking processors in my Axon configuration class (using Spring Boot):
@Bean
public MongoTokenStore tokenStore(
        org.axonframework.config.Configuration config
) {
    org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate mongoTemplate =
            new org.axonframework.mongo.eventsourcing.tokenstore.DefaultMongoTemplate(
                    mongoClient, database, tokenCollection);
    MongoTokenStore tokenStore = new MongoTokenStore(mongoTemplate,
            config.serializer());
    return tokenStore;
}

@Autowired
public void configure(
        EventHandlingConfiguration config
) {
    config.usingTrackingProcessors();
}
Overall, tracking seems to work. When I shut down the application and delete a projection and the matching token from the token store (both in Mongo), the projection is rebuilt on startup. Fine.
Now to the questions:
1. I read that I would be able to reset a tracker. How would I do this? TrackingEventProcessor only offers start(), pause() and shutDown(), and the TokenStore can only store tokens, not delete them.
More concretely, suppose I want to create an in-RAM projection of some aggregates:
@Slf4j
@Component
@ProcessingGroup(AccountProjectorInRam.PROCESSING_GROUP)
@RequiredArgsConstructor
public class AccountProjectorInRam {

    public static final String PROCESSING_GROUP = "inRamTest";

    @PostConstruct
    public void init() {
        // I want to reset the tracking here!
    }

    @EventHandler
    public void on(
            EventOne event
    ) throws UnknownCredentialsType {
    }

    @EventHandler
    public void on(
            EventTwo event
    ) {
    }
}
Is @PostConstruct the right place? What do I need to inject, and how do I reset the tracker? I suspect I am just looking in the wrong places for this.
2. Extrapolating a bit, how does the replay behave when identical projections are scaled horizontally across multiple microservices? By default, the tokens are stored under the same name. So when I deploy instance A of the projection, the replay runs and a token is stored. Instance B, started afterwards, would not get any replay when using the default naming. So basically, on startup of a new instance, I would have to generate a unique processing group ID?
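To make the scenario in question 2 concrete, here is a minimal plain-Java sketch of what I think happens. It does not use the Axon API at all: SharedTokenSketch, TOKENS and runInstance are hypothetical names, the map only stands in for a token store shared by all instances, and it ignores claims and everything else a real token store does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedTokenSketch {

    // Hypothetical stand-in for a shared token store:
    // processor name -> index of the next event to read.
    static final Map<String, Integer> TOKENS = new HashMap<>();

    // Handles every event after the stored position, then saves the new position.
    static List<String> runInstance(String processorName, List<String> eventLog) {
        int start = TOKENS.getOrDefault(processorName, 0);
        List<String> handled = new ArrayList<>(eventLog.subList(start, eventLog.size()));
        TOKENS.put(processorName, eventLog.size());
        return handled;
    }

    public static void main(String[] args) {
        List<String> eventLog = Arrays.asList("EventOne", "EventTwo");

        // Instance A starts first and replays the whole log.
        System.out.println("A: " + runInstance("inRamTest", eventLog));
        // Instance B shares the name, finds A's token and replays nothing.
        System.out.println("B: " + runInstance("inRamTest", eventLog));
        // With a unique name, instance B gets the full replay.
        System.out.println("B (unique name): " + runInstance("inRamTest-B", eventLog));
    }
}
```

In this simplified model, the second call under the shared name returns an empty list, while the call under the unique name sees both events again. That is the behaviour I am assuming for the real token store.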
3. About the Mongo TokenStore:
Upon shutting down the application, I get the following exception for every processing group. Am I doing something wrong in my Mongo configuration?
java.lang.IllegalStateException: state should be: open
    at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:82) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:68) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:411) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.operation.FindAndUpdateOperation.execute(FindAndUpdateOperation.java:331) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.Mongo.execute(Mongo.java:845) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.Mongo$2.execute(Mongo.java:828) ~[mongo-java-driver-3.4.2.jar:na]
    at com.mongodb.MongoCollectionImpl.findOneAndUpdate(MongoCollectionImpl.java:435) ~[mongo-java-driver-3.4.2.jar:na]
    at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.loadOrInsertTokenEntry(MongoTokenStore.java:152) ~[classes/:na]
    at org.axonframework.mongo.eventsourcing.tokenstore.MongoTokenStore.fetchToken(MongoTokenStore.java:83) ~[classes/:na]
    at org.axonframework.eventhandling.tokenstore.TokenStore.extendClaim(TokenStore.java:71) ~[axon-core-3.0.5.jar:3.0.5]
    at org.axonframework.eventhandling.TrackingEventProcessor.lambda$processBatch$5(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
    at org.axonframework.common.transaction.TransactionManager.executeInTransaction(TransactionManager.java:44) ~[axon-core-3.0.5.jar:3.0.5]
    at org.axonframework.eventhandling.TrackingEventProcessor.processBatch(TrackingEventProcessor.java:272) [axon-core-3.0.5.jar:3.0.5]
    at org.axonframework.eventhandling.TrackingEventProcessor.processingLoop(TrackingEventProcessor.java:217) [axon-core-3.0.5.jar:3.0.5]
    at org.axonframework.eventhandling.TrackingEventProcessor.lambda$start$3(TrackingEventProcessor.java:187) [axon-core-3.0.5.jar:3.0.5]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Thank you very much.
Best regards,
Dominic