I have a small project to play around with Kogito and educate myself along the way:
https://github.com/karol-brejna-i/kogito-mqtt-decision-process-kafka/tree/mqtt-only (please, note 'mqtt-only' branch).
In my project, I want to start a process every time an MQTT message is received on a specific topic.
This did not work out of the box. MQTT sends byte[] (not Strings) and I got some errors.
I walked around the problem by defining the process like this:
And adding a simple "bridge" that reads the messages from MQTT, converts and sends them to a channel defined in the process (represented by 'moves' element in the diagram above):
public class Brydzia {
private static final Logger LOG = Logger.getLogger(Brydzia.class);
@Incoming("source")
@Outgoing("kogito_incoming_stream")
public String deserialize(byte[] payload) {
return new String(payload);
}
}
Please, note "kogito_incoming_stream" which seems to be the name assumed for a channel used in the BPMN process.
This works like a charm. Or, worked like a charm until version 1.11.
Today I upgraded Quarkus and Kogito and I see some problems.
I see that it works as previously up to Quarkus 2.2.2.Final which comes with Kogito 1.10.0Final.
The next version (2.2.3.Final) brings Kogito 1.11.1.Final and the application stops working
2021-10-07 14:50:48,240 WARN [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00207: Some components are not connected to either downstream consumers or upstream producers:
- ProcessingMethod{method:'org.demo.rsotf.Brydzia#deserialize', incoming:'source', outgoing:'kogito_incoming_stream'} has no downstream
I assume the "hardcoded" name for the process channel had changed.
So, I have the questions:
1. What is the new name for the channel?
2. Is there a more elegant way to send a message to a channel that initiates a process?
Regards,
Karol