Axon 3.0 event handlers not working (Spring Boot, with starter)

552 views
Skip to first unread message

Fernando París

unread,
Jan 5, 2017, 10:47:58 AM1/5/17
to Axon Framework Users
I've been able to get the event-sourcing basics working with MongoDB, but I am not able to make event handlers work. I've tried several approaches without success.

I have tried an event handler inside the aggregate and an external one. Neither of them is invoked.

CommandHandler works

The code is the following:





Appconfig with Spring Boot

package es.osoco.ulyseo.recommender

import com.mongodb.MongoClient
import com.mongodb.MongoCredential
import com.mongodb.ServerAddress
import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecommendationUseCase
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.port.adapter.pricing.PricingPort
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.AggregateAnnotationCommandHandler
import org.axonframework.config.DefaultConfigurer
import org.axonframework.config.EventHandlingConfiguration
import org.axonframework.eventhandling.EventBus
import org.axonframework.eventhandling.EventProcessor
import org.axonframework.eventhandling.SimpleEventBus
import org.axonframework.eventhandling.SimpleEventHandlerInvoker
import org.axonframework.eventhandling.SubscribingEventProcessor
import org.axonframework.eventsourcing.EventSourcingRepository
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore
import org.axonframework.eventsourcing.eventstore.EventStore
import org.axonframework.mongo.eventsourcing.eventstore.DefaultMongoTemplate
import org.axonframework.mongo.eventsourcing.eventstore.MongoEventStorageEngine
import org.axonframework.mongo.eventsourcing.eventstore.MongoTemplate
import org.dozer.DozerBeanMapper
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import org.springframework.context.annotation.Bean
import org.springframework.scheduling.annotation.EnableAsync

/**
 * Spring Boot entry point and bean configuration for the recommender service.
 *
 * Declares the MongoDB-backed Axon event store, the event-sourcing repository and
 * command handler for the {@link Recommendation} aggregate, an event processor for
 * {@link CalculateRecommendationUseCase}, the Dozer mappers and the RabbitMQ
 * queues / listener container factory.
 */
@Slf4j
@SpringBootApplication
@EnableAsync
public class RecommenderApp {

    @Autowired
    private RabbitProperties props

    @Autowired
    private ConnectionFactory cachingConnectionFactory

    @Autowired
    private PricingPort pricingPort

    @Value('${app.queue.pricing}')
    private String pricingQueueName;

    @Value('${app.queue.aggregates}')
    private String aggregatesQueueName;

    @Value('${app.pricingGenerated.routingKey}')
    private String pricingRoutingKey;

    /**
     * Builds the Axon Mongo template pointing at the "recommender-int" database, using the
     * "recommendation" collection for domain events and "recommendation-snapshot" for snapshots.
     *
     * NOTE(review): host, port and credentials are hard-coded here; they should presumably come
     * from external configuration like the other {@code @Value} settings of this class.
     */
    @Bean(name = "axonMongoTemplate")
    MongoTemplate axonMongoTemplate() {
        MongoClient mongoClient = new MongoClient(
                new ServerAddress("mongodb", 29017),
                [MongoCredential.createCredential("recommender-int", "recommender-int", "recommender-int".getChars())])

        return new DefaultMongoTemplate(mongoClient,
                "recommender-int", "recommendation", "recommendation-snapshot")
    }

    /**
     * Event store backed by the Mongo storage engine built on {@link #axonMongoTemplate()}.
     */
    @Bean
    public EmbeddedEventStore embeddedEventStore() {
        EventStore eventStore = new EmbeddedEventStore(new MongoEventStorageEngine(axonMongoTemplate()))

        eventStore
    }

    /**
     * Exposes the event store itself as the application's event bus, so there is a single
     * bus instance instead of a separate in-memory one.
     */
    @Bean
    EventBus eventBus(EventStore eventStore) {
        eventStore
    }

    /**
     * Event-sourcing repository for the {@link Recommendation} aggregate.
     */
    @Bean
    public EventSourcingRepository<Recommendation> taskRepository() {
        EventSourcingRepository repository = new EventSourcingRepository(Recommendation.class, embeddedEventStore())

        return repository
    }

    @Autowired
    CalculateRecommendationUseCase calculateRecommendationUseCase

    /**
     * Command handler adapter that routes commands to the {@link Recommendation} aggregate
     * through the given repository.
     *
     * The previous version also created an {@code EventHandlingConfiguration} and registered it
     * on a fresh {@code DefaultConfigurer} that was never built or started; that code had no
     * effect and has been removed.
     *
     * @param eventSourcingRepository repository used to create and load the aggregate
     */
    @Bean
    AggregateAnnotationCommandHandler<Recommendation> commandHandler(EventSourcingRepository eventSourcingRepository) {
        new AggregateAnnotationCommandHandler<Recommendation>(Recommendation.class, eventSourcingRepository)
    }

