# Exchange name; read by AMQPConfiguration through @Value("${axon.amqp.exchange}").
axon.amqp.exchange=Axon.EventBus
# Message source for the "order" event processor. The value matches the name of the
# queueMessageSource bean method; presumably it is resolved as a bean name - confirm.
axon.eventhandling.processors.order.source=queueMessageSource
/**
 * Spring configuration declaring the AMQP queue, exchange and binding, plus the
 * message source that listens on that queue.
 */
@Configuration
public class AMQPConfiguration {

    private static final Logger LOGGER = getLogger(AMQPConfiguration.class);

    /**
     * Queue name shared by the {@link #queue()} bean and the listener annotation,
     * kept in one place so the two cannot drift apart.
     */
    private static final String QUEUE_NAME = "orderqueue";

    /** Exchange name, injected from the {@code axon.amqp.exchange} property. */
    @Value("${axon.amqp.exchange}")
    private String exchangeName;

    /**
     * Declares the queue named {@value #QUEUE_NAME}.
     *
     * @return the queue definition (constructed with {@code true} as its second argument)
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    /**
     * Declares a durable fanout exchange named after {@link #exchangeName}.
     *
     * @return the exchange definition
     */
    @Bean
    public Exchange exchange() {
        return ExchangeBuilder.fanoutExchange(exchangeName).durable(true).build();
    }

    /**
     * Binds {@link #queue()} to {@link #exchange()} with an empty routing key and no arguments.
     *
     * @return the binding definition
     */
    @Bean
    public Binding queueBinding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("").noargs();
    }

    /**
     * Creates the message source that receives messages from the queue, logs them
     * at debug level and hands them to {@link SpringAMQPMessageSource#onMessage}.
     *
     * @param serializer serializer passed to the {@link SpringAMQPMessageSource} constructor
     * @return the message source listening on {@value #QUEUE_NAME}
     */
    @Bean
    public SpringAMQPMessageSource queueMessageSource(Serializer serializer) {
        return new SpringAMQPMessageSource(serializer) {
            @RabbitListener(queues = QUEUE_NAME)
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // Parameterized form: the message is only rendered when debug is enabled.
                // NOTE(review): assumes LOGGER supports "{}" placeholders (SLF4J-style) - confirm.
                LOGGER.debug("Message received: {}", message);
                super.onMessage(message, channel);
            }
        };
    }
}
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Hi Edison,
nested exception is org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'orderSagaConfiguration' is expected to be of type 'org.axonframework.config.ModuleConfiguration' but was actually of type 'com.edi.learn.cloud.command.config.OrderSagaConfiguration$$EnhancerBySpringCGLIB$7eddc99e'
/**
 * Builds the saga configuration for {@code OrderSaga} using a subscribing saga
 * manager whose events come from {@code queueMessageSource(serializer)}.
 *
 * NOTE(review): the default bean name of this method is "orderSagaConfiguration";
 * if the enclosing configuration class is itself named OrderSagaConfiguration the
 * two bean names would presumably collide - confirm against the enclosing class.
 *
 * @param serializer serializer forwarded to {@code queueMessageSource}
 * @return the saga configuration for {@code OrderSaga}
 */
@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer) {
    return SagaConfiguration.subscribingSagaManager(
            OrderSaga.class,
            axonConfig -> queueMessageSource(serializer));
}
/**
 * Wires the components of a saga configuration: saga store, repository, manager
 * and a subscribing event processor. Each is wrapped in a {@code Component} that
 * receives the configuration through the {@code () -> config} supplier.
 *
 * The processor built here only has a {@code CorrelationDataInterceptor}
 * registered on it; no other interceptor is added by this constructor.
 *
 * @param sagaType             saga class; its simple name prefixes the component names
 * @param messageSourceBuilder function producing the message source the processor is built with
 */
