async Command Bus - Command Executed executed Not ordered

221 views
Skip to first unread message

Meong Meong

unread,
Sep 19, 2016, 7:16:47 AM9/19/16
to Axon Framework Users
trying send command to async command bus with 
@Bean
@ConditionalOnMissingBean
public CommandBus commandBus() {
   
log.info("commandBus : AsynchronousCommandBus");
   
   
Assert.notNull(loggingInterceptor, "loggingInterceptor can't null");
   
Assert.notNull(withTransaction, "withTransaction can't null");
   
Assert.notNull(beanValidationInterceptor, "beanValidationInterceptor can't null");
   
Assert.notNull(correlationCommandHandlerInterceptor, "correlationCommandHandlerInterceptor can't null");
   
Assert.notNull(auditingInterceptor, "auditingInterceptor can't null");
   
Assert.notNull(asyncExecutor, "asyncExecutor can't null");
   
   
AsynchronousCommandBus commandBus = new AsynchronousCommandBus(asyncExecutor);

   commandBus
.setDispatchInterceptors(Lists.newArrayList(beanValidationInterceptor));
   commandBus
.setHandlerInterceptors(Lists.newArrayList(correlationCommandHandlerInterceptor, auditingInterceptor, loggingInterceptor));
   commandBus
.setTransactionManager(withTransaction);
   
return commandBus;
}

with same aggregateId executed not ordered..

2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-3] c.j.l.d.j.aggregate.RequestNumberSaga    : send AddJournalNumber command


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-3] c.j.l.d.j.aggregate.RequestNumberSaga    : AddPostingNumber handled with event JournalNumberIncreased(super=AccountingPeriodEvent(accountingPeriodId=5_2016), correlationId=7fdc974b-b0c8-4320-97f3-64427c81ec7d, sequenceId=2016_JKM, entry=JournalNumberEntry(journalNumber=46, postingNumber=46), entryIncreased=JournalNumberEntryWithKey(sequenceId=2016_JKM, entry=JournalNumberEntry(journalNumber=47, postingNumber=47)))


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-2] o.a.commandhandling.SimpleCommandBus     : Dispatching command [com.journal.ledger.api.journal.command.AddJournalNumber]


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-3] c.j.l.d.j.aggregate.RequestNumberSaga    : send AddPostingNumber command


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-2] o.a.unitofwork.NestableUnitOfWork        : Starting Unit Of Work.


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-1] o.a.commandhandling.SimpleCommandBus     : Dispatching command [com.journal.ledger.api.journal.command.AddPostingNumber]


2016-09-19 18:08:30.579 DEBUG 3587 --- [Task-Executor-1] o.a.unitofwork.NestableUnitOfWork        : Starting Unit Of Work.


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-1] o.a.unitofwork.NestableUnitOfWork        : Registering Unit Of Work as CurrentUnitOfWork


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-2] o.a.unitofwork.NestableUnitOfWork        : Registering Unit Of Work as CurrentUnitOfWork


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-1] o.a.u.UnitOfWorkListenerCollection       : Registering listener: com.journal.foundation.axon.interceptor.correlation.CorrelationUnitOfWorkListener


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-2] o.a.u.UnitOfWorkListenerCollection       : Registering listener: com.journal.foundation.axon.interceptor.correlation.CorrelationUnitOfWorkListener


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-1] o.a.u.UnitOfWorkListenerCollection       : Registering listener: org.axonframework.auditing.AuditingUnitOfWorkListener


2016-09-19 18:08:30.580 DEBUG 3587 --- [Task-Executor-2] o.a.u.UnitOfWorkListenerCollection       : Registering listener: org.axonframework.auditing.AuditingUnitOfWorkListener


2016-09-19 18:08:30.581  INFO 3587 --- [Task-Executor-1] o.a.c.interceptors.LoggingInterceptor    : Incoming command: [AddPostingNumber]


2016-09-19 18:08:30.581  INFO 3587 --- [Task-Executor-2] o.a.c.interceptors.LoggingInterceptor    : Incoming command: [AddJournalNumber]


2016-09-19 18:08:30.581 DEBUG 3587 --- [Task-Executor-1] com.journal.config.aop.LoggingAspect     : Enter on 38: com.journal.ledger.domain.journal.aggregate.JournalCommandHandler.onAddPostingNumber() with argument[s] = [{"journalId":"7fdc974b-b0c8-4320-97f3-64427c81ec7d","companyId":5,"postingNumber":47}]


2016-09-19 18:08:30.581 DEBUG 3587 --- [Task-Executor-2] com.journal.config.aop.LoggingAspect     : Enter on 39: com.journal.ledger.domain.journal.aggregate.JournalCommandHandler.onAddJournalNumber() with argument[s] = [{"journalId":"7fdc974b-b0c8-4320-97f3-64427c81ec7d","companyId":5,"journalNumber":47}]


2016-09-19 18:08:30.581 DEBUG 3587 --- [Task-Executor-1] o.a.u.UnitOfWorkListenerCollection       : Registering listener: org.axonframework.common.jdbc.UnitOfWorkAwareConnectionProviderWrapper


Dispatching command [com.journal.ledger.api.journal.command.AddJournalNumber]
Dispatching command [com.journal.ledger.api.journal.command.AddPostingNumber]

but executed 

Incoming command: [AddPostingNumber]
Incoming command: [AddJournalNumber]


is there any config for make it in good order ?


Allard Buijze

unread,
Sep 19, 2016, 9:01:45 AM9/19/16
to Axon Framework Users
Hi,

The async command bus doesn't guarantee ordering, indeed. To make sure a command is handled after another, dispatch it in the 'onSuccess' invocation of the callback of the first command.

Cheers,

Allard
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Message has been deleted

Meong Meong

unread,
Sep 19, 2016, 10:48:56 PM9/19/16
to Axon Framework Users
Dear Allard,

tq for guideline.. i'm changed saga to use callback on dispatch command for same aggregateid..

anw for concurent assec on aggregate with async command bus, Pessimistic Locking is already on repository right  as (EventSourcingRepository is extended of LockingRepository) 

Allard Buijze

unread,
Sep 19, 2016, 11:15:06 PM9/19/16
to Axon Framework Users
Hi,

yes, all access to aggregates using the Axon-provided repositories is guarded by a pessimistic lock (by default). So you do not need to worry about concurrent access there.

Cheers,

Allard

Meong Meong

unread,
Sep 20, 2016, 5:05:14 AM9/20/16
to Axon Framework Users
dear Allard,

is this applied on concurent access to saga with same sagaId to ?

Allard Buijze

unread,
Sep 20, 2016, 8:32:38 AM9/20/16
to Axon Framework Users
Yes, it is.

Meong Meong

unread,
Sep 22, 2016, 1:21:08 AM9/22/16
to Axon Framework Users
nicee :D...

tq allard :)
Reply all
Reply to author
Forward
0 new messages