Sending Events to Event Bus

1,513 views
Skip to first unread message

Sally

unread,
Mar 10, 2015, 10:19:19 PM3/10/15
to axonfr...@googlegroups.com
Hello Everyone

I am very new to AXON and CQRS world. I used the quick guide to see how AXON works and I am able to successfully run it on my machine.

I see that after setting up the infrastructure configuration, AXON automatically store the events on the persistence storage and publish events on the Event Bus.

Since I want to be sure for (Persistence and Publish ) of events as atomic i.e. either both are Successful or both fail , 
I would like to first store the event in the Persistence storage and once it is stored , I want that event to publish it on the event bus.

For doing this , I need to have control on when and how these events are persisted and published by AXON.

Also, I am thinking of running the background thread which listens to the Persistence storage , once event get stored , thread will publish the event on the
event bus and then thread will mark the event in the event store as Published. 

HELP !!!


Allard Buijze

unread,
Mar 11, 2015, 4:19:11 AM3/11/15
to Axon Framework Users
Hi Sally,

what you're describing, is actually the default Axon behavior. Axon will always first store the events in the event store, and publish them to the bus afterwards.
If you attach a transaction manager to the unit of work, everything is dealt with transactionally as well.

If you really must customize this part, you'll need to create your own UnitOfWork and configure a UnitOfWorkFactory. It's not a road I'd recommend, though.

Cheers,

Allard

--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Abhishek Biswas

unread,
Mar 11, 2015, 5:42:35 AM3/11/15
to axonfr...@googlegroups.com
Hi Allard,

Thanks for your reply. Does it mean that we need to associate a TransactionManager to the command bus as shown below?

 <axon:command-bus id="commandBus" transaction-manager="txManager"/>

Thanks | Abhishek

Allard Buijze

unread,
Mar 11, 2015, 6:24:12 AM3/11/15
to axonfr...@googlegroups.com
Yes, that is correct.

Cheers, Allard 

Abhishek Biswas

unread,
Mar 11, 2015, 6:56:25 AM3/11/15
to axonfr...@googlegroups.com
And one more question:

how would I configure a transaction manager for the same. I tried this but no luck.

<axon:command-bus id="commandBus" transaction-manager="transactionManager"/>
<bean id="transactionManager" class="org.axonframework.unitofwork.SpringTransactionManager"/>

Thanks | Abhishek

Abhishek Biswas

unread,
Mar 11, 2015, 7:07:25 AM3/11/15
to axonfr...@googlegroups.com
By the way, I am using mongodb as the event store and by spring config looks like this:
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    <axon:command-bus id="commandBus" transaction-manager="transactionManager"/>
    <bean id="transactionManager" class="org.axonframework.unitofwork.SpringTransactionManager"/>
    
    <axon:event-bus id="eventBus"/>
    

    <axon:event-sourcing-repository id="customerRepository"
                                    aggregate-type="org.abhishek.axon.aggregates.Customer">

        <axon:snapshotter-trigger event-count-threshold="2" snapshotter-ref="snapshotter"/>
    </axon:event-sourcing-repository>

    <bean id="snapshotter" class="org.axonframework.eventsourcing.SpringAggregateSnapshotter">
        <property name="eventStore" ref="eventStore"/>
        <property name="executor" ref="taskExecutor"/>
    </bean>

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="2"/>
        <property name="maxPoolSize" value="2"/>
        <property name="waitForTasksToCompleteOnShutdown" value="true"/>
    </bean>

    <axon:aggregate-command-handler id="customerHandler"
                                    aggregate-type="org.abhishek.axon.aggregates.Customer"
                                    repository="customerRepository"
                                    command-bus="commandBus"/>

    <bean class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
        <property name="commandBus" ref="commandBus"/>
    </bean>

    <bean class="org.abhishek.axon.eventhandlers.ExternalCustomerEventHandler"/>

    <!-- mongodb specific event store config-->
    <bean id="eventStore" class="org.axonframework.eventstore.mongo.MongoEventStore">
        <constructor-arg ref="mongoTemplate"/>
    </bean>

    <bean id="mongoTemplate" class="org.axonframework.eventstore.mongo.DefaultMongoTemplate">
        <constructor-arg index="0" ref="mongo"/>
        <constructor-arg index="1" value="customers"/>
        <constructor-arg index="2" value="domainevents"/>
        <constructor-arg index="3" value="snapshotevents"/>
        <constructor-arg index="4">
            <null/>
        </constructor-arg>
        <constructor-arg index="5">
            <null/>
        </constructor-arg>
    </bean>

    <mongo:mongo id="mongo" host="127.0.0.1" port="27017"/>

</beans>
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Thanks | Abhishek

Allard Buijze

unread,
Mar 11, 2015, 11:07:25 AM3/11/15
to Axon Framework Users
Hi,

