BPMN process starting with a message -- channel name

gruby_karol

Oct 7, 2021, 9:22:49 AM
to Kogito development mailing list
I have a small project to play around with Kogito and educate myself along the way: https://github.com/karol-brejna-i/kogito-mqtt-decision-process-kafka/tree/mqtt-only (please note the 'mqtt-only' branch).

In my project, I want to start a process every time an MQTT message is received on a specific topic. 
This did not work out of the box. MQTT sends byte[] (not Strings) and I got some errors.

I worked around the problem by defining the process like this:
process_overview.png


I then added a simple "bridge" that reads the messages from MQTT, converts them to Strings, and sends them to the channel defined in the process (represented by the 'moves' element in the diagram above):

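import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

public class Brydzia {

    private static final Logger LOG = Logger.getLogger(Brydzia.class);

    // Reads raw MQTT payloads from the "source" channel and forwards them
    // as Strings to the channel the BPMN process is expected to listen on.
    @Incoming("source")
    @Outgoing("kogito_incoming_stream")
    public String deserialize(byte[] payload) {
        return new String(payload);
    }
}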
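
A side note on the conversion itself: new String(payload) decodes with the platform default charset, so the result can differ between machines. Below is a minimal standalone sketch of the same byte[]-to-String step with an explicit charset. It assumes the MQTT payloads are UTF-8 text; PayloadDecoder and the sample payload are hypothetical names used only for illustration and are not part of the project.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
public class PayloadDecoder {

    // Decodes a raw payload as UTF-8 (assumption: publishers send UTF-8 text).
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up sample payload, encoded the way a publisher might send it.
        byte[] payload = "{\"move\":\"rock\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(payload));
    }
}
```

In the bridge, the body of deserialize could delegate to such a helper without changing the channel wiring.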


Please note "kogito_incoming_stream", which seems to be the name assumed for the channel used by the BPMN process.

This works like a charm. Or, worked like a charm until version 1.11.

Today I upgraded Quarkus and Kogito, and I see some problems.
It works as before up to Quarkus 2.2.2.Final, which comes with Kogito 1.10.0.Final.
The next version (2.2.3.Final) brings Kogito 1.11.1.Final, and the application stops working:

2021-10-07 14:50:48,240 WARN  [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00207: Some components are not connected to either downstream consumers or upstream producers:
        - ProcessingMethod{method:'org.demo.rsotf.Brydzia#deserialize', incoming:'source', outgoing:'kogito_incoming_stream'} has no downstream

I assume the "hardcoded" name for the process channel has changed.

So I have two questions:
1. What is the new name for the channel?
2. Is there a more elegant way to send a message to a channel that initiates a process?

Regards,
Karol

gruby_karol

Oct 20, 2021, 3:55:54 AM
to Kogito development mailing list
Anyone?

Francisco Javier Tirado Sarti

Oct 20, 2021, 10:50:29 AM
to Kogito development mailing list
Hi,

Take a look at this blog post: https://blog.kie.org/2021/09/kogito-process-eventing-add-ons.html. There I try to explain how this is supposed to work.

I think you can now skip the Brydzia class and just add this to your application.properties:

mp.messaging.incoming.moves.connector=smallrye-kafka
mp.messaging.incoming.moves.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.moves.topic=source

Your byte[] should be converted to String automatically out of the box.

If that still does not work, you can keep Brydzia and define:

mp.messaging.incoming.moves.connector=kogito_incoming_stream
mp.messaging.incoming.moves.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Francisco Javier Tirado Sarti

Oct 20, 2021, 10:57:32 AM
to Kogito development mailing list
I made a mistake at the end of the previous e-mail.
If you decide to keep Brydzia in the middle, you need:
mp.messaging.incoming.kogito_incoming_stream.connector=smallrye-kafka
mp.messaging.incoming.kogito_incoming_stream.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer