SagaEventHandler will not subscribe to SpringAMQPMessageSource

493 views
Skip to first unread message

Edison Xu

unread,
Apr 13, 2017, 7:08:25 AM4/13/17
to Axon Framework Users
Hi All,

In my application, I'm trying to make a Saga to handle a message from AMQP. 
But unfortunately, I found that it works fine if I annotate a @ProcessGroup("order") on a normal bean containing methods with @EventHandler, but if I cut the same annotation to a Saga, it's not working.
I've already define the mapping in the application.property.
axon.amqp.exchange=Axon.EventBus
axon.eventhandling.processors.order.source=queueMessageSource

@Configuration
public class AMQPConfiguration {

    private static final Logger LOGGER = getLogger(AMQPConfiguration.class);

    @Value("${axon.amqp.exchange}")
    private String exchangeName;

    @Bean
    public Queue queue(){
        return new Queue("orderqueue", true);
    }


    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.fanoutExchange(exchangeName).durable(true).build();
    }

    @Bean
    public Binding queueBinding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("").noargs();
    }

    @Bean
    public SpringAMQPMessageSource queueMessageSource(Serializer serializer){
        return new SpringAMQPMessageSource(serializer){
            @RabbitListener(queues = "orderqueue")
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                LOGGER.debug("Message received: "+message.toString());
                super.onMessage(message, channel);
            }
        };
    }
}


The Saga class is not subscribing to SpringAMQPMessageSource. So the message will be ignored.

Is Axon not supporting saga to accept amqp message in this way, or I'm not doing the right way?
What's the proper way if I want Saga to handle the message from mq?

Edison Xu

unread,
Apr 14, 2017, 5:57:16 AM4/14/17
to Axon Framework Users
Anyone got idea about this? 

在 2017年4月13日星期四 UTC+8下午7:08:25,Edison Xu写道:

Allard Buijze

unread,
Apr 14, 2017, 7:50:53 AM4/14/17
to Axon Framework Users
Hi Edison,

The @ProcessingGroup works on Event Handlers, but not on Sagas. Up until 3.0.3, the Configuration API has a limitation that you cannot configure a different source of events than the Event Bus. This is fixed in 3.0.4, where you can create a SagaConfiguration bean (named as <sagaClassName> + Configuration) that defines the configuration for that specific Saga.

I expect to be able to release 3.0.4 today.

Cheers,

Allard

--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Edison Xu

unread,
Apr 16, 2017, 11:37:26 PM4/16/17
to Axon Framework Users
Hi Allard,

Thanks for the replay. 

Please correct me if I'm wrong. The concept Saga is used to handle the complex process, even in the distributed system. While in micro services architecture, the pieces of the process will be distributed into different micro services.
So Saga will have to be able to listen the events from different nodes. A mq can be used as the bridge between those nodes & saga.

Let's say we have two services, order service and product service. When booking a order, we need to reserve the product.
And for the load balance & HA consideration, we make two instances of both the services.
We implement a Saga in the OrderService to handle the book and reservation process,  using distribute command bus to send the reservation command to ProductService, and a Rabbit mq to pass the events.

When reservation of product is successful, a ReservationOkEvent is sent back to both instances of the OrderService. 
Since only the node which sent out the command previously has the instance of Saga, to make sure it will receive the events, we configure the queue as Fanout mode. 
So both nodes will get the events.

Here comes my questions:
1. Can axon guarantee only one Saga would handle the message correctly? (Would the Saga on the other node create a new instance to handle the event?)
2. Once we configured the distribution of Events, will the Saga read local events from the queue as well, or it will be notified by local event machnism? 

Looking forward your answer.

Thanks,

Edison


在 2017年4月14日星期五 UTC+8下午7:50:53,Allard Buijze写道:

Edison Xu

unread,
Apr 18, 2017, 2:00:06 AM4/18/17
to Axon Framework Users
Allard,

Tried in v3.0.4 to create a xxxSagaConfiguration, but failed with exception:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'orderController': Unsatisfied dependency expressed through field 'commandGateway'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.axonframework.spring.config.AxonConfiguration': Invocation of init method failed; nested exception is org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'orderSagaConfiguration' is expected to be of type 'org.axonframework.config.ModuleConfiguration' but was actually of type 'com.edi.learn.cloud.command.config.OrderSagaConfiguration$$EnhancerBySpringCGLIB$$7eddc99e'