the transaction-manager atrtibute should point to a Spring PlatformTransactionManager. There is no need to wrap it in a SpringTransactionManager explicitly.

However, as far as I know, Mongo doesn't support transactions. So there is no point configuring a transaction manager.

Cheers,

Allard

Abhishek Biswas

unread,
Mar 11, 2015, 3:59:36 PM3/11/15
to axonfr...@googlegroups.com
Hi Allard,
 
This really helps! However, I have couple of more questions before we decide what would be best-suited for our requirements.

  1. I read the Axon reference document and figured that UnitOfWork is not transactional. So, if there are five actions in an UnitOfWork and when that UnitOfWork proceeds to commit, there is no guarantee that all five actions will be persisted in the event store. So, if three of them are persisted in the event store before the UnitOfWork fails, those three would not be rolled back  and three events corresponding to three actions persisted in the event store would be published in the event bus. Is this understanding correct? And to make sure that all five or none of the actions persist in the event store, we have to configure a transaction manager for the command bus. And this technique would only work if the event store supports transaction. Please tell me if this understanding is correct as well.
  2. Does Axon framework give a guarantee that if an UnitOfWork is committed then all the relevant events would be published in the event bus? If not, what can we do to ensure it?
  3. If we still decide to use MongoDB as the event store, we must ensure one of the two to guarantee that an UnitOfWork would still be transactional. Is this understanding correct?
    • There should be one action in one UnitOfWork since one document storage is atomic for MongoDB.
    • If there are multiple actions in one UnitOfWork, we have to configure in a way so that all the relevant events would be persisted on a single document in MongoDB. Do this approach has any negative impact on event replaying to form an aggregate?
 Sorry for such a big mail.

Thanks | Abhishek
 

Sahil Rally

unread,
Mar 12, 2015, 1:59:06 AM3/12/15
to axonfr...@googlegroups.com
Hi Allard

We want to guarantee the complete success or complete failure of the following 2 actions.

1. Events gets stored in MongoDB .  
2. Events are published on the Event Bus.

What should we do exactly? 

The following things should never happen in the system

1. All Events associated with a Command gets Stored  but are not Published on the Event Bus.
2. Some events associated with a Command gets Stored and Unit of Work fails.

In the above failure Scenarios , we want to Roll back everything and try again.


Also, very curious to know that the above scenario is very crucial and basic to every system , why it is not included in the AXON framework to be guaranteed. Any particular reasons for this.?

Looking Forward for the Solution. 

Thanks

Sally



Allard Buijze

unread,
Mar 14, 2015, 4:15:46 PM3/14/15
to axonfr...@googlegroups.com
Hi Sally,

the issue isn't that Axon cannot guarantee this, but that Mongo just doesn't support transactions. You can configure a transaction manager on a unit of work that can give you the necessary guarantees (for another dbms).

Axon allows for a Document-per-commit (which stores all events for a single uow in one document) to be able to ensure either all or none of the events are stored.

Cheers,

Allard




Sally

unread,
Mar 15, 2015, 11:25:53 AM3/15/15
to axonfr...@googlegroups.com
Hello Allard

Thanks for your reply. We have done the following 

1. We have configured Transaction Manager with the Command Bus. 
2. We have registered Unit Of Work Listeners too.
3. Now while testing, when we switch off Event Bus , we get callbacks both in Transaction Manager as well as UnitOfWork Listener.   OnRollback callback is called for both
    Transaction Manager as well as Unit Of Work. What is the difference ?

Also, we are planning to delete the documents from the mongodb once onRoolback is called, (to ensure atomicity of UOW ), is this strategy right ?

Cheers!!!
Sally 

Allard Buijze

unread,
Mar 15, 2015, 4:24:23 PM3/15/15
to axonfr...@googlegroups.com
Hi,

how do you mean 'We get callbacks both in tm and uow'? I am only aware of callbacks on the uow. The transaction manager is just to manage the tx. The uow takes care of the different actions (including invoking the transaction manager) to take and their timing. 

Removing the document on rollback sounds reasonable. 

Cheers, Allard 




Sally

unread,
Mar 19, 2015, 4:46:03 AM3/19/15
to axonfr...@googlegroups.com
Hello Allard 

Here goes our implementation. Could you please validate.

public class Employee extends AbstractAnnotatedAggregateRoot {

    @AggregateIdentifier
    private String id;

    private String name;
    private String email;
    private Address address;


    public Employee(){
    }

    @CommandHandler
    public Employee(CreateEmployeeCommand command, UnitOfWork unitOfWork){
        registerUnitOfWorkListener(unitOfWork);
        apply(new EmployeeCreatedEvent(command.getId(), command.getName(), command.getEmail(), command.getAddress()));
    }