    /**
     * Event processor named "events" that feeds events from the event store to
     * {@link CalculateRecommendationUseCase}.
     *
     * NOTE(review): nothing in this class starts the processor. Confirm whether the Axon
     * Spring Boot starter already registers the same handler before adding an init method,
     * otherwise the handler could be invoked twice.
     */
    @Bean
    SubscribingEventProcessor subscribingEventProcessor(EventStore eventStore) {
        new SubscribingEventProcessor("events", new SimpleEventHandlerInvoker(calculateRecommendationUseCase), eventStore)
    }

    /** Queue that receives pricing messages; its name comes from {@code app.queue.pricing}. */
    @Bean
    Queue pricingQueue() {
        return new Queue(pricingQueueName);
    }

    // NOTE(review): the three Dozer beans below all declare the bean name "org.dozer.Mapper".
    // Spring keeps a single definition per bean name, so presumably only one of them is
    // actually available for injection - confirm, and give them distinct names if all
    // three mapping sets are needed.

    /** Dozer mapper loaded with "dozerMappingSearchToRecommender.xml". */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerBeanSearchCommandToRecommender() {
        List<String> mappingFiles = Arrays.asList(
                "dozerMappingSearchToRecommender.xml"
        );

        DozerBeanMapper dozerBeanSearchCommandToRecommender = new DozerBeanMapper()
        dozerBeanSearchCommandToRecommender.setMappingFiles(mappingFiles)
        return dozerBeanSearchCommandToRecommender
    }

    /** Dozer mapper loaded with "dozer-application-mappings.xml". */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerApplicationMapper() {
        List<String> mappingFiles = ["dozer-application-mappings.xml"]

        DozerBeanMapper dozerBean = new DozerBeanMapper()
        dozerBean.setMappingFiles(mappingFiles)
        return dozerBean
    }

    /** Dozer mapper loaded with the port mappings and the new application mappings. */
    @Bean(name = "org.dozer.Mapper")
    public DozerBeanMapper dozerBeanMapper() {
        List<String> mappingFiles = ["dozer-port-mappings.xml", "dozer-application-new-mappings.xml"]
        DozerBeanMapper dozerBean = new DozerBeanMapper()
        dozerBean.setMappingFiles(mappingFiles)
        dozerBean
    }

    /** Non-durable queue for aggregates; its name comes from {@code app.queue.aggregates}. */
    @Bean
    Queue aggregatesQueue() {
        return new Queue(aggregatesQueueName, false);
    }

    /**
     * Listener container factory for annotated Rabbit listeners. It uses the shared JSON
     * message converter and takes its concurrency, acknowledge mode and requeue settings
     * from the Spring Boot {@link RabbitProperties}.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory()
        factory.setConnectionFactory(cachingConnectionFactory)
        factory.setMessageConverter(jackson2JsonMessageConverter())
        // Set properties from file
        factory.setMaxConcurrentConsumers(props.getListener().getMaxConcurrency())
        factory.setConcurrentConsumers(props.getListener().getConcurrency())
        factory.setAcknowledgeMode(props.getListener().getAcknowledgeMode())
        factory.setDefaultRequeueRejected(props.getListener().getDefaultRequeueRejected())

        return factory
    }

    /** Standardize on a single JSON message converter for all message queue items. */
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        new Jackson2JsonMessageConverter()
    }

    /**
     * Application entry point.
     *
     * The {@code @Autowired} annotation that used to sit on this method was removed:
     * it is a static method, which Spring does not autowire.
     */
    public static void main(String[] args) {
        SpringApplication.run(RecommenderApp.class, args)
    }

}

Aggregate
package es.osoco.ulyseo.recommender.domain.model.recommendation

import es.osoco.ulyseo.recommender.application.recommendation.CalculateRecoCommand
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import org.axonframework.commandhandling.CommandHandler
import org.axonframework.commandhandling.model.AggregateIdentifier
import org.axonframework.commandhandling.model.AggregateLifecycle
import org.axonframework.commandhandling.model.AggregateMember
import org.axonframework.commandhandling.model.AggregateRoot
import org.axonframework.eventhandling.EventHandler
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document

/**
 * Event-sourced aggregate holding the best-scored travel packages for a search footprint.
 *
 * It is created by handling a {@link CalculateRecoCommand}, which applies a
 * {@link RecommendationCalculatedDomainEvent}.
 */
@Document(collection = "recommendationPort")
@ToString
@EqualsAndHashCode
@TupleConstructor
@Slf4j
class Recommendation {

