--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
package com.chambers.group.configuration;
import java.net.UnknownHostException;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.annotation.AggregateAnnotationCommandHandler;
import org.axonframework.commandhandling.annotation.AnnotationCommandHandlerBeanPostProcessor;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean;
import org.axonframework.eventhandling.ClusteringEventBus;
import org.axonframework.eventhandling.DefaultClusterSelector;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventBusTerminal;
import org.axonframework.eventhandling.SimpleCluster;
import org.axonframework.eventhandling.amqp.AMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.DefaultAMQPMessageConverter;
import org.axonframework.eventhandling.amqp.spring.ListenerContainerLifecycleManager;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPConsumerConfiguration;
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.axonframework.eventhandling.annotation.AnnotationEventListenerBeanPostProcessor;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventstore.mongo.DefaultMongoTemplate;
import org.axonframework.eventstore.mongo.MongoEventStore;
import org.axonframework.eventstore.mongo.MongoTemplate;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.support.TaskUtils;
import com.chambers.group.domain.objects.GroupAggregate;
import com.mongodb.MongoClient;
/**
 * Spring Java configuration that wires Axon Framework to RabbitMQ (events are
 * published through a {@link SpringAMQPTerminal}) and to MongoDB (event store
 * backing the {@link GroupAggregate} repository).
 *
 * <p>Every connection setting is injected from properties through
 * {@code @Value}; nothing is hard-coded except the terminal/consumer flags
 * set explicitly in the bean methods below.
 */
@Configuration
public class AxonConfiguration {

    // MongoDB
    @Value("${mongo.host}")
    private String mongo;
    @Value("${mongo.port}")
    private int port;
    @Value("${mongo.password}")
    private String password;
    @Value("${mongo.user}")
    private String user;
    @Value("${mongo.db}")
    private String db;
    @Value("${mongo.collection}")
    private String collection;
    @Value("${mongo.snapshot}")
    private String snapshot;

    // Rabbit
    @Value("${rabbitmq.host}")
    private String rabbitHost;
    @Value("${rabbitmq.port}")
    private Integer rabbitPort;
    @Value("${rabbitmq.username}")
    private String rabbitUsername;
    @Value("${rabbitmq.password}")
    private String rabbitPassword;
    @Value("${rabbitmq.exchange.name}")
    private String rabbitExchangeName;
    @Value("${rabbitmq.exchange.autodelete}")
    private boolean rabbitExchangeAutodelete;
    @Value("${rabbitmq.exchange.durable}")
    private boolean rabbitExchangeDurable;
    @Value("${rabbitmq.queue.name}")
    private String rabbitQueueName;
    @Value("${rabbitmq.queue.durable}")
    private Boolean rabbitQueueDurable;
    @Value("${rabbitmq.queue.exclusive}")
    private Boolean rabbitQueueExclusive;
    @Value("${rabbitmq.queue.autodelete}")
    private Boolean rabbitQueueAutoDelete;
    @Value("${rabbitmq.queue-listener.prefetch-count}")
    private Integer rabbitQueueListenerPrefetchCount;
    @Value("${rabbitmq.queue-listener.recovery-interval}")
    private Long rabbitQueueListenerRecoveryInterval;
    @Value("${rabbitmq.queue-listener.cluster-transaction-size}")
    private Integer rabbitQueueClusterTransactionSize;

    /*
     * RabbitMQ Subsystem Configuration
     */

    /**
     * Serializer shared by the AMQP terminal and the AMQP message converter.
     *
     * @return a new {@link XStreamSerializer}
     */
    @Bean
    public XStreamSerializer xstreamSerializer() {
        return new XStreamSerializer();
    }

    /**
     * RabbitMQ connection factory built from the {@code rabbitmq.*} host, port
     * and credential properties.
     *
     * @return a {@link CachingConnectionFactory} with username and password set
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
        connectionFactory.setUsername(rabbitUsername);
        connectionFactory.setPassword(rabbitPassword);
        return connectionFactory;
    }

    /**
     * Fanout exchange used as the event bus exchange.
     *
     * @return the exchange, with durability and auto-delete taken from properties
     */
    @Bean
    public FanoutExchange eventBusExchange() {
        return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
    }

    /**
     * Queue bound to the event bus exchange.
     *
     * @return the queue, with durable/exclusive/auto-delete taken from properties
     */
    @Bean
    public Queue eventBusQueue() {
        return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
    }

    /**
     * Binds {@link #eventBusQueue()} to {@link #eventBusExchange()}.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(eventBusQueue()).to(eventBusExchange());
    }

    /**
     * Rabbit admin operating on {@link #connectionFactory()}, with auto-startup
     * enabled.
     *
     * @return the configured {@link RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /*
     * AXON
     */