    @CommandHandler
    public void changeName(ChangeEmployeeNameCommand command, UnitOfWork unitOfWork){
        registerUnitOfWorkListener(unitOfWork);
        apply(new EmployeeNameChangedEvent(command.getId(), command.getName()));
     }


private void registerUnitOfWorkListener(UnitOfWork unitOfWork) {
        unitOfWork.registerListener(new UnitOfWorkListenerAdapter() {

            private List<String> eventIdentifiers = new ArrayList<String>();
            
          
            @Override
            public void onPrepareCommit(UnitOfWork unitOfWork, Set<AggregateRoot> aggregateRoots, List<EventMessage> events) {
                
                for(EventMessage e :  events){
                eventIdentifiers.add(e.getIdentifier());
                }

                super.onPrepareCommit(unitOfWork, aggregateRoots, events);
            }

           
            @Override
            public void onRollback(UnitOfWork unitOfWork, Throwable failureCause) {
                
                for(String id: eventIdentifiers) {
                    DBObject document =  App.customersDb.getCollection("domainevents").findOne();
                    App.customersDb.getCollection("domainevents").remove(document);
                }

                super.onRollback(unitOfWork, failureCause);
            }

            
        });
    }

}

Allard Buijze

unread,
Mar 20, 2015, 3:28:48 PM3/20/15
to Axon Framework Users
Hi Sally,

the general direction of the solution looks all right. However, if you want to perform some activity on each incoming command, it is easier/better to use a CommandHandlerInterceptor.instead. It takes the "infrastructure logic" our of your aggregate, which should focus on business logic.

You can register your CommandHandlerInterceptor on the Command Bus implementation.

Cheers,

Allard

Sally

unread,
Mar 25, 2015, 8:03:56 AM3/25/15
to axonfr...@googlegroups.com
Hello Allard

Thanks for the suggestion, we have plugged in Interceptor and doing registration of unit of work there which works for all commands.

Query :  Since a command can spit multiple events, and it might happen that some events got published on event bus and subsequently Event-Bus goes off , consequently ,UnitOfWork Listener rollback is called , how could we understand which events are successfully published on event bus and which are not as we are 
getting all the events associated with the Unit Of Work,
We tried associating ExceptionListener with ActiveMQConnectionFactory so as to  get the error notification. BUt the callback is not called when the queue is shut off.
Are their any callbacks/listeners() which are notified when a event is published on event bus? Is there any way of knowing this info from events in onRollback()?

Allard Buijze

unread,
Mar 25, 2015, 2:36:04 PM3/25/15
to axonfr...@googlegroups.com
Hi Sally,

some message brokers support (light weight) transactions. You can use those when you need to send more than one message.

The callbacks on the UnitOfWork don't provide this levelof detailed information.

Cheers,

Allard

Sally

unread,
Mar 26, 2015, 4:53:12 AM3/26/15
to axonfr...@googlegroups.com
Hi Allard

In the scenario, when all of the events pertaining to one command are not published into the event bus, we want to retry the "non-published" events to event bus 
until they got published. 

Is there any scheduler built-in Axon which we can exploit ?

Thanks !!!

Allard Buijze

unread,
Mar 26, 2015, 6:51:36 AM3/26/15
to Axon Framework Users
There are some schedulers in the AsynchronousCluster, although I am not sure you'd want to use those in your scenario. You're probably best of building something yourself.

Cheers,

Allard

juya Kim

unread,
Dec 5, 2018, 11:11:11 AM12/5/18
to Axon Framework Users
Is there improved feature in higher version of Axon?
we do want to rollback a document stored in EventStore.
while I was debugging, 
Before calling AggregateLifeCycle.apply() if eventbus stopped or crached, the event is still saved in the eventStore(MongoDB)


2015년 3월 26일 목요일 오전 11시 51분 36초 UTC+1, Allard Buijze 님의 말:

Steven van Beelen

unread,
Dec 13, 2018, 10:33:56 AM12/13/18
to Axon Framework Users
Hi Juya,

First off, if you're still in the pre-production phase, I'd suggest using AxonServer or otherwise a regular Relational Database to store your events.
There are off chances that the MongoEventStorageEngine creates undesired situations in regards to trying to store big numbers of events concurrently.
To give a very short explanation on this, this happens because for Mongo the framework does not use a generated global index for ensured event ordering, by the timestamp.

A part from that though, I think the issue you're having stems from the Transaction Manager being used.
Make sure that the same Transaction Manager as your Mongo instance uses is set on the CommandBus you're using.

Hope this helps!

Cheers,
Steven

Steven van Beelen

unread,
Dec 14, 2018, 4:30:45 AM12/14/18
to Axon Framework Users
Hi Juya,

I'd like to add something to my prior response, which for some reason I missed when answering your question...
Mongo only recently started supporting transactions.
Subsequently, the MongoEventStorageEngine doesn't support those yet.
Therefore, once events are stored, they remain unaffected by a rollback, like the situation you've encountered..

Cheers,
Steven
Reply all
Reply to author
Forward
0 new messages