    // TODO fake fields, review them after contract is agreed
    @AggregateIdentifier
    @Id
    String footprint
    @AggregateMember
    List<TravelPackage> travelPackages = new ArrayList<>()
    @AggregateMember
    TravelCriteria travelCriteria
    Integer packagesCountForRecommendation

    /** No-arg constructor, needed so the aggregate can be instantiated before events are replayed into it. */
    Recommendation() {
    }

    /**
     * Creates the aggregate from a calculation command: keeps the two best-scored distinct
     * packages and applies a {@link RecommendationCalculatedDomainEvent}.
     *
     * The ranking is done on a private copy so the list carried by the command is not
     * reordered or de-duplicated as a side effect. A null package list in the command is
     * treated as empty.
     *
     * @param calculateRecoCommand command carrying the footprint, travel criteria and scored packages
     */
    @CommandHandler
    public Recommendation(CalculateRecoCommand calculateRecoCommand) {
        this.footprint = calculateRecoCommand.footPrint
        this.travelCriteria = calculateRecoCommand.travelCriteria

        List<TravelPackage> ranked = new ArrayList<>(calculateRecoCommand.receivedTravelPackages ?: [])
        ranked.addAll(this.travelPackages)
        // Highest scoring first.
        ranked.sort { -it.scoring }

        // NOTE(review): the limit is hard-coded to 2 while packagesCountForRecommendation is
        // never assigned in this constructor (so it is logged as null) - confirm the intended limit.
        setTravelPackages(ranked.unique().take(2))

        log.info "Keeping {} packages in recommendation, packagesCountForRecommendation: {}", travelPackages.size(), packagesCountForRecommendation

        AggregateLifecycle.apply(new RecommendationCalculatedDomainEvent(footprint: calculateRecoCommand.footPrint, recommendation: this))
    }

    /**
     * Applies a calculated-recommendation event to this aggregate.
     *
     * When the event is applied right after the command, the state is already set and the
     * assignments below are no-ops. When the aggregate is rebuilt from stored events it starts
     * from the no-arg constructor, so the identifier (and the state carried by the event's
     * recommendation, if it is a different instance) must be restored here.
     *
     * @param recommendationCalculatedDomainEvent the event to apply
     */
    @EventHandler
    public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent) {
        log.info "HELLO"

        if (recommendationCalculatedDomainEvent.footprint != null) {
            this.footprint = recommendationCalculatedDomainEvent.footprint
        }

        def snapshot = recommendationCalculatedDomainEvent.recommendation
        if (snapshot != null && !snapshot.is(this)) {
            this.travelCriteria = snapshot.travelCriteria
            this.travelPackages = snapshot.travelPackages
        }
    }

}

External event handler


package es.osoco.ulyseo.recommender.application.recommendation

import es.osoco.ulyseo.recommender.application.common.Generator
import es.osoco.ulyseo.recommender.application.rule.ScoringRuleEngine
import es.osoco.ulyseo.recommender.common.DomainEventPublisher
import es.osoco.ulyseo.recommender.domain.model.recommendation.Recommendation
import es.osoco.ulyseo.recommender.domain.model.recommendation.RecommendationCalculatedDomainEvent
import es.osoco.ulyseo.recommender.port.adapter.recommendation.RecommendationPort
import groovy.util.logging.Slf4j
import org.apache.flink.shaded.com.google.common.util.concurrent.Striped
import org.axonframework.commandhandling.CommandCallback
import org.axonframework.commandhandling.CommandMessage
import org.axonframework.commandhandling.gateway.CommandGateway
import org.axonframework.eventhandling.EventHandler
import org.axonframework.eventsourcing.eventstore.EventStore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service

import javax.annotation.PostConstruct
import java.util.concurrent.Semaphore

/**
 * Application service that scores the travel packages of a calculation request, dispatches a
 * {@link CalculateRecoCommand} through the command gateway and, when the resulting
 * {@link RecommendationCalculatedDomainEvent} arrives, stores and republishes the recommendation.
 */
@Slf4j
@Service
class CalculateRecommendationUseCase implements Serializable {

    @Autowired
    EventStore eventStore

    @Autowired
    CommandGateway commandGateway

    @Autowired
    ScoringRuleEngine scoringRuleEngine

    @Autowired
    RecommendationPort recommendationRepository

    @Autowired
    DomainEventPublisher domainEventPublisher;

    @Value('${app.packagesCountForRecommendation}')
    Integer packagesCountForRecommendation

    @Value('${app.concurrentFootprintExpected}')
    Integer concurrentFootprintExpected

    @Value('${app.concurrentPermitsByFootprint}')
    Integer concurrentPermitsByFootprint