Is anything wrong with my step or is it not available in 3.0.4? 


在 2017年4月14日星期五 UTC+8下午7:50:53,Allard Buijze写道:
Hi Edison,

Mehdi Elaoufir

unread,
Apr 18, 2017, 5:57:08 AM4/18/17
to axonfr...@googlegroups.com
Hi Edison,

I am new to Axon, i hope my answer helps (Axon experts, please correct me if i'm wrong):

The exception you've got is related to the return type of orderSagaConfiguration in your JavaConfig.

It is expected to be a org.axonframework.config.ModuleConfiguration, the closest implementation is the org.axonframework.config.SagaConfiguration class.

Cheers,

Mehdi

On Tue, Apr 18, 2017 at 7:00 AM, Edison Xu <xese...@gmail.com> wrote:
nested exception is org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'orderSagaConfiguration' is expected to be of type 'org.axonframework.config.ModuleConfiguration' but was actually of type 'com.edi.learn.cloud.command.config.OrderSagaConfiguration$$EnhancerBySpringCGLIB$7eddc99e'


Edison Xu

unread,
Apr 18, 2017, 8:52:58 PM4/18/17
to Axon Framework Users
Mehdi,

Yes, you're right. And that's exactly what happened. 
But what I'm trying to do is to verify the new parts mentioned by Allard "This is fixed in 3.0.4, where you can create a SagaConfiguration bean (named as <sagaClassName> + Configuration) that defines the configuration for that specific Saga".

Regards,

在 2017年4月18日星期二 UTC+8下午5:57:08,Mehdi Elaoufir写道:

Allard Buijze

unread,
Apr 19, 2017, 9:21:03 AM4/19/17
to Axon Framework Users
Hi Edison,

The saga configuration bean (in your case orderSagaConfiguration)  has to be of the correct type, namely SagaConfiguration. As Mehdi indicated, your application currently fails to start because the bean is not of the expected type.

I see you have an OrderSagaConfiguration class. Note that the default bean name of a configuration class is the simple class name with first lowercase character. In your case, that's also orderSagaConfiguration.

That may conflict with a configuration such as:

@Bean
public SagaConfiguration orderSagaConfiguration() {
    return SagaConfiguration.trackingSagaManager(OrderSaga.class);
}

Both beans are called "orderSagaConfiguration" and Spring will overwrite one with another. You can resolve this by using an explicit name on the @Configuration annotation on OrderSagaConfiguration.

Cheers,

Allard

Edison Xu

unread,
Apr 20, 2017, 5:02:21 AM4/20/17
to Axon Framework Users
Hi Allard,

Thanks for your reply. I followed your description to update my configure as below
@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer){
    Function<org.axonframework.config.Configuration, SubscribableMessageSource<EventMessage<?>>> func = c-> queueMessageSource(serializer);
    SagaConfiguration<OrderSaga> sagaConfiguration = SagaConfiguration.subscribingSagaManager(OrderSaga.class, func);
    return sagaConfiguration;
}


but somehow, it reports "javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call" Exception. 
It's strange because in the codes 
private SagaConfiguration(Class<S> sagaType, Function<Configuration, SubscribableMessageSource<EventMessage<?>>> messageSourceBuilder) {
    String managerName = sagaType.getSimpleName() + "Manager";
    String processorName = sagaType.getSimpleName() + "Processor";
    String repositoryName = sagaType.getSimpleName() + "Repository";
    sagaStore = new Component<>(() -> config, "sagaStore", c -> c.getComponent(SagaStore.class, InMemorySagaStore::new));
    sagaRepository = new Component<>(() -> config, repositoryName,
                                     c -> new AnnotatedSagaRepository<>(sagaType, sagaStore.get(), c.resourceInjector(),
                                                                        c.parameterResolverFactory()));
    sagaManager = new Component<>(() -> config, managerName, c -> new AnnotatedSagaManager<>(sagaType, sagaRepository.get(),
                                                                                             c.parameterResolverFactory()));
    processor = new Component<>(() -> config, processorName,
                                c -> {
                                    SubscribingEventProcessor processor = new SubscribingEventProcessor(managerName, sagaManager.get(),
                                                                                                        messageSourceBuilder.apply(c));
                                    processor.registerInterceptor(new CorrelationDataInterceptor<>(c.correlationDataProviders()));
    return processor;
    });
}

