Hello,
i'm stuck working with reactive messaging.
It is also possible that I have not yet fully understood the whole concept.Perhaps this question sounds a bit silly, but I want to trigger different events in my application depending on the input. The only hint in the documentation
I can't find any examples in which non-trivial processing takes place, i.e. how decisions are made and passed on by the business logic, as well as the triggering of side effects and error handling.
The following code should illustrate my question as an example. Here I try to import a recipe from a data source using a JobCreatedEvent message, to persist it and then, if successful, to send a RecipeImportedEvent message so that the recipe can be processed further. Actually, in the event of an error, I would like to send another message stating that the import failed
to do something else.
@Inject
@Channel("job-created")
Emitter<ImportFailedEvent> emitter
@Incoming("job-created")
@Outgoing("recipe-imported")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Multi<Message<RecipeImportedEvent>> onJobCreated(Multi<Message<JobCreatedEvent>> event) {
var service = new RecipeImport();
return event
.onItem().transformToUni(e -> service.grab(dataSource, e.getPayload())
.flatMap(recipe -> repository.persist(recipe))
.map(recipe -> UriBuilder.fromResource(RecipeResource.class).path("{id}").build(recipe.recipeId))
.map(location -> new RecipeImportedEvent(e.getPayload().jobId(), location))
.onItem().invoke(x -> e.ack())
.onFailure().invoke(failure -> { emitter.send(new ImportFailedEvent(failure); e.ack() })
.onFailure().recoverWithNull())
.merge()
.filter(Objects::nonNull)
.map(Message::of);
}
My code seems to me to be pretty confused and cumbersome, which is always an indication that you are doing something wrong.
Perhaps one of the hardworking readers of this group can help me further. A few good examples would be very helpful.
Kind regards