Distributed Command bus design implementations


Bruno Mendonça

Sep 21, 2017, 11:03:29 PM
to Axon Framework Users
I have created a few services using Axon 3 that run in AWS. Up until this point there was no need for these services to communicate with each other. Therefore, upon receiving a request the controller endpoint sends a command, which is either handled by an aggregate command handler or by a command handler class that will load the aggregate. In either case, the aggregate will apply the corresponding event. These events will be handled in a query event handler class that will persist to the query repository. In some scenarios there is also an AMQP handler that will send the event to an exchange that a legacy application (not a service) is subscribing to.

I am now looking at designing a solution that would allow 2 of these services to communicate.

Scenario: Service A receives a request, sends a command that is handled internally (same JVM), and an event is applied. A subset of the payload of the request needs to be sent to service B. Service A needs to know whether service B handled its piece or not (whatever the reason for failure may be, potentially that B is unreachable), and in case of failure A sends a command to "rollback" (apply an event). In a successful request, service B will internally apply its own event (I'm not sure whether as a result of handling a distributed command, or whether it receives from A the actual event it needs to apply...).

I've been reading about the Distributed Command Bus, but I've also read comments here and there on this Google group about distributed Events.

1. Should I be distributing Commands or Events?

2. My lead architect prefers an approach that uses AMQP as the vehicle of communication, mainly to account for the fact that Service B may be unavailable. I personally don't love this approach: since I need a response, Service B would have to send another message to another queue that service A subscribes to. Unavailability should be VERY rare anyway, but a response is always required in case of some internal failure in service B, whether from business validation or an unexpected exception. Another thing I don't like about this approach: assume B was down for a few hours and then resumed, picked up the AMQP message, still failed internally, and communicated that back to A for a rollback. Clients accessing our application would have seen their changes during those hours, and then all of a sudden the changes would be gone because they were "rolled back".

3. I prefer an approach where services communicate directly. If service B is unreachable (again, VERY rare), service A will "roll back" its internally applied event, and the client won't even notice. If B is up, we also get a direct response when something goes wrong and can "roll back" immediately, and again the client won't even notice. Even the number of characters I had to write to explain this approach is smaller...

4. I have read about JGroups and Spring Cloud Connector and briefly looked at one example that seems to use AMQP in some way for the communication. I am leaning towards Spring Cloud Connector (since we already have a Eureka service running in AWS), but haven't had time to play with it and haven't seen any demo using it.

In some cases the inter service communication will be used to provide eventual consistency of data, but in other cases it will be a way of triggering some processing that is handled by a different service.
MongoDB is used as the Event Store and the Query repo.

Really looking for some direction on what design approach to implement.

Thanks,
Bruno

Bruno Mendonça

Sep 28, 2017, 11:19:01 AM
to Axon Framework Users
No takers? :)

Benoît Liessens

Sep 30, 2017, 3:53:47 AM
to Axon Framework Users
Hi Bruno,

The purpose of distributing commands is to scale a service horizontally: e.g. have multiple JVMs with the same service, each JVM handling a percentage of the commands (and events).
Your intent is rather to maintain consistency across different aggregates and, in the worst case, roll back to a previous (consistent) state. This is exactly what Sagas are designed for.

Basically you would design a Saga (associated with the aggregate of) service A. Upon emitting event E1 containing the data needed by B, the Saga is triggered and would then communicate this information to B (e.g. via a REST call). If the call to B fails, the Saga must then roll back: e.g. send a command to the aggregate of service A to revert the action leading to event E1.
Keep in mind that a Saga is stateful. Each aggregate in service A will have its own dedicated Saga instance.

This resembles your approach of "direct communication". You don't need AMQP or JGroups for this.
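
The flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: it uses no Axon types, and `ServiceBClient`, `CommandSender`, `ForwardToBSaga` and the `"Revert:"` command string are hypothetical names invented for the sketch. In a real Axon application the handler method would be a saga event handler and the revert command would go through the command gateway.

```java
/** Hypothetical stand-in for the client service A uses to reach service B (e.g. a REST client). */
interface ServiceBClient {
    /** Returns true if B accepted the data; may throw if B is unreachable. */
    boolean submit(String aggregateId, String payloadForB);
}

/** Hypothetical stand-in for command dispatching inside service A. */
interface CommandSender {
    void send(String command);
}

/** One instance per aggregate of service A. */
class ForwardToBSaga {
    private final ServiceBClient serviceB;
    private final CommandSender commands;
    private boolean ended;

    ForwardToBSaga(ServiceBClient serviceB, CommandSender commands) {
        this.serviceB = serviceB;
        this.commands = commands;
    }

    /** Reacts to event E1: forwards the data to B and compensates on any failure. */
    void onE1(String aggregateId, String payloadForB) {
        boolean accepted;
        try {
            accepted = serviceB.submit(aggregateId, payloadForB);
        } catch (RuntimeException unreachable) {
            // Assumption: "B unreachable" is treated the same as "B rejected the data".
            accepted = false;
        }
        if (!accepted) {
            // Compensation: ask A's aggregate to revert the action that led to E1.
            commands.send("Revert:" + aggregateId);
        }
        ended = true; // the workflow is finished either way
    }

    boolean isEnded() {
        return ended;
    }
}
```

Because both collaborators are single-method interfaces, the saga can be exercised with lambdas, for example `new ForwardToBSaga((id, data) -> false, System.out::println).onE1("a-1", "data")`, which sends the revert command for `a-1`.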

Cheers,
Benoît 

Gopal, Brijesh

Sep 30, 2017, 7:21:26 AM
to axonfr...@googlegroups.com
Hi Benoit,

Since these are stateful and long-running transactions, do we need to worry about scalability when using sagas? More specifically, if there are, say, 100K transactions that my Saga needs to handle at any given time, are there any recommendations or design principles that we should keep in mind?

Thanks
Brijesh

--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Steven Grimm

Sep 30, 2017, 1:01:59 PM
to axonfr...@googlegroups.com
In terms of designing sagas for best performance, I've found the main thing is to keep them small. This is usually pretty easy for a saga that's managing the operation of a single aggregate like Benoit describes, less easy if the saga needs to coordinate the activity of a number of aggregates. Whenever possible, prefer lots of little sagas over a small number of big ones.

Sometimes you really do need big sagas that coordinate activity across a bunch of otherwise independent aggregates. In my experience, getting good performance out of a big saga boils down to storing as little data as possible. Just what it needs to do its work, no more. Any non-transient fields in the saga class need to be serialized and deserialized for every event, at least in the worst case, and you don't want to waste CPU cycles decoding and encoding data you'll never need. Don't store each aggregate's full details in the saga if you can do everything you need with just the aggregate ID, for example.

One not-always-obvious thing to watch out for is that data requirements can change over the life of a saga. You might need a particular chunk of data at the beginning of a workflow but not later. In that case it can be beneficial to release the saga's copy of the data once it's been used, e.g., by removing it from a hashmap or nulling out the field that's holding a reference.
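
A small sketch of the two ideas above (store little, and release data once it has been used). It relies on plain Java serialization as a stand-in for whatever serializer the saga store is configured with, which is an assumption; `OrderWorkflowState` and its members are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class OrderWorkflowState implements Serializable {
    private static final long serialVersionUID = 1L;

    // Stored: the ID is enough to look the aggregate up again later.
    private final String aggregateId;
    // Stored only until it has been used.
    private final Map<String, String> pendingData = new HashMap<>();
    // Never stored: transient fields are skipped by Java serialization.
    private transient Object cachedLookup;

    OrderWorkflowState(String aggregateId) {
        this.aggregateId = aggregateId;
    }

    String aggregateId() {
        return aggregateId;
    }

    void remember(String key, String value) {
        pendingData.put(key, value);
    }

    /** Hands the value to the caller and drops the saga's own copy. */
    String useOnce(String key) {
        return pendingData.remove(key);
    }

    void cache(Object lookupResult) {
        cachedLookup = lookupResult;
    }

    /** Size in bytes of this state under plain Java serialization. */
    int serializedSize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }
}
```

Calling `serializedSize()` before and after `useOnce(...)` gives a rough idea of how much work is saved per event once the data has been released. Real numbers depend on the serializer in use, so this is a way to reason about the effect, not a measurement.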

Profile first. You don't want to spend effort optimizing something that is not a significant bottleneck.

-Steve



Bruno Mendonça

Oct 11, 2017, 9:40:08 PM
to axonfr...@googlegroups.com
Thanks for the insightful information Benoît and Steve.

As I was formulating my initial question I already had in mind that I would need to use a Saga. However, I didn't want to clutter my already long question with more information.

So indeed I was planning on this situation:

1. Service A receives a request on a particular endpoint
2. Endpoint in A sends a CreateCommand
3. Aggregate X in A handles the CreateCommand and applies a CreatedEvent
4. Saga in A handles the CreatedEvent

From here on is where I am trying to put the pieces together. Unfortunately, the last 2 weeks I've been busy with things I didn't want to be :) and haven't had the time to experiment. So here are the steps I'm not so sure of.

5. Saga in A sends DistributedCommand
6. Aggregate in service B handles the DistributedCommand and applies a SomethingHappenedEvent

Assuming this reasoning is correct, how does the DistributedCommand go from A to B? Benoît hinted at that "(eg. via REST call)". But would Axon be responsible for that call? Or would that have to be implemented in code?



Benoît Liessens

Oct 12, 2017, 2:27:29 AM
to Axon Framework Users
Hi Bruno,

Your DistributedCommand boils down to:
5a. the Saga calls an endpoint of service B
5b. the endpoint of service B translates that into a Command B targeting the aggregate of service B

If the Command B is valid, the endpoint of service B should return an ‘OK’ answer (e.g. HTTP 200 for REST calls).
Otherwise, it should indicate a failure.

6. the saga acts upon the response of the endpoint of service B. In case of failure the saga could then either roll back to a previous state or possibly schedule an event to retry steps 5a and 5b.

Note: in the scenario above, the event processing in the saga and the remote call happen in the same thread. This could reduce the throughput (not sure if that is a problem). The benefit is that complexity is limited, because the remote call is synchronous and the saga can immediately act upon the response.
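
Step 6 can be sketched as a small decision function. The split used here is an assumption made for the sketch, not something prescribed above: 2xx completes the saga, a 5xx status or an unreachable service is retried up to a limit, and everything else (for example a validation rejection) rolls back immediately. `NextStep` and `ResponsePolicy` are hypothetical names.

```java
/** What the saga does next after calling the endpoint of service B. */
enum NextStep { COMPLETE, RETRY_LATER, ROLL_BACK }

class ResponsePolicy {
    /** Marker used in this sketch when no HTTP response was received at all. */
    static final int UNREACHABLE = -1;

    private final int maxAttempts;

    ResponsePolicy(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * @param httpStatus status returned by service B, or UNREACHABLE
     * @param attempt    1-based number of the attempt that just finished
     */
    NextStep decide(int httpStatus, int attempt) {
        if (httpStatus >= 200 && httpStatus < 300) {
            return NextStep.COMPLETE;
        }
        // Assumption: only these failures are worth retrying (steps 5a and 5b again).
        boolean mayRecover = httpStatus == UNREACHABLE || httpStatus >= 500;
        if (mayRecover && attempt < maxAttempts) {
            return NextStep.RETRY_LATER;
        }
        // Rejections by B, or retries exhausted: revert the action in service A.
        return NextStep.ROLL_BACK;
    }
}
```

With `new ResponsePolicy(3)`, a 503 on the first attempt yields `RETRY_LATER`, a 503 on the third attempt yields `ROLL_BACK`, and a 422 yields `ROLL_BACK` straight away.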

Cheers,
Benoît

Bruno Mendonça

Oct 12, 2017, 4:57:29 PM
to Axon Framework Users
Hi Benoît,

Ok, so I see we would have to programmatically call endpoint B from A's Saga.

Would it be a decent approach to have SomethingToDoInBCommand in a common lib shared by A and B, so that A would send the serialized command in the body of the API request to B and Spring would take care of deserializing into SomethingToDoInBCommand in B?

Endpoint in B:

@RequestMapping(path = "/someEndpoint", method = RequestMethod.POST)
@ResponseStatus(HttpStatus.OK)
public Callable<SomeResult> doSomething(@RequestBody SomethingToDoInBCommand cmd) {
    // Returning a Callable lets Spring MVC run the body asynchronously
    return () -> {
        ...

        SomeResult result = doSomething(cmd);

        ...

        return result;
    };
}

Benoît Liessens

Oct 13, 2017, 5:42:37 AM
to Axon Framework Users
Hi Bruno,

You could. But do realise this implies the domain model of service B is leaking (service A knows about the exact command of another system). When using REST endpoints I would rather suggest mapping this command to a meaningful URI, e.g. ../aggregate-collection/{aggregate-id}/action with request or body parameters.
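
A rough sketch of that idea in plain Java, with no Spring or Axon types. Service A only builds a path and plain parameters; the command class lives in service B alone and is created there from the path segments. All names (`ActionRequest`, `ServiceBEndpoint`, the `orders` collection, the `reserve` action, the `quantity` parameter) are hypothetical, and the sketch assumes IDs are URL-safe.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Service A's view: a path and plain parameters, no class from B's domain model. */
class ActionRequest {
    final String path;
    final Map<String, String> parameters;

    private ActionRequest(String path, Map<String, String> parameters) {
        this.path = path;
        this.parameters = Collections.unmodifiableMap(new LinkedHashMap<>(parameters));
    }

    /** Builds /{collection}/{aggregateId}/{action}. */
    static ActionRequest forAggregate(String collection, String aggregateId,
                                      String action, Map<String, String> parameters) {
        return new ActionRequest("/" + collection + "/" + aggregateId + "/" + action, parameters);
    }
}

/** Lives only in service B; service A never sees this class. */
class SomethingToDoInBCommand {
    final String aggregateId;
    final String quantity;

    SomethingToDoInBCommand(String aggregateId, String quantity) {
        this.aggregateId = aggregateId;
        this.quantity = quantity;
    }
}

class ServiceBEndpoint {
    /** Translates path segments and parameters into B's internal command. */
    static SomethingToDoInBCommand translate(ActionRequest request) {
        String[] segments = request.path.split("/"); // "", collection, id, action
        if (segments.length != 4) {
            throw new IllegalArgumentException("Unexpected path: " + request.path);
        }
        return new SomethingToDoInBCommand(segments[2], request.parameters.get("quantity"));
    }
}
```

The trade-off is a little mapping code in B in exchange for being able to rename or restructure B's commands without touching service A.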

Cheers,
Benoît 