Event handler processing an event message from RabbitMQ twice


Antonio García García

Apr 23, 2015, 5:09:34 AM
to axonfr...@googlegroups.com

Hi,


Due to our project requirements, we decided to switch the Event Bus from the Simple Event Bus to the Distributed Event Bus, configured as shown at the end of this email. The configuration covers both the event producer and the consumer, since both currently live in the same application.


After applying this configuration, the application sends only one event to the event bus, but the event handler receives the same event twice. According to both the logs and the RabbitMQ management UI, the message is sent to the queue once and received only once by the BlockingQueueConsumer, as the following log traces show:


2015-04-22 18:18:37.726 DEBUG 80024 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer : Received message: (Body:'[B@75dc8732(byte[537])'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=Axon.EventBus, receivedRoutingKey=com.chambers.group.domain.events, deliveryTag=1, messageCount=0])


Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received

Event 384eb99e-35ee-79e9-fff9-3929279dcf8c received



So I assume something is wrong in my configuration. Can anybody please help me?


Thanks in advance for your help.


Antonio



Configuration:


package com.chambers.group.configuration;

import java.net.UnknownHostException;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;

import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;




@Configuration
public class AxonConfiguration {

    private @Value("${mongo.host}") String mongo;
    private @Value("${mongo.port}") int port;
    private @Value("${mongo.password}") String password;
    private @Value("${mongo.user}") String user;
    private @Value("${mongo.db}") String db;
    private @Value("${mongo.collection}") String collection;
    private @Value("${mongo.snapshot}") String snapshot;

    // Rabbit
    private @Value("${rabbitmq.host}") String rabbitHost;
    private @Value("${rabbitmq.port}") Integer rabbitPort;
    private @Value("${rabbitmq.username}") String rabbitUsername;
    private @Value("${rabbitmq.password}") String rabbitPassword;
    private @Value("${rabbitmq.exchange.name}") String rabbitExchangeName;
    private @Value("${rabbitmq.exchange.autodelete}") boolean rabbitExchangeAutodelete;
    private @Value("${rabbitmq.exchange.durable}") boolean rabbitExchangeDurable;
    private @Value("${rabbitmq.queue.name}") String rabbitQueueName;
    private @Value("${rabbitmq.queue.durable}") Boolean rabbitQueueDurable;
    private @Value("${rabbitmq.queue.exclusive}") Boolean rabbitQueueExclusive;
    private @Value("${rabbitmq.queue.autodelete}") Boolean rabbitQueueAutoDelete;
    private @Value("${rabbitmq.queue-listener.prefetch-count}") Integer rabbitQueueListenerPrefetchCount;
    private @Value("${rabbitmq.queue-listener.recovery-interval}") Long rabbitQueueListenerRecoveryInterval;
    private @Value("${rabbitmq.queue-listener.cluster-transaction-size}") Integer rabbitQueueClusterTransactionSize;




   
    /**
     * RabbitMQ Subsystem Configuration
     */

    // Serializer
    @Bean
    public XStreamSerializer xstreamSerializer() {
        return new XStreamSerializer();
    }

    // Connection Factory
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
        connectionFactory.setUsername(rabbitUsername);
        connectionFactory.setPassword(rabbitPassword);
        return connectionFactory;
    }

    // Event bus exchange
    @Bean
    public FanoutExchange eventBusExchange() {
        return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
    }

    // Event bus queue
    @Bean
    public Queue eventBusQueue() {
        return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
    }

    // Binding
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
    }

    // Rabbit Admin
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }




   
    /**
     * AXON
     */

    // Command Bus
    @Bean
    public CommandBus commandBus() {
        return new SimpleCommandBus();
    }

    // Event bus
    @Bean
    public ClusteringEventBus eventBus() {
        ClusteringEventBus clusteringEventBus = new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
        return clusteringEventBus;
    }

    // Terminal
    @Bean
    public SpringAMQPTerminal terminal() {
        SpringAMQPTerminal terminal = new SpringAMQPTerminal();
        terminal.setConnectionFactory(connectionFactory());
        terminal.setSerializer(xstreamSerializer());
        terminal.setExchangeName(rabbitExchangeName);
        terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
        terminal.setDurable(true);
        terminal.setTransactional(false);
        return terminal;
    }

    // Configuration
    @Bean
    SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration() {
        SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
        springAMQPConsumerConfiguration.setDefaults(null);
        springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
        springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
        springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
        springAMQPConsumerConfiguration.setConcurrentConsumers(1);
        springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
        springAMQPConsumerConfiguration.setExclusive(false);
        springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
        springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
        springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
        return springAMQPConsumerConfiguration;
    }

    // Cluster definition
    @Bean
    SimpleCluster simpleCluster() {
        SimpleCluster simpleCluster = new SimpleCluster(rabbitQueueName);
        return simpleCluster;
    }

    // Message converter
    @Bean
    DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
        return new DefaultAMQPMessageConverter(xstreamSerializer());
    }

    // Message listener configuration
    @Bean
    ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
        SimpleCluster simpleCluster = simpleCluster();
        DefaultAMQPMessageConverter defaultAMQPMessageConverter = defaultAMQPMessageConverter();
        ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
        listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
        listenerContainerLifecycleManager.registerCluster(simpleCluster, springAMQPConsumerConfiguration(), defaultAMQPMessageConverter);
        return listenerContainerLifecycleManager;
    }




   
    // Event listener
    @Bean
    public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
        AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
        processor.setEventBus(eventBus());
        return processor;
    }

    // Command Handler
    @Bean
    public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
        AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
        processor.setCommandBus(commandBus());
        return processor;
    }

    // Command Gateway
    @Bean
    public CommandGatewayFactoryBean<CommandGateway> commandGatewayFactoryBean() {
        CommandGatewayFactoryBean<CommandGateway> factory = new CommandGatewayFactoryBean<CommandGateway>();
        factory.setCommandBus(commandBus());
        return factory;
    }

    // Event Repository
    @Bean
    public EventSourcingRepository<GroupAggregate> eventSourcingRepository() {
        MongoClient mongoClient = null;
        MongoTemplate mongoTemplate = null;
        try {
            mongoClient = new MongoClient(this.mongo, this.port);
            mongoTemplate = new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot,
                    (this.user.isEmpty()) ? null : this.user,
                    (this.password.isEmpty()) ? null : this.password.toCharArray());
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }

        MongoEventStore eventStore = new MongoEventStore(mongoTemplate);
        EventSourcingRepository<GroupAggregate> repository = new EventSourcingRepository<>(GroupAggregate.class, eventStore);
        repository.setEventBus(eventBus());
        return repository;
    }

    @SuppressWarnings("unchecked")
    @Bean
    public AggregateAnnotationCommandHandler<GroupAggregate> groupCommandHandler() {
        AggregateAnnotationCommandHandler<GroupAggregate> commandHandler = AggregateAnnotationCommandHandler.subscribe(GroupAggregate.class, eventSourcingRepository(), commandBus());
        return commandHandler;
    }
}




Allard Buijze

Apr 23, 2015, 5:33:04 AM
to Axon Framework Users
Hi Antonio,

In your configuration, in the ListenerContainerLifecycleManager config, you're calling "registerCluster". Since the terminal will also register the cluster, your cluster ends up registered with a ListenerContainer twice. Axon will reuse the same container, so each message is downloaded only once. But since the Cluster is subscribed twice, it will receive each message twice.

Removing the call to registerCluster will resolve the issue.
Cheers,

Allard
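
To illustrate the explanation above, here is a minimal, self-contained sketch of the effect. It is a toy model, not Axon's actual implementation: `ToyContainer`, `subscribe`, `onMessage` and `deliveriesFor` are hypothetical names made up for this example. It only shows how a single message taken from the queue once can still reach the same handler twice when that handler is subscribed to the same container twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DoubleSubscriptionDemo {

    /** Toy stand-in for a listener container: one queue consumer, a list of subscribers. */
    static class ToyContainer {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        /** Adds a subscriber without checking for duplicates (assumption of this toy model). */
        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        /** Called once per message taken from the queue; forwards it to every subscription. */
        void onMessage(String eventId) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(eventId);
            }
        }
    }

    /** Returns how often the handler ran for ONE delivered message, given N registrations. */
    static int deliveriesFor(int registrations) {
        ToyContainer container = new ToyContainer();
        List<String> handled = new ArrayList<>();
        Consumer<String> cluster = handled::add; // plays the role of the cluster
        for (int i = 0; i < registrations; i++) {
            container.subscribe(cluster);
        }
        container.onMessage("384eb99e-35ee-79e9-fff9-3929279dcf8c"); // the queue delivers once
        return handled.size();
    }

    public static void main(String[] args) {
        System.out.println("registered twice: handled " + deliveriesFor(2) + " time(s)");
        System.out.println("registered once:  handled " + deliveriesFor(1) + " time(s)");
    }
}
```

In this model, dropping one of the two registrations is enough to get back to one handler invocation per message, which matches the fix suggested above.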


Antonio García García

Apr 23, 2015, 7:47:55 AM
to axonfr...@googlegroups.com
Hi Allard,

Yes, you are right. After removing this line, the EventHandler captures the event only once.

Thanks a lot for your help!



Peter Davis

Jun 30, 2015, 12:44:54 PM
to axonfr...@googlegroups.com
Your code has to handle duplicate messages anyway (in rare cases), since RabbitMQ cannot guarantee "exactly once" delivery. I'm curious: what kind of "idempotent consumer" strategy are you using?


-Peter
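
For readers unfamiliar with the term, one common form of an "idempotent consumer" is to remember the identifiers of events already processed and skip repeats. The sketch below is only an illustration under stated assumptions: `IdempotentHandler` and its methods are hypothetical names, events are identified by a plain string ID, and the set of seen IDs is kept in memory. It is not what anyone in this thread reported using, and a production version would more likely persist the IDs in the same transaction as the handler's side effects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class IdempotentConsumerDemo {

    /** Wraps a handler and skips events whose identifier was already processed. */
    static class IdempotentHandler {
        // In-memory only (assumption): the set is lost on restart.
        private final Set<String> seen = ConcurrentHashMap.newKeySet();
        private final Consumer<String> delegate;

        IdempotentHandler(Consumer<String> delegate) {
            this.delegate = delegate;
        }

        /** Returns true if the event was handled, false if it was a duplicate. */
        boolean handle(String eventId) {
            if (!seen.add(eventId)) {
                return false; // already processed, ignore the redelivery
            }
            try {
                delegate.accept(eventId);
                return true;
            } catch (RuntimeException e) {
                seen.remove(eventId); // allow a retry if handling failed
                throw e;
            }
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        IdempotentHandler handler = new IdempotentHandler(processed::add);

        String eventId = "384eb99e-35ee-79e9-fff9-3929279dcf8c";
        System.out.println("first delivery handled:  " + handler.handle(eventId));
        System.out.println("second delivery handled: " + handler.handle(eventId));
        System.out.println("handler invocations:     " + processed.size());
    }
}
```

The trade-off is that the set of seen IDs grows without bound here; a real system would typically expire old entries or rely on a unique constraint in its data store.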