a InMemorySagaStore is created, but actually Axon uses JpaSagaStore instead, and the entity manager is lost. (I use Jpa to store the domain events)

I tried to define a axonTransactionManager in my configure class, but still not working.
So I added an interceptor to make it work.
@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer){
    Function<org.axonframework.config.Configuration, SubscribableMessageSource<EventMessage<?>>> func = c-> queueMessageSource(serializer);
    SagaConfiguration<OrderSaga> sagaConfiguration = SagaConfiguration.subscribingSagaManager(OrderSaga.class, func);
    Function<org.axonframework.config.Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder =
            c->transactionManagingInterceptor();
    sagaConfiguration.registerHandlerInterceptor(interceptorBuilder);
    return sagaConfiguration;
}

@Bean
public TransactionManagingInterceptor transactionManagingInterceptor(){
    return new TransactionManagingInterceptor(new SpringTransactionManager(transactionManager));
}

Although it's working now, I'm still a little bit worried about why Jpatransaction manager is lost, and how to correct it.

Thank you,

Regards,

Edison


在 2017年4月19日星期三 UTC+8下午9:21:03,Allard Buijze写道:

Allard Buijze

unread,
Apr 20, 2017, 5:36:40 AM4/20/17
to Axon Framework Users
Hi Edison,

the code in the SagaConfiguration class just specifies that an InMemorySagaStore should be used if no store was explicitly configured. The Sprint Autoconfiguration will configure a JpaSagaStore for you if it detects JPA on the classpath.

It seems that you are reading some messages from an external source. The subscribing event processor doesn't start any transaction by default, because it is invoked in the thread that published the event to the processor. It is assumed that a transaction will be running for that component already. Obviously, you can still configure a Transaction Manager, just the way you did it. 
Consider configuring a transaction manager in your external message source, instead.

A small recommendation from my side. The code is much more readable if you don't assign functions to variables. For example:
return SagaConfiguration.subscribingSagaManager(OrderSaga.class, c-> queueMessageSource(serializer))
                        .sagaConfiguration.registerHandlerInterceptor(c->transactionManagingInterceptor());
This reads a lot easier, in my opinion.

Cheers,

Allard
Message has been deleted
Message has been deleted
Message has been deleted

Edison Xu

unread,
Apr 20, 2017, 10:39:38 PM4/20/17
to Axon Framework Users
Hi Allard,

Thanks for the quick reply and clarification. 
As you mentioned, I added a @Transactional annotation on the onMessage method, it works as well.
@Bean
public SpringAMQPMessageSource queueMessageSource(Serializer serializer){
    return new SpringAMQPMessageSource(serializer){
        @RabbitListener(queues = "orderqueue")
        @Override
        @Transactional
        public void onMessage(Message message, Channel channel) throws Exception {
            LOGGER.debug("Message received: "+message.toString());
            super.onMessage(message, channel);
        }
    };
}

And for the function to variables, I copied from the declaration of the method itself for convenience. I'll definitely alter it to the propery lambda style.
Thank your for the recommendation. :)

Sorry for bordering you, but I still have one more question to confirm.
Let's say I have two instance of the same Saga codes, both receiving a event to trigger the subsequent scenario in the Saga(listening to a Fanout queue). 
One will continue the scenario correctly because it can find the saga instance with the associate key, and for the other, will it create a new saga to continue the work? 
Not sure about whether the result will be impact by the types of saga repository.

Thanks,

Edison

在 2017年4月20日星期四 UTC+8下午5:36:40,Allard Buijze写道:

Allard Buijze

unread,
Apr 24, 2017, 8:48:02 AM4/24/17
to Axon Framework Users
Hi Edison,

good to hear you got it working.

Not sure what you mean with "will it create a new saga to continue the work". If you have 2 saga instances, then they will both process the event, if they both have a matching association value.
If you have 1 existing Saga and the event handler method is (also) annotated with @StartSaga, it will not create a new instance, unless @StartSaga(forceNew =true) is specified.

Hope this helps.
Cheers,

Allard
Reply all
Reply to author
Forward
0 new messages