What is the right way to send different messages and trigger side effects in reactive messaging?


Matthias Haschka

Oct 2, 2021, 12:10:57 PM
to SmallRye
Hello,

I'm stuck working with reactive messaging. It is also possible that I have not yet fully understood the whole concept.

Perhaps this question sounds a bit silly, but I want to trigger different events in my application depending on the input. The only hint I found in the documentation, in the section "Acknowledgment when using streams", was that it is 'sophisticated'.

I can't find any examples in which non-trivial processing takes place, i.e. how decisions are made and passed on by the business logic, as well as the triggering of side effects and error handling.

The following code illustrates my question. I try to import a recipe from a data source in response to a JobCreatedEvent message, persist it and, if that succeeds, send a RecipeImportedEvent message so that the recipe can be processed further. In the event of an error, I would like to send a different message stating that the import failed, so that something else can react to it.

@Inject
@Channel("job-created")
Emitter<ImportFailedEvent> emitter;

@Incoming("job-created")
@Outgoing("recipe-imported")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Multi<Message<RecipeImportedEvent>> onJobCreated(Multi<Message<JobCreatedEvent>> event) {
    var service = new RecipeImport();
    return event
        .onItem().transformToUni(e -> service.grab(dataSource, e.getPayload())
            .flatMap(recipe -> repository.persist(recipe))
            .map(recipe -> UriBuilder.fromResource(RecipeResource.class).path("{id}").build(recipe.recipeId))
            .map(location -> new RecipeImportedEvent(e.getPayload().jobId(), location))
            .onItem().invoke(x -> e.ack())
            .onFailure().invoke(failure -> {
                emitter.send(new ImportFailedEvent(failure));
                e.ack();
            })
            .onFailure().recoverWithNull())
        .merge()
        .filter(Objects::nonNull)
        .map(Message::of);
}

My code seems to me to be pretty confused and cumbersome, which is always an indication that you are doing something wrong.

Perhaps one of the hardworking readers of this group can help me further. A few good examples would be very helpful.

Kind regards

clement escoffier

Oct 5, 2021, 2:25:18 AM
to smal...@googlegroups.com
Hello,

For some weird reason, your email went to my spam folder. Sorry about that.

Some simplifications can be made.
SmallRye Reactive Messaging automatically drops “null” messages, so something like this should work:

@Incoming("job-created")
@Outgoing("recipe-imported")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Multi<Message<RecipeImportedEvent>> onJobCreated(Multi<Message<JobCreatedEvent>> event) {
    var service = new RecipeImport();
    return event
        .onItem().transformToUniAndMerge(e ->
            service.grab(dataSource, e.getPayload())
                .chain(recipe -> repository.persist(recipe))
                .map(recipe -> UriBuilder.fromResource(RecipeResource.class).path("{id}").build(recipe.recipeId))
                .map(location -> new RecipeImportedEvent(e.getPayload().jobId(), location))
                .call(x -> Uni.createFrom().completionStage(e.ack()))
                .map(e::withPayload)
                .onFailure().invoke(failure -> {
                    emitter.send(new ImportFailedEvent(failure));
                    e.ack(); // Are you sure you want to ack ?
                })
                .onFailure().recoverWithNull()
        );
}

A few things:

  • I’ve used call for the ack, because I wanted to be sure the ack completed before going further
  • I’ve used withPayload to create the new message, but that’s up to you (it copies the ack and metadata from the incoming message)
  • Do you want to ack on failure?
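
To make the first bullet concrete, here is a small stand-alone sketch of the difference between waiting for the ack (what call does) and triggering it without waiting (what invoke does). It deliberately uses only java.util.concurrent.CompletableFuture as an analogy rather than the Mutiny API; the class name, the method name and the log strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AckBeforeEmitSketch {

    // Returns the order of events for one item whose ack is still pending
    // when the item is ready to move downstream.
    static List<String> run(boolean awaitAck) {
        List<String> log = new ArrayList<>();
        // Stands in for e.ack(): an acknowledgement that has not completed yet.
        CompletableFuture<Void> ack = new CompletableFuture<>();
        CompletableFuture<String> item = CompletableFuture.completedFuture("event");

        CompletableFuture<String> next;
        if (awaitAck) {
            // call-like: hold the item back until the ack has completed.
            next = item.thenCompose(x -> ack.thenApply(done -> x));
        } else {
            // invoke-like: the ack is in flight, but nothing waits for it.
            next = item.thenApply(x -> x);
        }
        next.thenAccept(x -> log.add("emitted " + x));

        // The acknowledgement only finishes at this point.
        log.add("ack completed");
        ack.complete(null);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [ack completed, emitted event]
        System.out.println(run(false)); // [emitted event, ack completed]
    }
}
```

With the waiting variant, a failing ack would also surface as a failure of the item instead of being lost silently.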

The Kafka connector supports DLQ, it may be useful in your case (and would remove the need for the emitter).
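
If the DLQ route is taken, the configuration could look roughly like the fragment below. This is a sketch that assumes the job-created channel is backed by the SmallRye Kafka connector and configured through application.properties; the topic name job-created-failures is a made-up example. For a record to end up in the dead-letter topic, the failure branch would nack the message (e.nack(failure)) instead of acking it.

```properties
# Incoming channel consumed by onJobCreated (assumed to be a Kafka channel)
mp.messaging.incoming.job-created.connector=smallrye-kafka
# Send nacked records to a dead-letter topic instead of failing the application
mp.messaging.incoming.job-created.failure-strategy=dead-letter-queue
# Hypothetical topic name
mp.messaging.incoming.job-created.dead-letter-queue.topic=job-created-failures
```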

Also, in the case of Kafka, I would not use transformToUniAndMerge but concatenation (transformToUniAndConcatenate) to avoid breaking the order.
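
Why merging can break the order is easiest to see in isolation. The sketch below is an analogy built on plain CompletableFuture, not on Mutiny: three pending results are completed out of order, and the method returns the order in which they reach the downstream list. All names are hypothetical. It only models emission order; with Mutiny's concatenation the next Uni is, as far as I know, not even subscribed to until the previous one has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OrderingSketch {

    // Wires three pending results, completes them in the given order, and
    // returns the order in which they reach the downstream list.
    static List<String> emitted(boolean concatenate, int... completionOrder) {
        List<CompletableFuture<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(new CompletableFuture<>());
        }

        List<String> downstream = new ArrayList<>();
        if (concatenate) {
            // Concatenate-like: task i+1 may only emit after task i has emitted.
            CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
            for (CompletableFuture<String> task : tasks) {
                previous = previous.thenCompose(done -> task.thenAccept(downstream::add));
            }
        } else {
            // Merge-like: every task emits as soon as it completes.
            for (CompletableFuture<String> task : tasks) {
                task.thenAccept(downstream::add);
            }
        }

        // Simulate asynchronous work finishing out of order (indexes 0..2).
        for (int index : completionOrder) {
            tasks.get(index).complete("recipe-" + index);
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(emitted(false, 2, 0, 1)); // [recipe-2, recipe-0, recipe-1]
        System.out.println(emitted(true, 2, 0, 1));  // [recipe-0, recipe-1, recipe-2]
    }
}
```

The trade-off is throughput: keeping the upstream order means a slow import holds back every record behind it.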

Clement