    /**
     * Command bus.
     *
     * @return a new {@link SimpleCommandBus}
     */
    @Bean
    public CommandBus commandBus() {
        return new SimpleCommandBus();
    }

    /**
     * Event bus: a {@link ClusteringEventBus} that selects
     * {@link #simpleCluster()} through a {@link DefaultClusterSelector} and uses
     * {@link #terminal()} as its terminal.
     *
     * @return the clustering event bus
     */
    @Bean
    public EventBus eventBus() {
        return new ClusteringEventBus(new DefaultClusterSelector(simpleCluster()), terminal());
    }

    /**
     * AMQP terminal for the event bus. It is configured with the shared
     * connection factory and serializer, the configured exchange name and the
     * listener container lifecycle manager; {@code durable} is set to
     * {@code true} and {@code transactional} to {@code false}.
     *
     * @return the configured {@link SpringAMQPTerminal}
     */
    @Bean
    public EventBusTerminal terminal() {
        SpringAMQPTerminal terminal = new SpringAMQPTerminal();
        terminal.setConnectionFactory(connectionFactory());
        terminal.setSerializer(xstreamSerializer());
        terminal.setExchangeName(rabbitExchangeName);
        terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager());
        terminal.setDurable(true);
        terminal.setTransactional(false);
        return terminal;
    }

    /**
     * Consumer configuration for the event queue: AUTO acknowledge mode, one
     * concurrent consumer, non-exclusive, with prefetch count, recovery interval
     * and transaction size taken from properties and a
     * {@link RabbitTransactionManager} on the shared connection factory.
     *
     * <p>NOTE(review): no other bean method in this class references this bean,
     * so it is unclear whether these settings reach the listener container.
     * Confirm how the configuration is meant to be attached to the cluster.
     *
     * @return the consumer configuration
     */
    @Bean
    AMQPConsumerConfiguration springAMQPConsumerConfiguration() {
        SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
        springAMQPConsumerConfiguration.setDefaults(null);
        springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
        springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
        springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
        springAMQPConsumerConfiguration.setConcurrentConsumers(1);
        springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
        springAMQPConsumerConfiguration.setExclusive(false);
        springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
        springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
        springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
        return springAMQPConsumerConfiguration;
    }

    /**
     * Cluster definition; the cluster is named after the configured queue name.
     *
     * @return a new {@link SimpleCluster}
     */
    @Bean
    SimpleCluster simpleCluster() {
        return new SimpleCluster(rabbitQueueName);
    }

    /**
     * Message converter using the shared XStream serializer.
     *
     * @return a new {@link DefaultAMQPMessageConverter}
     */
    @Bean
    DefaultAMQPMessageConverter defaultAMQPMessageConverter() {
        return new DefaultAMQPMessageConverter(xstreamSerializer());
    }

    /**
     * Lifecycle manager for the message listener containers, using the shared
     * connection factory.
     *
     * @return the configured {@link ListenerContainerLifecycleManager}
     */
    @Bean
    ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
        ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
        listenerContainerLifecycleManager.setConnectionFactory(connectionFactory());
        return listenerContainerLifecycleManager;
    }

    /**
     * Post-processor for annotated event listeners, configured with
     * {@link #eventBus()}.
     *
     * @return the event listener post-processor
     */
    @Bean
    public AnnotationEventListenerBeanPostProcessor annotationEventListenerBeanPostProcessor() {
        AnnotationEventListenerBeanPostProcessor processor = new AnnotationEventListenerBeanPostProcessor();
        processor.setEventBus(eventBus());
        return processor;
    }

    /**
     * Post-processor for annotated command handlers, configured with
     * {@link #commandBus()}.
     *
     * @return the command handler post-processor
     */
    @Bean
    public AnnotationCommandHandlerBeanPostProcessor annotationCommandHandlerBeanPostProcessor() {
        AnnotationCommandHandlerBeanPostProcessor processor = new AnnotationCommandHandlerBeanPostProcessor();
        processor.setCommandBus(commandBus());
        return processor;
    }

    /**
     * Factory bean for the {@link CommandGateway}, configured with
     * {@link #commandBus()}.
     *
     * @return the command gateway factory bean
     */
    @Bean
    public CommandGatewayFactoryBean<CommandGateway> commandGatewayFactoryBean() {
        CommandGatewayFactoryBean<CommandGateway> factory = new CommandGatewayFactoryBean<CommandGateway>();
        factory.setCommandBus(commandBus());
        return factory;
    }

    /**
     * Event-sourcing repository for {@link GroupAggregate}, backed by a
     * {@link MongoEventStore} and publishing to {@link #eventBus()}.
     *
     * @return the repository
     * @throws IllegalStateException if the configured MongoDB host cannot be
     *         resolved
     */
    @Bean
    public EventSourcingRepository<GroupAggregate> eventSourcingRepository() {
        MongoEventStore eventStore = new MongoEventStore(createMongoTemplate());
        EventSourcingRepository<GroupAggregate> repository =
                new EventSourcingRepository<>(GroupAggregate.class, eventStore);
        repository.setEventBus(eventBus());
        return repository;
    }

    /**
     * Creates the Mongo template for the event store from the {@code mongo.*}
     * properties. An empty user or password is passed on as {@code null}.
     *
     * @return the Mongo template
     * @throws IllegalStateException if the configured MongoDB host cannot be
     *         resolved
     */
    private MongoTemplate createMongoTemplate() {
        try {
            MongoClient mongoClient = new MongoClient(this.mongo, this.port);
            return new DefaultMongoTemplate(mongoClient, this.db, this.collection, this.snapshot,
                    this.user.isEmpty() ? null : this.user,
                    this.password.isEmpty() ? null : this.password.toCharArray());
        } catch (UnknownHostException e) {
            // Fail fast: continuing would hand a null template to the event store
            // and hide the real cause behind a later, unrelated failure.
            throw new IllegalStateException(
                    "Cannot resolve MongoDB host '" + this.mongo + "' (port " + this.port + ")", e);
        }
    }

    /**
     * Subscribes the annotated command handlers of {@link GroupAggregate} to
     * {@link #commandBus()}, loading aggregates through
     * {@link #eventSourcingRepository()}.
     *
     * @return the subscribed aggregate command handler
     */
    @SuppressWarnings("unchecked")
    @Bean
    public AggregateAnnotationCommandHandler<GroupAggregate> groupCommandHandler() {
        return AggregateAnnotationCommandHandler.subscribe(
                GroupAggregate.class, eventSourcingRepository(), commandBus());
    }
}
/**
 * Listener container lifecycle manager that uses the shared connection factory
 * and explicitly registers the cluster together with its consumer
 * configuration and message converter.
 *
 * <p>Relies on {@code connectionFactory()}, {@code simpleCluster()},
 * {@code springAMQPConsumerConfiguration()} and
 * {@code defaultAMQPMessageConverter()} being available in the enclosing
 * configuration class.
 *
 * @return the configured lifecycle manager
 */