    // Striped semaphores keyed by footprint; created once and shared by every instance.
    static Striped<Semaphore> semaphores

    /**
     * Creates the striped semaphores from the configured stripe and permit counts,
     * unless they have already been created.
     */
    @PostConstruct
    public void initializeSemaphores() {
        semaphores = semaphores ?: Striped.semaphore(concurrentFootprintExpected, concurrentPermitsByFootprint)
    }

    /**
     * Entry point of the use case: derives the footprint of the search and triggers the
     * recommendation calculation for it.
     *
     * @param calculateRecommendationCommand application-level request with the search, criteria and packages
     */
    void execute(CalculateRecommendationCommand calculateRecommendationCommand) {
        // Translate to Domain
        String footPrint = Generator.generateFootprint(calculateRecommendationCommand.getSearch())
        // calculateRecommendation returns nothing, so there is no result to keep here.
        calculateRecommendation(footPrint, calculateRecommendationCommand)
    }

    /**
     * Scores the received packages and sends a {@link CalculateRecoCommand} for the footprint,
     * holding that footprint's semaphore permit while the command is dispatched.
     *
     * Errors raised while scoring or dispatching are logged and not rethrown.
     *
     * NOTE(review): the permit is released as soon as {@code send} returns; if the command bus
     * is asynchronous the command may still be in flight at that point - confirm this is intended.
     */
    private void calculateRecommendation(String footPrint, CalculateRecommendationCommand calculateRecommendationCommand) {
        Semaphore semaphore = semaphores.get(footPrint)
        semaphore.acquire()

        try {
            //TODO: Convert application to Domain TravelPackages
            List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages = calculateRecommendationCommand.packages

            assignScoring(receivedTravelPackages, calculateRecommendationCommand.travelCriteria)

            CalculateRecoCommand domainCommand = new CalculateRecoCommand(
                    travelCriteria: calculateRecommendationCommand.getTravelCriteria(),
                    footPrint: footPrint,
                    receivedTravelPackages: receivedTravelPackages)

            commandGateway.send(domainCommand, new CommandCallback() {
                @Override
                void onSuccess(CommandMessage commandMessage, Object result) {
                    log.info("OK {}", result)
                }

                @Override
                void onFailure(CommandMessage commandMessage, Throwable cause) {
                    // Log failures at error level with the stack trace. The former
                    // info("KO {}", cause) call resolved to the (String, Throwable)
                    // overload and left the placeholder unformatted.
                    log.error("KO", cause)
                }
            })
        }
        catch (Exception e) {
            log.error("Error calculating recommendation", e)
        }
        finally {
            semaphore.release()
        }
    }

    /**
     * Builds a {@link RecommendationCalculatedDomainEvent} from the request and the given
     * recommendation. Not called from within this class at the moment.
     */
    private RecommendationCalculatedDomainEvent buildRecommendationCalculatedEvent(CalculateRecommendationCommand calculateRecommendationCommand, Recommendation recommendation) {
        RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent = new RecommendationCalculatedDomainEvent()
        recommendationCalculatedDomainEvent.setFootprint(recommendation.getFootprint())
        recommendationCalculatedDomainEvent.setPackagesCountForRecommendation(packagesCountForRecommendation)
        recommendationCalculatedDomainEvent.setTravelCriteria(calculateRecommendationCommand.getTravelCriteria())
        recommendationCalculatedDomainEvent.setRecommendation(recommendation)
        recommendationCalculatedDomainEvent
    }

    /**
     * Runs the scoring rule engine on every received package.
     *
     * @param receivedTravelPackages packages to score
     * @param travelCriteria currently unused by this method
     */
    private void assignScoring(List<es.osoco.ulyseo.recommender.domain.model.recommendation.TravelPackage> receivedTravelPackages, es.osoco.ulyseo.recommender.domain.model.recommendation.TravelCriteria travelCriteria) {
        receivedTravelPackages.each { travelPackage ->
            scoringRuleEngine.execute(travelPackage)
        }
    }

    /**
     * Handles the calculated-recommendation event: stores the recommendation under its
     * footprint and republishes the event through the domain event publisher.
     */
    @EventHandler
    public void handle(RecommendationCalculatedDomainEvent recommendationCalculatedDomainEvent) {
        recommendationRepository.set(recommendationCalculatedDomainEvent.footprint, recommendationCalculatedDomainEvent.recommendation)
        domainEventPublisher.publish(recommendationCalculatedDomainEvent)
    }

}







Fernando París

unread,
Jan 6, 2017, 4:51:25 AM1/6/17
to Axon Framework Users
I found the problem.

The event class was implementing DomainEventMessage; that was the problem.

Thanks to all

An awesome product
Reply all
Reply to author
Forward
0 new messages