private SagaConfiguration(Class<S> sagaType,
                          Function<Configuration, SubscribableMessageSource<EventMessage<?>>> messageSourceBuilder) {
    final String sagaName = sagaType.getSimpleName();
    final String managerName = sagaName + "Manager";
    final String processorName = sagaName + "Processor";
    final String repositoryName = sagaName + "Repository";

    // Looks up a SagaStore component, passing InMemorySagaStore::new as the supplier argument.
    sagaStore = new Component<>(
            () -> config,
            "sagaStore",
            c -> c.getComponent(SagaStore.class, InMemorySagaStore::new));

    sagaRepository = new Component<>(
            () -> config,
            repositoryName,
            c -> new AnnotatedSagaRepository<>(
                    sagaType, sagaStore.get(), c.resourceInjector(), c.parameterResolverFactory()));

    sagaManager = new Component<>(
            () -> config,
            managerName,
            c -> new AnnotatedSagaManager<>(sagaType, sagaRepository.get(), c.parameterResolverFactory()));

    // The component is registered under processorName, while the processor itself
    // is constructed with managerName as its first argument.
    processor = new Component<>(
            () -> config,
            processorName,
            c -> {
                SubscribingEventProcessor eventProcessor = new SubscribingEventProcessor(
                        managerName, sagaManager.get(), messageSourceBuilder.apply(c));
                eventProcessor.registerInterceptor(
                        new CorrelationDataInterceptor<>(c.correlationDataProviders()));
                return eventProcessor;
            });
}
/**
 * Builds the saga configuration for {@code OrderSaga} using a subscribing saga
 * manager fed by {@code queueMessageSource(serializer)}, and registers the
 * interceptor returned by {@code transactionManagingInterceptor()} as a handler
 * interceptor.
 *
 * @param serializer serializer forwarded to {@code queueMessageSource}
 * @return the saga configuration with the handler interceptor registered
 */
@Bean
public SagaConfiguration<OrderSaga> orderSagaConfiguration(Serializer serializer) {
    SagaConfiguration<OrderSaga> orderSagas = SagaConfiguration.subscribingSagaManager(
            OrderSaga.class,
            axonConfig -> queueMessageSource(serializer));
    orderSagas.registerHandlerInterceptor(axonConfig -> transactionManagingInterceptor());
    return orderSagas;
}
/**
 * Creates the interceptor handed to the saga configuration, wrapping the
 * {@code transactionManager} field in a {@code SpringTransactionManager}.
 *
 * @return a new {@code TransactionManagingInterceptor}
 */
@Bean
public TransactionManagingInterceptor transactionManagingInterceptor() {
    SpringTransactionManager springTransactionManager = new SpringTransactionManager(transactionManager);
    return new TransactionManagingInterceptor(springTransactionManager);
}
return SagaConfiguration.subscribingSagaManager(OrderSaga.class, c -> queueMessageSource(serializer))
        .registerHandlerInterceptor(c -> transactionManagingInterceptor());
This reads a lot easier, in my opinion.
/**
 * Creates the message source that listens on the "orderqueue" queue, logs each
 * received message at debug level and delegates to
 * {@code SpringAMQPMessageSource.onMessage}. The overriding method is annotated
 * {@code @Transactional}.
 *
 * NOTE(review): whether the transactional annotation takes effect on this
 * anonymous subclass depends on how the bean is proxied - confirm.
 *
 * @param serializer serializer passed to the {@code SpringAMQPMessageSource} constructor
 * @return the message source listening on "orderqueue"
 */
@Bean
public SpringAMQPMessageSource queueMessageSource(Serializer serializer) {
    return new SpringAMQPMessageSource(serializer) {
        @RabbitListener(queues = "orderqueue")
        @Override
        @Transactional
        public void onMessage(Message message, Channel channel) throws Exception {
            // Parameterized form: the message is only rendered when debug is enabled.
            // NOTE(review): assumes LOGGER supports "{}" placeholders (SLF4J-style) - confirm.
            LOGGER.debug("Message received: {}", message);
            super.onMessage(message, channel);
        }
    };
}