Race condition between saga and command using DistributedCommandBus


Steven Grimm

May 11, 2017, 12:54:18 PM
to Axon Framework Users
I recently started trying the JGroupsConnector to spread my application across multiple hosts. It mostly works out of the box, which is great, but I did run into one problem that others might also hit, so I figure I should describe how I fixed it. This is on Axon 2, but I suspect the same situation could happen on Axon 3 with subscribing event processors. (Maybe not with tracking processors, though.)

The problem manifests itself when you have saga code like

private void startSomeWork(CreateMyAggregateCommand command) {
  associateWith("aggregateId", command.getAggregateId());
  commandGateway.dispatch(command);
}

@SagaEventHandler(associationProperty="aggregateId")
public void on(MyAggregateCreatedEvent event) {
  // do the next thing in the saga's workflow
}

This works fine most of the time, but occasionally, when the timing is just right, the following happens:

1. Saga on host A dispatches the creation command
2. Creation command gets routed to host B
3. Aggregate on host B publishes the MyAggregateCreatedEvent
4. Saga is written to database on host A

The problem is that at the time the event is published on host B, the fact that the saga is associated with the aggregate ID hasn't hit the database yet. So the event bus on host B thinks nothing is interested in the event, and the saga event handler is never invoked.
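
To make the timing concrete, here is a minimal plain-Java simulation of the four steps above. It does not use Axon at all: the map stands in for the saga association table that both hosts read, and every name in it (SagaRaceSketch, commitAssociation, sagasInterestedIn, replay) is made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SagaRaceSketch {
    // Stands in for the shared saga table: association value -> saga IDs.
    static final Map<String, List<String>> committedAssociations = new HashMap<>();

    // Host B's lookup when an event arrives: only committed associations are visible.
    static List<String> sagasInterestedIn(String aggregateId) {
        return committedAssociations.getOrDefault(aggregateId, new ArrayList<>());
    }

    // Host A committing its unit of work, which persists the saga's association.
    static void commitAssociation(String aggregateId, String sagaId) {
        committedAssociations.computeIfAbsent(aggregateId, k -> new ArrayList<>()).add(sagaId);
    }

    // Replays the steps in one of the two possible orders and returns
    // the sagas that would have received the event.
    static List<String> replay(boolean commitBeforeEvent) {
        committedAssociations.clear();
        String aggregateId = "agg-1";
        if (commitBeforeEvent) {
            commitAssociation(aggregateId, "saga-1");
        }
        // The event is published on host B at this point.
        List<String> receivers = sagasInterestedIn(aggregateId);
        if (!commitBeforeEvent) {
            commitAssociation(aggregateId, "saga-1");
        }
        return receivers;
    }

    public static void main(String[] args) {
        System.out.println("event before commit: " + replay(false));
        System.out.println("commit before event: " + replay(true));
    }
}
```

When the event is looked up before the association is committed, the receiver list is empty, which corresponds to the saga event handler never being invoked.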

Right now I'm fixing it like this:

private void startSomeWork(CreateMyAggregateCommand command) {
  associateWith("aggregateId", command.getAggregateId());

  CurrentUnitOfWork.get().registerListener(new UnitOfWorkListenerAdapter() {
    @Override
    public void afterCommit(UnitOfWork unitOfWork) {
      commandGateway.dispatch(command);
    }
  });
}

By deferring command dispatch until after the saga (with its associations) has been committed to the database, host B always sees the association and invokes the saga event handler without issues.
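
The ordering this relies on can be sketched without Axon. MiniUnitOfWork below is a hypothetical stand-in, not Axon's UnitOfWork; it only models "run registered callbacks after the commit", which is the assumption the fix depends on.

```java
import java.util.ArrayList;
import java.util.List;

public class AfterCommitSketch {
    // Hypothetical stand-in for a unit of work; not Axon's UnitOfWork.
    static class MiniUnitOfWork {
        private final List<Runnable> afterCommit = new ArrayList<>();
        private final List<String> log;

        MiniUnitOfWork(List<String> log) {
            this.log = log;
        }

        void onAfterCommit(Runnable action) {
            afterCommit.add(action);
        }

        void commit() {
            // The saga and its associations reach the database here...
            log.add("saga association committed");
            // ...and only then do the deferred actions run.
            afterCommit.forEach(Runnable::run);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniUnitOfWork uow = new MiniUnitOfWork(log);
        // Inside the saga: register the dispatch instead of performing it now.
        uow.onAfterCommit(() -> log.add("command dispatched"));
        log.add("saga handler returned");
        uow.commit();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log shows the handler returning first, then the commit, then the dispatch, so anything triggered by the command can only happen after the association is visible.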

It's not necessary to wrap *all* commands like this, of course, just ones that occur in a unit of work where a saga's associations change in a way that affects its ability to receive whatever events result from the command.

Is this a sane approach, or is it a sign I'm doing something wrong?

-Steve

Allard Buijze

May 18, 2017, 4:23:23 PM
to Axon Framework Users
Hi Steven,

Yes, this is actually a sane approach. In fact, I'm seriously considering building this mechanism into all the components that trigger async activity. That way, an async action can never be performed before the 'cause' of that activity has been committed.

The only issue is that senders may use 'sendAndWait' semantics. These would then not work anymore.
For now, I think the command gateway is the only place where this behavior can be implemented safely. The gateway would 'park' the dispatch until afterCommit and short-circuit it when the sender is waiting for the result.
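
One way to picture the 'park and short-circuit' idea is the plain-Java sketch below. ParkingGateway and its methods are hypothetical names for illustration, not Axon's API, and the Function merely stands in for a command bus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ParkingGatewaySketch {
    // Hypothetical gateway; names and shape are illustrative, not Axon's API.
    static class ParkingGateway {
        private final Function<String, String> bus; // stands in for the command bus
        private final List<Runnable> parked = new ArrayList<>();
        private boolean unitOfWorkActive;

        ParkingGateway(Function<String, String> bus) {
            this.bus = bus;
        }

        void beginUnitOfWork() {
            unitOfWorkActive = true;
        }

        // Fire-and-forget: parked while a unit of work is active.
        void send(String command) {
            if (unitOfWorkActive) {
                parked.add(() -> bus.apply(command));
            } else {
                bus.apply(command);
            }
        }

        // The caller needs the result now, so parking is short-circuited.
        String sendAndWait(String command) {
            return bus.apply(command);
        }

        // Called once the unit of work has been committed.
        void afterCommit() {
            unitOfWorkActive = false;
            List<Runnable> toRun = new ArrayList<>(parked);
            parked.clear();
            toRun.forEach(Runnable::run);
        }
    }

    // Returns the commands in the order the bus actually saw them.
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        ParkingGateway gateway = new ParkingGateway(c -> {
            handled.add(c);
            return "ok:" + c;
        });
        gateway.beginUnitOfWork();
        gateway.send("create");       // parked until afterCommit
        gateway.sendAndWait("query"); // dispatched immediately
        gateway.afterCommit();        // releases "create"
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that in this sketch a waiting command overtakes a parked one sent earlier in the same unit of work, which is a side effect worth keeping in mind with this design.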

Cheers,

Allard

PS. Thanks a lot for sharing your issues and solutions! I'm sure they're very helpful to many in the community.

Steven Grimm

May 18, 2017, 5:07:25 PM
to Axon Framework Users
I ended up deciding to make this an across-the-board thing because it's too error-prone to expect people to remember to do this in exactly the right places. So my command gateway is now actually a proxy:

  @Bean
  public CommandGateway commandGateway(CommandBus commandBus) {
    CommandGateway gateway =
        new GatewayProxyFactory(commandBus).createGateway(CommandGateway.class);

    return (CommandGateway) Proxy.newProxyInstance(
        getClass().getClassLoader(),
        new Class<?>[] { CommandGateway.class },
        (proxy, method, args) -> {
          // Commands that return a value or have declared exceptions are synchronous, so
          // dispatch them immediately. Also dispatch immediately if there is no unit of work
          // active, since there's therefore nothing waiting to be committed to the database.
          if (method.getReturnType() != void.class || method.getExceptionTypes().length > 0 ||
              !CurrentUnitOfWork.isStarted()) {
            return method.invoke(gateway, args);
          }

          CurrentUnitOfWork.get().registerListener(new UnitOfWorkListenerAdapter() {
            @Override
            @SneakyThrows
            public void afterCommit(UnitOfWork unitOfWork) {
              method.invoke(gateway, args);
            }
          });

          return null;
        });
  }
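
The method-signature part of that decision can be tried out in isolation. The sketch below uses a hypothetical ExampleGateway interface rather than Axon's CommandGateway, and it leaves out the CurrentUnitOfWork.isStarted() check, so it only shows which kinds of methods the proxy would treat as deferrable.

```java
import java.lang.reflect.Method;

public class DeferralRuleSketch {
    // Hypothetical gateway interface used only for this illustration.
    interface ExampleGateway {
        void send(Object command);

        String sendAndWait(Object command);

        void sendOrFail(Object command) throws Exception;
    }

    // Mirrors the proxy's signature test: only void methods with no
    // declared exceptions are candidates for deferral.
    static boolean isDeferrable(Method method) {
        return method.getReturnType() == void.class
                && method.getExceptionTypes().length == 0;
    }

    // Convenience lookup by name on the example interface.
    static boolean isDeferrable(String name) {
        try {
            return isDeferrable(ExampleGateway.class.getMethod(name, Object.class));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] { "send", "sendAndWait", "sendOrFail" }) {
            System.out.println(name + " deferrable: " + isDeferrable(name));
        }
    }
}
```

Only send qualifies here: sendAndWait returns a value and sendOrFail declares an exception, so a caller of either expects the outcome before the method returns.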

harinder singh

Dec 7, 2017, 1:34:59 AM
to Axon Framework Users
Hi Allard,

I am facing a similar race condition between two commands.

Flow:

1) I have an interceptor that calls a saga, which sends a command through the gateway to the aggregate root. This works fine when the request does not involve another command: the saga's command alone executes correctly.

2) When a request to my controller calls a service that also sends a command through the gateway, issues arise. The request is still intercepted first, but the commands now arrive at the aggregate in the opposite order: first the service command, then the saga's command, which defeats the business purpose. From what I can see, the saga still dispatches its command first and the service layer second, yet the aggregate root receives them in the reverse order.


I am using Axon 2.4.6 with DefaultCommandGateway and DistributedCommandBus.

Regards,
Harinder Singh

Allard Buijze

Dec 10, 2017, 2:14:44 PM
to axonfr...@googlegroups.com
Hi,

My first impression is that it feels awkward to invoke a saga to send a command from an interceptor. Sagas are ‘designed’ to react to events. I'm not sure what you’re trying to achieve (functionally), but there might be a cleaner way.

If you’re using a DistributedCommandBus, commands are (potentially) sent to other nodes, where they can be executed in parallel. ‘Race conditions’ here are normal.

Cheers,

Allard