@Bean
public ListenerContainerLifecycleManager listenerContainerLifecycleManager() {
    ListenerContainerLifecycleManager manager = new ListenerContainerLifecycleManager();
    manager.setConnectionFactory(connectionFactory());
    manager.registerCluster(
            simpleCluster(),
            springAMQPConsumerConfiguration(),
            defaultAMQPMessageConverter());
    return manager;
}
/**
 * Saga in the "admin" processing group. It is started by
 * {@code SomeEventFromThisModile}, dispatches a command through the injected
 * command bus, and logs when {@code SomeEventFromOtherModule} arrives.
 *
 * <p>NOTE(review): {@code otherEventId} and {@code commandToTheOtherModule}
 * are used below but not declared anywhere in this snippet; presumably they
 * were elided from the paste. Confirm against the real class.
 */
@Saga
@ProcessingGroup("admin")
public class MySaga {

    private static final Logger LOGGER = Logger.getLogger(MySaga.class);

    // transient: the command bus is injected and not meant to be stored with the saga state
    @Autowired
    private transient CommandBus commandBus;

    /**
     * Plain event handler that logs receipt of the other module's event.
     *
     * <p>NOTE(review): this is an {@code @EventHandler}, not a
     * {@code @SagaEventHandler}; verify that it is actually invoked on a saga.
     */
    @EventHandler
    public void handle(SomeEventFromOtherModule event) {
        LOGGER.info(">>> SomeEventFromOtherModule received");
    }

    /**
     * Starts the saga (associated through {@code eventId}), records the other
     * event's id, adds an {@code otherEventId} association for it and dispatches
     * the command to the other module.
     */
    @StartSaga
    @SagaEventHandler(associationProperty = "eventId")
    public void on(SomeEventFromThisModile event) {
        otherEventId = event.otherId();

        SagaLifecycle.associateWith("otherEventId", event.otherId());

        commandBus.dispatch(GenericCommandMessage.asCommandMessage(commandToTheOtherModule));
    }

    /**
     * Saga event handler for the other module's event, matched through the
     * {@code otherEventId} association; only logs receipt.
     */
    @SagaEventHandler(associationProperty = "otherEventId")
    public void on(SomeEventFromOtherModule event) {
        LOGGER.info(">>> SomeEventFromOtherModule received by SagaEventHandler");
    }
}