Implicitly creating aggregate roots


Ye Thumok

May 19, 2016, 3:30:41 AM
to Axon Framework Users
Hello,

I'm faced with an odd problem where I have to implicitly create an aggregate root and am looking for a nice solution.

The problem is that I receive events from an external system which may (and in fact does) send them in an undefined order. I can calculate a type-5 UUID for the aggregate root; however, when handling an external event I do not know whether the aggregate root already exists. Ideally, the Axon Framework would offer a factory that creates the aggregate root, using a command-handling constructor that accepts only the UUID, whenever a command for an external event is processed and the aggregate root does not exist yet.
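
For reference, the JDK's `UUID.nameUUIDFromBytes` produces type-3 (MD5) UUIDs, so a type-5 (SHA-1) identifier has to be derived by hand or with a library. A minimal sketch of such a derivation follows; the class name `Type5Uuid`, the namespace value and the key format in `main` are illustrative assumptions and not taken from the original post:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

// Hypothetical helper: derives a deterministic, name-based (type-5) UUID.
final class Type5Uuid {

    private Type5Uuid() {
    }

    static UUID nameBased(UUID namespace, String name) {
        MessageDigest sha1;
        try {
            sha1 = MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
        // Hash the 16 namespace bytes (big-endian) followed by the name bytes.
        sha1.update(ByteBuffer.allocate(16)
                .putLong(namespace.getMostSignificantBits())
                .putLong(namespace.getLeastSignificantBits())
                .array());
        byte[] hash = sha1.digest(name.getBytes(StandardCharsets.UTF_8));

        hash[6] = (byte) ((hash[6] & 0x0f) | 0x50); // set the version nibble to 5
        hash[8] = (byte) ((hash[8] & 0x3f) | 0x80); // set the RFC 4122 variant bits

        // Only the first 16 of the 20 SHA-1 bytes end up in the UUID.
        ByteBuffer first16 = ByteBuffer.wrap(hash, 0, 16);
        return new UUID(first16.getLong(), first16.getLong());
    }

    public static void main(String[] args) {
        // Assumed namespace and key format, for illustration only.
        UUID namespace = UUID.fromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8");
        UUID id = nameBased(namespace, "external-system/order/42");
        System.out.println(id + " (version " + id.version() + ")");
    }
}
```

Because the identifier depends only on the namespace and the external key, every event about the same external entity maps to the same aggregate identifier, regardless of arrival order.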

What would you suggest I try? I've been thinking about the following options:
- 1) use a read model to keep track of which aggregate roots have already been created and, before sending the command for the external event, create the aggregate root using a creational command
- 2) use an interceptor to modify the command (if that's even possible; I don't know whether I can access the repository in an interceptor, I haven't tried that)
- 3) use an "external" command handler (i.e. one that is not part of the aggregate root) and try to load the aggregate root when handling the command; if it does not exist, manually add a fresh aggregate root, apply the creation event and then apply the event for the external command.
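
Option 1 could look roughly like the sketch below. It is plain Java rather than Axon code: `CommandSender`, `CreateAggregate`, `HandleExternalEvent` and `ReadModelGuardedDispatcher` are hypothetical names, and the in-memory set stands in for a real read model that would normally be maintained by a listener for the creation event.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a command gateway; not an Axon type.
interface CommandSender {
    void send(Object command);
}

// Hypothetical commands, named for illustration only.
final class CreateAggregate {
    final UUID id;

    CreateAggregate(UUID id) {
        this.id = id;
    }
}

final class HandleExternalEvent {
    final UUID id;
    final String payload;

    HandleExternalEvent(UUID id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

final class ReadModelGuardedDispatcher {

    // Stand-in for the read model of already created aggregate roots.
    private final Set<UUID> knownAggregates = ConcurrentHashMap.newKeySet();
    private final CommandSender sender;

    ReadModelGuardedDispatcher(CommandSender sender) {
        this.sender = sender;
    }

    void onExternalEvent(UUID id, String payload) {
        // Set.add returns true only for the first caller that registers the id,
        // so the creational command is sent at most once per id by this dispatcher.
        if (knownAggregates.add(id)) {
            sender.send(new CreateAggregate(id));
        }
        sender.send(new HandleExternalEvent(id, payload));
    }
}
```

The sketch makes the drawback visible: the dispatching side has to own knowledge about which aggregates exist, and a real read model that is updated asynchronously may lag behind the command side.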

To me, solution 3) looks most appealing, since 1) relies on the caller knowing about the internal state of the system. As for 2), I'm not sure it would still be possible after switching to a distributed command bus implementation. Number 3) is the way I wanted to go; however, I'm not sure how I'd actually implement the command handler using only the UnitOfWork as the interface to the Axon Framework. Any tips here?

Thanks for any ideas or input! :)
Thumok

Allard Buijze

May 19, 2016, 5:50:14 AM
to Axon Framework Users
Hi,

There is a fourth option, which is useful when updates far outnumber creations:
send an update command. If it fails because the aggregate doesn't exist, send a create command, followed by the update command that previously failed.
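
A minimal sketch of this "update first, create on failure" flow, in plain Java without Axon: `MissingAggregateException` is a hypothetical stand-in for Axon's AggregateNotFoundException, and `InMemoryCommandSide` and `UpdateFirstDispatcher` are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for Axon's AggregateNotFoundException.
final class MissingAggregateException extends RuntimeException {
    MissingAggregateException(UUID id) {
        super("No aggregate with id " + id);
    }
}

// Hypothetical in-memory stand-in for the command side of the application.
final class InMemoryCommandSide {
    final Map<UUID, List<String>> messagesById = new HashMap<>();
    int createCount = 0;

    void create(UUID id) {
        messagesById.put(id, new ArrayList<>());
        createCount++;
    }

    void update(UUID id, String message) {
        List<String> messages = messagesById.get(id);
        if (messages == null) {
            throw new MissingAggregateException(id);
        }
        messages.add(message);
    }
}

final class UpdateFirstDispatcher {

    private UpdateFirstDispatcher() {
    }

    static void updateOrCreate(InMemoryCommandSide commands, UUID id, String message) {
        try {
            // Optimistic path: cheap when the aggregate usually exists.
            commands.update(id, message);
        } catch (MissingAggregateException e) {
            // Recovery path: create, then resend the update that failed.
            commands.create(id);
            commands.update(id, message);
        }
    }

    public static void main(String[] args) {
        InMemoryCommandSide commands = new InMemoryCommandSide();
        UUID id = UUID.randomUUID();
        updateOrCreate(commands, id, "one");
        updateOrCreate(commands, id, "two");
        System.out.println("creates=" + commands.createCount
                + ", messages=" + commands.messagesById.get(id));
    }
}
```

The sketch is single-threaded. With concurrent senders, two callers could both take the recovery path, so the create step would have to tolerate an "already exists" failure.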

Cheers,

Allard


Ye Thumok

May 19, 2016, 6:09:33 AM
to Axon Framework Users
Hi Allard

Thanks for the fast reply! :)

Mhm, your fourth option is along the lines of option number 1. However, the problem with this option, as I see it, is that everyone who dispatches commands is "burdened" with knowledge of the inner workings of the Axon Framework and of the way the aggregate root gets created. I'd rather have that code hidden from the user of the command gateway.

I'll prepare a little demo of what I got running with an external command handler and would be grateful if you could comment on it.

Ye Thumok

May 19, 2016, 6:13:48 AM
to Axon Framework Users
Here's a (hopefully minimal) demo of the way I try to tackle it with an external command handler. Any comments on whether this makes sense or not are greatly appreciated!

As you can see, I use a static inner class that serves as the command handler, so that I can access the private constructor and the private receiveMessage method, which apply the correct events. The "black magic" part is where I catch the AggregateNotFoundException in the command handler to manually create the DemoAggregateRoot and add it to the repository.

Given the unit test at the bottom it seems to work, but I'm not sure if I'm using the Axon Framework correctly.

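// Imports assume Axon 2.x package names.
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.axonframework.commandhandling.annotation.CommandHandler;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;
import org.axonframework.repository.AggregateNotFoundException;
import org.axonframework.repository.Repository;

public class DemoAggregateRoot extends AbstractAnnotatedAggregateRoot<UUID> {

    @AggregateIdentifier
    private UUID id;

    private List<Message> messages = new ArrayList<>();

    // No-arg constructor used when the aggregate is rebuilt from its events.
    protected DemoAggregateRoot() {
        super();
    }

    // Private: only the nested Handler creates new instances.
    private DemoAggregateRoot(UUID id) {
        apply(new DemoAggregateRootCreatedEvent(id));
    }

    private void receiveMessage(Message message) {
        apply(new MessageReceivedEvent(id, message));
    }

    @EventHandler
    public void on(DemoAggregateRootCreatedEvent event) {
        id = event.id;
    }

    @EventHandler
    public void on(MessageReceivedEvent event) {
        messages.add(event.message);
    }

    // "External" command handler, nested so it can reach the private members above.
    public static class Handler {

        private Repository<DemoAggregateRoot> repository;

        void setRepository(Repository<DemoAggregateRoot> repository) {
            this.repository = repository;
        }

        @CommandHandler
        public void receiveMessage(ReceiveMessageCommand command) {
            DemoAggregateRoot demo = loadOrCreate(command.id);
            demo.receiveMessage(command.message);
        }

        // Loads the aggregate, or creates and registers it if it does not exist yet.
        private DemoAggregateRoot loadOrCreate(UUID uuid) {
            try {
                return repository.load(uuid);
            } catch (AggregateNotFoundException e) {
                DemoAggregateRoot demo = new DemoAggregateRoot(uuid);
                repository.add(demo);
                return demo;
            }
        }

    }

}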

import java.util.UUID;

public class DemoAggregateRootCreatedEvent {

    public final UUID id;

    public DemoAggregateRootCreatedEvent(UUID id) {
        this.id = id;
    }

}

public class Message {

    public final String dummyPayload;

    public Message(String dummyPayload) {
        this.dummyPayload = dummyPayload;
    }

}

import java.util.UUID;

public class MessageReceivedEvent {

    public final UUID id;

    public final Message message;

    public MessageReceivedEvent(UUID id, Message message) {
        this.id = id;
        this.message = message;
    }

}

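// Import assumes Axon 2.x package names.
import java.util.UUID;

import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;

public class ReceiveMessageCommand {

    @TargetAggregateIdentifier
    public final UUID id;

    public final Message message;

    public ReceiveMessageCommand(UUID id, Message message) {
        this.id = id;
        this.message = message;
    }

}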

// Imports assume Axon 2.x and JUnit 4.
import static java.util.UUID.randomUUID;

import java.util.UUID;

import org.axonframework.test.FixtureConfiguration;
import org.axonframework.test.Fixtures;
import org.junit.Before;
import org.junit.Test;

public class DemoAggregateRootTest {

    private FixtureConfiguration<DemoAggregateRoot> fixture;
    private UUID id;

    @Before
    public void setUp() {
        fixture = Fixtures.newGivenWhenThenFixture(DemoAggregateRoot.class);
        DemoAggregateRoot.Handler commandHandler = new DemoAggregateRoot.Handler();
        commandHandler.setRepository(fixture.getRepository());
        fixture.registerAnnotatedCommandHandler(commandHandler);
        id = randomUUID();
    }

    // First command for an unknown id: creation event plus message event.
    @Test
    public void testImplicitCreate() {
        Message message = new Message("payload");
        fixture.given()
                .when(new ReceiveMessageCommand(id, message))
                .expectEvents(
                        new DemoAggregateRootCreatedEvent(id),
                        new MessageReceivedEvent(id, message)
                );
    }

    // Second command for a known id: only the message event.
    @Test
    public void testCreateCalledOnlyOnce() {
        Message message1 = new Message("one");
        Message message2 = new Message("two");
        fixture.givenCommands(new ReceiveMessageCommand(id, message1))
                .when(new ReceiveMessageCommand(id, message2))
                .expectEvents(
                        new MessageReceivedEvent(id, message2)
                );
    }

}

Allard Buijze

May 19, 2016, 6:26:31 AM
to Axon Framework Users
Hi,

This approach should work fine. The only downside is that you must use @CommandHandlers outside of the aggregate. That may not be a problem, depending on the number of commands you expect your aggregate to handle.

You can also create a CommandGateway with a RetryScheduler that does a "create and retry" operation when a command fails with an AggregateNotFoundException.
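
The idea of hiding "create and retry" behind the gateway can be sketched in plain Java as a decorator. This is not Axon's RetryScheduler API: `PlainGateway` and `CreateAndRetryGateway` are hypothetical types, and the mapping from a failed command to its creational command is supplied by the caller as an assumption of the sketch.

```java
import java.util.function.Function;

// Hypothetical minimal gateway abstraction; not an Axon type.
interface PlainGateway {
    void send(Object command);
}

// Decorator that hides the "create and retry" step from the senders of commands.
final class CreateAndRetryGateway implements PlainGateway {

    private final PlainGateway delegate;
    private final Class<? extends RuntimeException> recoverableFailure;
    private final Function<Object, Object> createCommandFor;

    CreateAndRetryGateway(PlainGateway delegate,
                          Class<? extends RuntimeException> recoverableFailure,
                          Function<Object, Object> createCommandFor) {
        this.delegate = delegate;
        this.recoverableFailure = recoverableFailure;
        this.createCommandFor = createCommandFor;
    }

    @Override
    public void send(Object command) {
        try {
            delegate.send(command);
        } catch (RuntimeException failure) {
            if (!recoverableFailure.isInstance(failure)) {
                throw failure; // unrelated failures are not retried
            }
            // Derive and send the creational command, then retry exactly once.
            delegate.send(createCommandFor.apply(command));
            delegate.send(command); // a second failure propagates to the caller
        }
    }
}
```

Callers only see `send`, so they stay unaware of how the aggregate comes into existence; the price is that the gateway configuration must know how to derive a creational command from any command that can fail this way.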

All options have their pros and cons, unfortunately.

Cheers,

Allard

Message has been deleted

Allard Buijze

May 25, 2016, 3:43:58 PM
to Axon Framework Users
Hi Steve,

thanks for sharing! The changes in Axon 3 that we're currently making might just allow for this to become a bit easier to implement. A Unit of Work that acts on an already existing transaction could set a savepoint, to which it rolls back when a Unit of Work fails....

Enough room for improvement, still....

Allard

On Tue, May 24, 2016 at 9:01 PM Steven Grimm <kor...@midwinter.com> wrote:
I've had one issue with the "try a command then recover" approach that's worth pointing out, though it may be a PostgreSQL-specific behavior. When a SQL query returns an error like a unique constraint violation, PostgreSQL sets an error flag on the transaction and won't accept any subsequent write operations. You can work around this by using savepoints, which are sort of like lightweight nested transactions: create a savepoint, try the command, and if it fails, restore the savepoint (which clears the error flag) and proceed with the alternate code path. If the operation succeeds there's no need to do anything special with the savepoint; it's purely a bit of temporary state that goes away once the surrounding transaction is committed or rolled back.

This isn't something unique to Axon; I've had to do the same thing in traditional CRUD apps that needed to do speculative inserts of objects that might already exist. In an Axon application you'll need to make sure you are using the same transaction as the current UnitOfWork but that's not difficult and is well-described in the Axon docs.

-Steve 
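
The savepoint pattern described above can be sketched with plain JDBC. The class `SavepointFallback` and its `SqlAction` callback are hypothetical names; the sketch assumes the connection has auto-commit disabled (JDBC does not allow savepoints in auto-commit mode) and that it is the same connection the surrounding transaction uses.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;

// Hypothetical helper: try one SQL action, fall back to another inside the same transaction.
final class SavepointFallback {

    // Hypothetical callback type for work done on the shared connection.
    @FunctionalInterface
    interface SqlAction {
        void run(Connection connection) throws SQLException;
    }

    private SavepointFallback() {
    }

    static void tryOrElse(Connection connection, SqlAction attempt, SqlAction fallback)
            throws SQLException {
        // Mark the point to return to if the speculative attempt fails.
        Savepoint savepoint = connection.setSavepoint();
        try {
            attempt.run(connection);
            // On success nothing else is needed: the savepoint goes away
            // when the surrounding transaction commits or rolls back.
        } catch (SQLException e) {
            // Undo only the failed attempt; on PostgreSQL this also clears
            // the transaction's error state so further statements are accepted.
            connection.rollback(savepoint);
            fallback.run(connection);
        }
    }
}
```

In the scenario from this thread, `attempt` would correspond to the speculative command and `fallback` to the alternate "create, then retry" path.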