Setup saga for listening with Spring Cloud Stream

261 views
Skip to first unread message

altug sahin

unread,
Mar 4, 2018, 3:01:40 PM3/4/18
to Axon Framework Users
Hello,

There are two services:

Service 1 - Publishes "PlatformCreatedEvent" to a RMQ exchange.
Service 2 - Consumes this message to trigger a saga.

Here is the problem. The saga is not triggered since axon is not aware of this received event in Service 2. I don't know how to configure Axon with Spring Cloud Stream with RMQ.

Since I see message being received in Service 2 in the handlePlatformCreated function, I am leaving out the configuration of Service 1.

SERVICE 2 Snippets:

class OnboardingSaga {
private val businessId = ""

@Autowired
@Transient
lateinit var commandGateway: CommandGateway

@StartSaga
@SagaEventHandler(associationProperty = "id", keyName = "businessId")
fun handle(event: PlatformCreatedEvent) {
println("I am not being triggered!")
}
}


@Configuration
class AxonConfig {

@Bean
fun sagaConfiguration(): SagaConfiguration<*> {
return SagaConfiguration.trackingSagaManager(OnboardingSaga::class.java)
}

}

@Component
interface Service2Channels {

@Input(ApplicationExchanges.PLATFORM_CREATED)
fun platformCreated(): SubscribableChannel
}


@EnableBinding(Service2Channels::class)
class PlatformEventConsumer {

@StreamListener(ApplicationExchanges.PLATFORM_CREATED)
fun handlePlatformCreated(event: PlatformCreatedEvent) {
println(">>>>> I am being triggered! YAY!!!)
}
}


Thanks







Allard Buijze

unread,
Mar 5, 2018, 4:18:26 AM3/5/18
to axonfr...@googlegroups.com
Hi Altug,

the problem is with your AxonConfig class. The sagaConfiguration bean must either stick to the "convention", or you must explicitly define the configuration's bean name in the @Saga annotation.

The default bean name to use for a Saga class called MySaga is mySagaConfiguration.
Also, the default source for a Tracking Processor is the Event Store. If you want to use another source, you must specify that in the SagaConfiguration.

Hope this helps.
Cheers,

Allard

Op zo 4 mrt. 2018 om 21:01 schreef altug sahin <altug...@gmail.com>:
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
Allard Buijze
CTO

E: allard...@axoniq.io
T: +31 6 34 73 99 89

altug sahin

unread,
Mar 5, 2018, 5:41:37 PM3/5/18
to Axon Framework Users
Hi Allard,

Thanks you for your response. I'll try this but I have a quick follow up question on something on tangent. In Axon saga samples I see this code (Kotlin in this case):

@RabbitListener(queues=["queue name"])

Why is it queues we set but not exchange? In RMQ (AMQP) we talk to exchanges not queues.

Thanks

altug sahin

unread,
Mar 5, 2018, 9:06:41 PM3/5/18
to Axon Framework Users

Hi Allard,

I realized I missed adding the AMQP config snippet in Service 2:

@Configuration
class AmqpConfig {
@Bean
fun springAMQPMessageSource(serializer: Serializer): SpringAMQPMessageSource {
return object : SpringAMQPMessageSource(serializer) {
// @RabbitListener(queues = [(ApplicationExchanges.PLATFORM_CREATED)])
@StreamListener(ApplicationExchanges.PLATFORM_CREATED)
override fun onMessage(message: Message, channel: Channel) {
super.onMessage(message, channel)
}
}
}
}

I updated the AxonConfig as you suggested.

@Configuration
class AxonConfig {

@Bean
fun onboardingSagaConfiguration(): SagaConfiguration<*> {
return SagaConfiguration.trackingSagaManager(OnboardingSaga::class.java)
}

}


I see the message is received but I get serialization exception for the received event

data class PlatformCreatedEvent(
var id: PlatformId,
var name: String = "",
var awsAccessKey: String = "",
var awsSecretKey: String = ""
)


data class PlatformId(var id: String = IdentifierFactory.getInstance().generateIdentifier()) {
override fun toString(): String {
return id
}

fun identifier(): String {
return id
}
}


The exception is:

2018-03-05 20:42:31.704  WARN 49059 --- [kGqR36QeUMKjw-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{"id":{"id":"74b91da7-41e7-4f53-97f7-4ebec8b27e67"},"name":"homer","awsAccessKey":"string","awsSecretKey":"string"}' MessageProperties [headers={contentType=text/plain, originalContentType=application/json;charset=UTF-8}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=wd-platform-out-platform-created, receivedRoutingKey=wd-platform-out-platform-created, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-9Y6XUpLYSwN0hd2GjlDexw, consumerQueue=wd-platform-out-platform-created.anonymous.v0UkXL4fRkGqR36QeUMKjw])

Issue seems to be with PlatformId (id property of PlatformCreatedEvent)

Body:'{"id":{"id":"74b91da7-41e7-4f53-97f7-4ebec8b27e67"},"name":"homer","awsAccessKey":"string","awsSecretKey":"string"}'

id is set to "id: { uuid }" instead of "{id: uuid}"

The spring cloud stream content type is set to json on both service 1 and 2.

spring.cloud.stream.default.contentType=application/json

Any ideas why the serialization fails?

Thank you


On Monday, March 5, 2018 at 4:18:26 AM UTC-5, Allard Buijze wrote:

Steven van Beelen

unread,
Mar 6, 2018, 2:51:25 AM3/6/18
to axonfr...@googlegroups.com
Hi Altug,

You might want to try the `@JsonValue @JsonProperty` annotations on the getter of the `id` field in your `PlatformId` class.
I believe that should make it so that your id will not be wrapped in another `id{...}` field.
Any how my hunch is that the solution is somewhere in Jackson annotations rather than Axon Framework.

Cheers,
Steven
Reply all
Reply to author
Forward
0 new messages