@Configuration
@EntityScan({
"com.mycomp.jpas"
})
public class AxonEventStorage {
@PersistenceUnit
private EntityManagerFactory entityManagerFactory;
@Bean
public PlatformTransactionManager transactionManager() {
return new JpaTransactionManager(entityManagerFactory);
}
@Bean
public SpringTransactionManager springTransactionManager(PlatformTransactionManager transactionManager) {
return new SpringTransactionManager(transactionManager);
}
@Bean
public TransactionManagerFactoryBean transactionManagerFactoryBean(PlatformTransactionManager transactionManager) throws PropertyVetoException {
TransactionManagerFactoryBean factoryBean = new TransactionManagerFactoryBean();
factoryBean.setTransactionManager(transactionManager);
return factoryBean;
}
@Bean
public EntityManagerProvider entityManagerProvider() {
return new ContainerManagedEntityManagerProvider();
}
@Bean
public EventStore eventStore(EntityManagerProvider entityManagerProvider, DataSource dataSource, SpringTransactionManager springTransactionManager, Serializer serializer) throws SQLException {
EventStorageEngine eventStorageEngine = new PostgresEventStorage(serializer, null, dataSource, springTransactionManager, entityManagerProvider);
return new EmbeddedEventStore(eventStorageEngine);
}
@Bean
public SagaStore<Object> postgresJPASagaStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
return new PostgresJsonSagaStore(serializer, entityManagerProvider);
}
}
@Configuration
public class AxonGeneral {
/**
* @return We're using the JSON serializer
*/
@Bean
public Serializer serializer() {
return new JacksonSerializer();
}
}
@Configuration
/**
* RabbitMQ Spring Configuration
*/
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String rabbitHost;
@Value("${rabbitmq.port}")
private Integer rabbitPort;
@Value("${rabbitmq.username}")
private String rabbitUsername;
@Value("${rabbitmq.password}")
private String rabbitPassword;
@Value("${rabbitmq.exchange.name}")
private String rabbitExchangeName;
@Value("${rabbitmq.exchange.autodelete}")
private boolean rabbitExchangeAutodelete;
@Value("${rabbitmq.exchange.durable}")
private boolean rabbitExchangeDurable;
@Value("${rabbitmq.queue.name}")
private String rabbitQueueName;
@Value("${rabbitmq.queue.durable}")
private Boolean rabbitQueueDurable;
@Value("${rabbitmq.queue.exclusive}")
private Boolean rabbitQueueExclusive;
@Value("${rabbitmq.queue.autodelete}")
private Boolean rabbitQueueAutoDelete;
@Value("${rabbitmq.queue-listener.prefetch-count}")
private Integer rabbitQueueListenerPrefetchCount;
@Value("${rabbitmq.queue-listener.recovery-interval}")
private Long rabbitQueueListenerRecoveryInterval;
@Value("${rabbitmq.queue-listener.cluster-transaction-size}")
private Integer rabbitQueueClusterTransactionSize;
// Connection Factory
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
connectionFactory.setUsername(rabbitUsername);
connectionFactory.setPassword(rabbitPassword);
return connectionFactory;
}
// Fanout Exchange
@Bean
public FanoutExchange eventBusExchange() {
return new FanoutExchange(rabbitExchangeName, rabbitExchangeDurable, rabbitExchangeAutodelete);
}
// Rabbit queue
@Bean
public Queue rabbitQueue() {
return new Queue(rabbitQueueName, rabbitQueueDurable, rabbitQueueExclusive, rabbitQueueAutoDelete);
}
// AMPQ Binding
@Bean
public Binding ampqBinding(Queue eventBusQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(eventBusQueue).to(fanoutExchange);
}
// Rabit Admin
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
// Spring AMPQ Terminal
@Bean
public SpringAMQPTerminal terminal(EventBus eventBus, Serializer serializer, ConnectionFactory connectionFactory, ListenerContainerLifecycleManager listenerContainerLifecycleManager) {
SpringAMQPTerminal terminal = new SpringAMQPTerminal(eventBus);
terminal.setConnectionFactory(connectionFactory);
terminal.setSerializer(serializer);
terminal.setExchangeName(rabbitExchangeName);
terminal.setListenerContainerLifecycleManager(listenerContainerLifecycleManager);
terminal.setDurable(true);
terminal.setTransactional(false);
return terminal;
}
// Axon AMQPConsumerConfiguration
@Bean
public AMQPConsumerConfiguration springAMQPConsumerConfiguration() {
SpringAMQPConsumerConfiguration springAMQPConsumerConfiguration = new SpringAMQPConsumerConfiguration();
springAMQPConsumerConfiguration.setDefaults(null);
springAMQPConsumerConfiguration.setQueueName(rabbitQueueName);
springAMQPConsumerConfiguration.setErrorHandler(TaskUtils.getDefaultErrorHandler(false));
springAMQPConsumerConfiguration.setAcknowledgeMode(AcknowledgeMode.AUTO);
springAMQPConsumerConfiguration.setConcurrentConsumers(1);
springAMQPConsumerConfiguration.setRecoveryInterval(rabbitQueueListenerRecoveryInterval);
springAMQPConsumerConfiguration.setExclusive(false);
springAMQPConsumerConfiguration.setPrefetchCount(rabbitQueueListenerPrefetchCount);
springAMQPConsumerConfiguration.setTransactionManager(new RabbitTransactionManager(connectionFactory()));
springAMQPConsumerConfiguration.setTxSize(rabbitQueueClusterTransactionSize);
return springAMQPConsumerConfiguration;
}
// Axon AMPQ Message converter
@Bean
public DefaultAMQPMessageConverter defaultAMQPMessageConverter(Serializer serializer) {
return new DefaultAMQPMessageConverter(serializer);
}
// Axon Rabbit(container) Life cycle manager
@Bean
public ListenerContainerLifecycleManager listenerContainerLifecycleManager(ConnectionFactory connectionFactory, EventProcessor eventProcessor, AMQPConsumerConfiguration aMQPConsumerConfiguration, DefaultAMQPMessageConverter defaultAMQPMessageConverter) {
ListenerContainerLifecycleManager listenerContainerLifecycleManager = new ListenerContainerLifecycleManager();
listenerContainerLifecycleManager.setConnectionFactory(connectionFactory);
// listenerContainerLifecycleManager.registerEventProcessor(eventProcessor, aMQPConsumerConfiguration, defaultAMQPMessageConverter)
return listenerContainerLifecycleManager;
}
}
@Configuration
public class AxonCommandHandling {
@Bean
public AnnotationCommandHandlerAdapter annotationCredApplicationCommandHandler(CommandBus commandBus, MyHandler myHandler) {
AnnotationCommandHandlerAdapter annotationCommandHandlerAdapter = new AnnotationCommandHandlerAdapter(myHandler);
annotationCommandHandlerAdapter.subscribe(commandBus);
return annotationCommandHandlerAdapter;
}
@Bean
public CommandBus simpleCommandBus() {
SimpleCommandBus simpleCommandBus = new SimpleCommandBus();
simpleCommandBus.setDispatchInterceptors(Arrays.asList(new BeanValidationInterceptor<>()));
return simpleCommandBus;
}
}
@Configuration
public class AxonEventHandling {
@Bean
public EventProcessor eventProcessor(EventBus eventBus, MyListener myListener) {
EventProcessor eventProcessor = new SubscribingEventProcessor("eventProcessor",
new SimpleEventHandlerInvoker(
myListener),
eventBus);
eventProcessor.start();
return eventProcessor;
}
@Bean
public EventProcessorMessageListener eventProcessorMessageListener(EventProcessor eventProcessor,DefaultAMQPMessageConverter ampqMessageConverter) {
EventProcessorMessageListener messageListener = new EventProcessorMessageListener(ampqMessageConverter);
return messageListener;
}
}
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframework+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "Axon Framework Users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/axonframework/hvZ6-U9QcUI/unsubscribe.
To unsubscribe from this group and all its topics, send an email to axonframework+unsubscribe@googlegroups.com.
I found my fault. I configurate my EventProcessor on EventStore not on SpringAMQPMessageSource. But now I have another problem, as a have 2 SpringAMQPMessageSource (one in aggregate microservice, and one in another microservices) amqp messages dispatched in turn only in one of them (one dispatched only in SpringAMQPMessageSource on aggregate side, another in view microservice).Can you please suggest, what can be wrong?