Integrating with external systems


Mario Camou

Jan 25, 2011, 1:13:13 PM
to openwfe...@googlegroups.com
Hi all,

We're working with ruote-kit and ruote-amqp. I've had a question for a long while that our resident Ruote talents haven't been able to answer. Since I am now in the position of taking over some of the Ruote duties, I thought it would be a good idea to get this cleared up in my mind.

What is the accepted/recommended way of integrating Ruote with external systems? For example, suppose you have an existing system which integrates through AMQP messages. The system has its own (binary) message formats that it receives and sends. We now want to integrate this system into some Ruote workflows, so, for example, some messages will start workflows (with parameters taken from parts of the message payload), other messages will be received in the middle of a workflow (i.e., the workflow will pause until the receipt of a message, and will then have to correlate the incoming message with the existing expressions to determine which one the message belongs to), and the workflow should also be capable of generating AMQP messages in the formats expected by the external system.

I've worked with other workflow systems (specifically, Sun/Oracle JavaCAPS) in which the incoming/outgoing messages are completely disassociated from the workitems. In those systems, the workflow itself is responsible for getting the information out of the message payload and into the workitem. However, I see that Ruote always works in terms of workitems, so it's expecting to send/receive JSON workitems through AMQP. I was wondering, in Ruote, where do you place the logic to convert between workitems and external system formats, and the correlation logic to correlate incoming messages with process instances/expressions?

Thanks!
-Mario

Nathan Stults

Jan 25, 2011, 1:30:37 PM
to openwfe...@googlegroups.com

That is the environment we’re in: integrating ruote with a variety of external systems. We just built our own thin integration layer into our application, which handles mapping messages coming in from the outside and knows when and how to talk to or signal ruote. The ruote-amqp gem is designed for you to add your own message handlers that do that kind of thing. Personally, I don’t think it is necessarily ruote’s responsibility to map and interpret messages from the outside world, although that might make a nice additional component library, like ruote-kit but aimed at more general-purpose integration: message transformation via a nice DSL, AMQP or other queue listeners, and whatnot. In any case, the way I look at it, the systems integration concerns and configuration aren’t directly related to the workflow definitions themselves, and should probably be managed independently, at a level above the ruote core itself.

--
you received this message because you are subscribed to the "ruote users" group.
to post : send email to openwfe...@googlegroups.com
to unsubscribe : send email to openwferu-use...@googlegroups.com
more options : http://groups.google.com/group/openwferu-users?hl=en

John Mettraux

Jan 25, 2011, 8:48:26 PM
to openwfe...@googlegroups.com
Hello Mario, Hello Nathan,

+1 for Nathan's answer.

I will try to complement a bit / be more technical.

Ruote is written in Ruby. This programming language has many advantages, among them :

- you can write / test something very quickly, then turn that into a production thing or discard it

- the Ruby ecosystem has lots of libraries; there is usually more than one library for a given problem/domain/... It's very, very rich

For me this makes experimentation / integration very easy.

It's a bit unfortunate, but Rails has this slogan, "convention over configuration", which is sometimes carried over to adjacent systems as "you have to adapt to it". Some devs then slip into a "that's the way to do it, it's magical" mode and build religion around stuff.

Ruote-amqp comes with a very straightforward way of doing things : serialize as JSON, publish... fetch, deserialize, pass back to the engine.

You are not limited to that. Please do not limit your imagination.
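A minimal sketch of that default round trip, assuming a plain Hash stands in for the workitem and a Ruby Queue stands in for the AMQP queue (the names `publish_workitem`, `fetch_workitem` and `AMQP_STANDIN` are hypothetical, not ruote-amqp API):

```ruby
require 'json'

# Stand-in for an AMQP queue: a thread-safe, in-process Queue.
AMQP_STANDIN = Queue.new

# Participant side: serialize the workitem (a plain Hash here) as JSON
# and publish it.
def publish_workitem(workitem_hash)
  AMQP_STANDIN << JSON.generate(workitem_hash)
end

# Receiver side: fetch the next message (blocking) and deserialize it
# back into a Hash, ready to be handed back to the engine.
def fetch_workitem
  JSON.parse(AMQP_STANDIN.pop)
end

publish_workitem('fields' => { 'device' => 4 })
wi = fetch_workitem
wi['fields']['back'] = 'from AMQP'
p wi
```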

Quoting Nathan :

> The ruote-amqp gem is designed for you to add your own message handlers
> that do that kind of thing

I.e., you can override the #encode_workitem method of RuoteAMQP::ParticipantProxy and the #decode_workitem method of RuoteAMQP::Receiver.
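The exact signatures of those two methods depend on the ruote-amqp version and are not reproduced here. As a hypothetical illustration of the kind of conversion such overrides would hold, here is a tiny codec for an invented binary format (one byte of message type, then a 32-bit big-endian device id), built only on Ruby's own `Array#pack` / `String#unpack`:

```ruby
# Invented wire format (NOT a ruote-amqp format): one unsigned byte for the
# message type, then a 32-bit big-endian device id ('C' and 'N' in
# Array#pack notation).
DEVICE_WIRE_FORMAT = 'CN'.freeze
COMMAND_TYPE = 2 # hypothetical "type 2" outbound message

# The kind of work an #encode_workitem override might do: pick the
# relevant fields out of the workitem and emit the external binary message.
def encode_for_device(workitem_hash)
  [COMMAND_TYPE, workitem_hash['fields']['device']].pack(DEVICE_WIRE_FORMAT)
end

# The kind of work a #decode_workitem override might do: parse the binary
# message into data that can be merged into a workitem's fields.
def decode_from_device(bytes)
  type, device = bytes.unpack(DEVICE_WIRE_FORMAT)
  { 'type' => type, 'device' => device }
end

msg = encode_for_device('fields' => { 'device' => 7 })
p msg.bytes               # [2, 0, 0, 0, 7]
p decode_from_device(msg) # type 2, device 7
```

Keeping this mapping inside the participant/receiver pair means the process definition never sees the wire format.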

Or you can go completely off the beaten track and do :

---8<---
require 'ruote'
require 'mq' # the (0.6-era) amqp gem, which provides MQ

class MarioParticipant
  def initialize(opts)
    @opts = opts
  end
  def consume(workitem)
    MQ.queue(@opts[:queue], :durable => true).publish(
      "workitem:" + workitem.fei.sid, :persistent => true)
  end
  def cancel(fei, flavour)
    MQ.queue(@opts[:queue], :durable => true).publish(
      "cancel:" + fei.sid, :persistent => true)
  end
end

class MarioReceiver < Ruote::Receiver
  def initialize(engine_worker_or_storage, opts)
    super
    MQ.queue(opts[:queue], :durable => true).subscribe do |msg|
      handle(msg) unless AMQP.closing?
    end
  end
  protected
  def handle(msg)
    fei = msg.split(':').last
    workitem = workitem(fei)
    workitem.fields['back'] = 'from AMQP'
    reply_to_engine(workitem)
  end
end
--->8---

So the "correlation" thing you want is the "fei" (flow expression id). That's how the engine can map something to a workitem / flow expression.

In that code example, given the fei, the workitem is fetched from the engine (well, the storage) directly, 'decorated' and 'replied to the engine'.
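A sketch of the `kind:sid` message convention used in that example, with hypothetical helper names. It splits on the first colon only and keeps the kind, so a receiver could tell `workitem:` messages from `cancel:` ones; the id below is made up:

```ruby
# Build an outbound message in the "kind:sid" convention used above.
def build_message(kind, sid)
  "#{kind}:#{sid}"
end

# Split on the first colon only: the kind is kept (so 'workitem' and
# 'cancel' messages can be told apart) and the rest of the string is
# returned whole as the id, even if it were to contain a colon.
def parse_message(msg)
  kind, sid = msg.split(':', 2)
  { 'kind' => kind, 'sid' => sid }
end

# 'made-up:id' is a hypothetical id, chosen to show the splitting.
p parse_message(build_message('cancel', 'made-up:id'))
```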

> I've worked with other workflow systems (specifically, Sun/Oracle JavaCAPS)
> in which the incoming/outgoing messages are completely disassociated from
> the workitems.

In ruote, the incoming/outgoing messages are completely disassociated from the system.

You can write a participant/receiver couple that ships info by donkey; ruote will never know and never care. Ruote dispatches workitems to participants and (sometimes) receives them back. The rest is unicorns, dragons and elves: ruote never cares, the participant (and sometimes receiver) implementations do.


I hope this helps, best regards,

--
John Mettraux - http://jmettraux.wordpress.com

Mario Camou

Jan 26, 2011, 7:07:01 AM
to openwfe...@googlegroups.com
I see. I'm getting my head around all of this and filtering what was Gonzalo's understanding of things.

Another thing. Where is the accepted/recommended place to put our customizations? Should I just create my own directory at the same level as ruote-kit and ruote-amqp, add it to $: in config.ru, and keep the rest of Ruote, ruote-kit and ruote-amqp pristine?

Mario Camou

Jan 26, 2011, 7:37:27 AM
to openwfe...@googlegroups.com
OK, so I create my own Receiver (perhaps inheriting from RuoteAMQP::Receiver) and in there I can do whatever I want. As far as "correlation" is concerned, what I need to do is, find out the FEI to which to send the incoming data, get the workitem associated with that FEI, and plug in the data from the message.

Outbound messages get handled by my own Participant which needs to coordinate with the Receiver, perhaps storing the data needed for correlation somewhere where the Receiver can find it (perhaps a global Hash table, a database, or in the quantum state of the Universe) and associate it with the FEI. Correct?

Also, is there a way of searching through all the stored workitems, to do the correlation? So, suppose the message comes with certain fields (for example, "source_token" and "message_type") that need to match data inside of the workitem (for example, fields called "expected_token" and "expected_message_type"). In that case I could either store the "expected_token" and "expected_message_type" somewhere and associate them with the FEI, so the Receiver can get at them, or have the Receiver search the stored workitems to find the FEI. What would be the preferred way of doing this?

Thanks for all the support! I realize these are probably rather basic questions but again, I'm trying to map my knowledge of workflow systems and the things Gonzalo told me to how things work in Ruote.

Cheers,
-Mario.


John Mettraux

Jan 26, 2011, 8:18:57 AM
to openwfe...@googlegroups.com

On Wed, Jan 26, 2011 at 04:07:01AM -0800, Mario Camou wrote:
>
> Another thing. Where is the accepted/recommended place to put our
> customizations? Should I just create my own directory at the same level of
> ruote-kit and ruote-amqp and add it to $: in config.ru, and keep the rest of
> Ruote, ruote-kit and ruote-amqp pristine?

Hello Mario,

it depends.

I don't know much about your setting. From what I remember, you're talking directly to ruote-kit and sometimes interfacing with ruote-amqp.

a) you use ruote-kit as a library
b) you fork ruote-kit and maintain the fork

b) => your ruote-kit fork contains the customizations.
a) => you require ruote-kit and re-open classes and do customizations at will...

I'm probably completely wrong. Vanilla Ruby common sense should apply.
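For option a), a sketch of what re-opening a class looks like, assuming a stand-in class (`GemWidget`, hypothetical) in place of a real ruote-kit class and a customizations file of your own that is required after the gem:

```ruby
# Stand-in for a class shipped by a gem such as ruote-kit (hypothetical;
# in practice it comes from the untouched gem, via require).
class GemWidget
  def describe
    'stock widget'
  end
end

# Your own file, e.g. lib/customizations.rb (hypothetical path), required
# after the gem, for instance from config.ru after
#   $LOAD_PATH.unshift(File.expand_path('lib', File.dirname(__FILE__)))
# It re-opens the class and wraps the original method.
class GemWidget
  alias_method :stock_describe, :describe

  def describe
    "#{stock_describe} (customized)"
  end
end

puts GemWidget.new.describe # prints "stock widget (customized)"
```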


Best regards,

John Mettraux

Jan 26, 2011, 8:35:05 AM
to openwfe...@googlegroups.com

On Wed, Jan 26, 2011 at 04:37:27AM -0800, Mario Camou wrote:
>
> OK, so I create my own Receiver (perhaps inheriting from
> RuoteAMQP::Receiver) and in there I can do whatever I want. As far as
> "correlation" is concerned, what I need to do is, find out the FEI to which
> to send the incoming data, get the workitem associated with that FEI, and
> plug in the data from the message.
>
> Outbound messages get handled by my own Participant which needs to
> coordinate with the Receiver, perhaps storing the data needed for
> correlation somewhere where the Receiver can find it (perhaps a global Hash
> table, a database, or in the quantum state of the Universe) and associate it
> with the FEI. Correct?

Hello Mario,

yes. If you follow my suggestion, you'll notice that passing the fei avoids the need to map to a custom correlation id, the fei being the correlation id.

> Also, is there a way of searching through all the stored workitems, to do
> the correlation? So, suppose the message comes with certain fields (for
> example, "source_token" and "message_type") that need to match data inside
> of the workitem (for example, fields called "expected_token" and
> "expected_message_type"). In that case I could either store the
> "expected_token" and "expected_message_type" someplace and associated with
> the FEI, so the Receiver can get at them, or have the Receiver search the
> stored workitems to find the FEI. What would be the preferred way of doing
> this?

Re-working my previous sample :

---8<---
class MarioReceiver

  def handle(msg)

    correlation_id = extract_correlation_id(msg)

    expressions = @engine.processes.collect { |process_status|
      process_status.expressions.select { |expression|
        expression.class == Ruote::Exp::ParticipantExpression
      }
    }.flatten

    expression = expressions.find { |exp|
      exp.h.applied_workitem['fields']['correlation_id'] == correlation_id
    }

    raise "not found" unless expression

    workitem = Ruote::Workitem.new(expression.h.applied_workitem)

    reply_to_engine(workitem)
  end
end
--->8---

Something like that.

The problem : it goes through all the process instances... Using the fei as the correlation id is so much more direct...

Now I could show you examples of how to store your correlation_id <-> fei mapping, but oh well...
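For illustration only, a hypothetical in-memory index (`CorrelationIndex` is not part of ruote) that turns the scan above into a hash lookup. It is lost on restart and is not thread-safe as written; the participant would call `register` when dispatching and the receiver `take` when a reply arrives:

```ruby
# Hypothetical correlation index (not part of ruote): application-level
# correlation id (e.g. a device id) => fei sid of the waiting participant.
class CorrelationIndex
  def initialize
    @map = {}
  end

  # Participant side: record the mapping when the workitem is dispatched.
  def register(correlation_id, sid)
    @map[correlation_id] = sid
  end

  # Receiver side: remove and return the sid, or nil if nothing waits.
  def take(correlation_id)
    @map.delete(correlation_id)
  end
end

index = CorrelationIndex.new
index.register('device-4', 'sid-a') # 'sid-a' is a made-up sid
p index.take('device-4') # "sid-a"
p index.take('device-4') # nil, already consumed
```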

"expected_token" could easily contain the fei, and "expected_message_type"... does it matter ? The receiver should just discard irrelevant messages before even thinking about correlation.


Best regards,

Mario Camou

Jan 26, 2011, 9:05:34 AM
to openwfe...@googlegroups.com
Let me elaborate further. I have a set of external devices each of which has its own ID (what I previously called the "token"). Here's what we're trying to do:

1. When a message of type 1 is received, start the workflow "1-process-message.rb" and store the token of the device that sent the message.

2. Do some processing and send a message (say, type 2) to the device associated with this workflow instance.

3. Wait for said device to reply with a message of type 3 (say, an acknowledge)

The thing here is, we have multiple such devices that will be processing in parallel. So, the token we're using is the device ID and the idea is that if we receive a message of type 3 from device X, it should be sent to the FEI associated with step 3 of the correct instance of the given workflow.

At the moment the solution has been to store the workitems and all the stuff inside our gateway module (the module that connects directly to the devices and talks AMQP with Ruote), but I find it rather error-prone. Also, I believe that the fact that message 3 comes after message 2 should be represented only in the workflow (currently that knowledge is also being embedded in the gateway -- or it will be, as soon as the gateway is finished, if we continue with this approach).

I'd like to have something like this:

amqp send_message :payload => {:type => 2, :device => ${f:device_id}} :wait_for_message => 3

or even

# Assume field device_id has been set somewhere before this point
set :f => 'expected_message', :value => 3
amqp send_message :payload => {:type => 2}

and have the participant wait until the proper message is received for the given device ID (considering that, at a particular moment in time, there might be multiple instances of the participant waiting for the same message type from different device IDs). So in this case, my correlation key would be (device_id, message_id) and from that info I'd get the FEI back. If a message is received for which there is no correlation I should save it in case the station is faster than Ruote (i.e., the station replied with the 3 before Ruote got to the point of expecting message 3... although we should be able to handle this in the participant itself which would mean that unmatched messages get discarded).

So again, when I get the message in the Receiver, in the second case I could look for a workitem with the given expected_message and device_id fields, or in the first case I could store that data in the Participant and that's all standard Ruby stuff - I could probably use a Hash stored in a class variable or singleton, an instance of Rufus::Cloche or some sort of database. I'm thinking of using in-memory storage since, for the purposes of this business case, if Ruote crashes the world should reset itself, at least as far as waiting for messages is concerned... which raises another point, on startup I'd need to cancel all the processes that are waiting for a message...

Sorry if this is a bit rambling, I'm kind of thinking my way through this as I write.

Cheers!

John Mettraux

Jan 26, 2011, 9:32:31 AM
to openwfe...@googlegroups.com

On Wed, Jan 26, 2011 at 06:05:34AM -0800, Mario Camou wrote:
> Let me elaborate further. I have a set of external devices each of which has
> its own ID (what I previously called the "token"). Here's what we're trying
> to do:
>
> 1. When a message of type 1 is received, start the workflow
> "1-process-message.rb" and store the token of the device that sent the
> message.

OK, no correlation id involved (there is no process instance yet).

The receiver treats messages of type 1 as launch requests.

> 2. Do some processing and send a message (say, type 2) to the device
> associated with this workflow instance.

The process reaches a participant.

The participant emits a type 2 message.

> 3. Wait for said device to reply with a message of type 3 (say, an
> acknowledge)

The participant waits for the workitem to come back.

The receiver treats messages of type 3 as replies to the type 2 message.
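A sketch of that dispatch on message type, with hypothetical constant and method names; the constants live in the receiver code, not in the process definition:

```ruby
# Hypothetical message-type constants, kept in the receiver code rather
# than in the process definition.
LAUNCH_REQUEST = 1 # type 1: start a new process instance
DEVICE_ACK     = 3 # type 3: a device answering a type 2 message

# Decide what the receiver should do with a decoded message:
# [:launch, device] or [:reply, device]; anything else is [:ignore, nil].
def route(msg)
  case msg['type']
  when LAUNCH_REQUEST then [:launch, msg['device']]
  when DEVICE_ACK     then [:reply, msg['device']]
  else [:ignore, nil]
  end
end

p route('type' => 1, 'device' => 4)
p route('type' => 3, 'device' => 4)
p route('type' => 9)
```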

> The thing here is, we have multiple such devices that will be processing in
> parallel. So, the token we're using is the device ID and the idea is that if
> we receive a message of type 3 from device X, it should be sent to the FEI
> associated with step 3 of the correct instance of the given workflow.
>
> At the moment the solution has been to store the workitems and all the stuff
> inside our gateway module (the module that connects directly to the devices
> and talks AMQP with Ruote) but I find it rather error-prone, also, I believe
> that the fact that message 3 comes after message 2 should be represented
> only in the workflow (currently that knowledge is also being embedded in the
> gateway -- well it will be as soon as the gateway is finished if we continue
> with this approach).

Yes, it's implicit in ruote: a participant emits a message, a receiver receives the answer.

> I'd like to have something like this:
>
> amqp send_message :payload => {:type => 2, :device => ${f:device_id}}
> :wait_for_message => 3
>
> or even
>
> # Assume field device_id has been set somewhere before this point
> set :f => 'expected_message', :value => 3
> amqp send_message :payload => {:type => 2}

No need for those message type things in the process definition (see the top of this email).

> and have the participant wait until the proper message is received for the
> given device ID (considering that, at a particular moment in time, there
> might be multiple instances of the participant waiting for the same message
> type from different device IDs). So in this case, my correlation key would
> be (device_id, message_id) and from that info I'd get the FEI back.

Multiple instances of the participant ?

Maybe you'd need to work with a "listen" expression.

I have the impression that you're multiplexing : 1 participant receives 1 workitem, it emits multiple AMQP messages and it waits for all those messages to come back...
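If it really is multiplexing, one way to picture the waiting part is a per-workitem set of pending devices. This is a hypothetical helper (`FanOut`), not a ruote feature, and a "listen" expression or one participant per device may fit ruote better:

```ruby
require 'set'

# Hypothetical fan-out tracker (not a ruote feature): one workitem, one
# outbound message per device, and the workitem is only handed back once
# every device has answered.
class FanOut
  def initialize(sid, devices)
    @sid = sid
    @pending = Set.new(devices)
  end

  # Record one device's answer. Returns the sid once no device is
  # pending any more (time to reply to the engine), nil before that.
  # Answers from devices that are not pending leave the state unchanged.
  def ack(device)
    @pending.delete(device)
    @pending.empty? ? @sid : nil
  end
end

fan = FanOut.new('sid-a', [4, 7]) # 'sid-a' is a made-up sid
p fan.ack(4) # nil, device 7 still pending
p fan.ack(7) # "sid-a"
```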

> If a
> message is received for which there is no correlation I should save it in
> case the station is faster than Ruote (i.e., the station replied with the 3
> before Ruote got to the point of expecting message 3... although we should
> be able to handle this in the participant itself which would mean that
> unmatched messages get discarded).

Not possible. Ruote is ready to receive a reply for a participant as soon as this participant is reached (applied).
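A sketch of why the ordering works out, assuming the participant records its correlation before it publishes (all names are hypothetical; a Queue stands in for AMQP and a Hash for the correlation store):

```ruby
# Stand-ins: a Queue for the AMQP exchange and a Hash for the correlation
# store (device id => fei sid). Both are hypothetical.
OUTBOX = Queue.new
CORRELATIONS = {}

# Hypothetical participant dispatch: record the correlation first, then
# publish. A device cannot answer a message it has not received, so any
# answer finds its correlation already in place.
def dispatch(device, sid)
  CORRELATIONS[device] = sid
  OUTBOX << { 'type' => 2, 'device' => device }
end

dispatch(4, 'sid-a') # 'sid-a' is a made-up sid
outbound = OUTBOX.pop
p CORRELATIONS.fetch(outbound['device']) # "sid-a"
```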


> So again, when I get the message in the Receiver, in the second case I could
> look for a workitem with the given expected_message and device_id fields, or
> in the first case I could store that data in the Participant and that's all
> standard Ruby stuff - I could probably use a Hash stored in a class variable
> or singleton, an instance of Rufus::Cloche or some sort of database. I'm
> thinking of using in-memory storage since, for the purposes of this business
> case, if Ruote crashes the world should reset itself, at least as far as
> waiting for messages is concerned... which raises another point, on startup
> I'd need to cancel all the processes that are waiting for a message...

So you're multiplexing. The participant receives a workitem from ruote and then it sends multiple AMQP messages... This should not appear at all in the process definition. You should not couple the two.

  sequence do
    whatever_amqp
  end

The payload is the payload of the current workitem, the rest is resolved by the participant / receiver implementation pair.

Your message types won't change. If they did, then maybe it would be OK to specify them in the process definitions, but as it is they are merely constants in the participant/receiver implementations.


> Sorry if this is a bit rambling, I'm kind of thinking my way through this as
> I write.

Please don't overcomplicate it.


Mario, I have the impression that you're in a bad situation and trying to use ruote without knowing much about it.

Please take the time to run simple examples and then build up wider examples and then let's resume the conversation.

John Mettraux

Jan 26, 2011, 8:06:14 PM
to openwfe...@googlegroups.com
Hello Mario,

sorry for yesterday evening, my fuse was rather short.

Here is what I understand from your case, summarized in a piece of sample (running) code :

https://gist.github.com/797852

---8<---
require 'thread' # for the Queue class

require 'rubygems'
require 'yajl'
require 'ruote'

PDEF = Ruote.process_definition do
  sequence do
    device :device => 4
    device :device => 7
  end
end

# Re-opening to add a #device method
#
class Ruote::Workitem
  def device
    params['device'] || fields['device']
  end
end

class AmqpParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    correlate(workitem)
    $queue << encode(workitem)
  end

  def cancel(fei, flavour)
    # no implementation for this example
  end

  protected

  def encode(workitem)
    Rufus::Json.encode({ 'type' => 2, 'device' => workitem.device })
  end

  def correlate(workitem)
    correlations = @context.storage.get_engine_variable('_correlations') || []
    correlations << [ workitem.device, workitem.fei.sid ]
    @context.storage.put_engine_variable('_correlations', correlations)

    p [ :out, @context.storage.get_engine_variable('_correlations') ]
  end
end

class AmqpReceiver < Ruote::Receiver

  def initialize(engine, options={})
    super
    Thread.new { listen }
  end

  protected

  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :receiver, hsh ]
      next if hsh == nil
      case hsh['type']
      when 1
        launch(PDEF)
      when 3
        correlate(msg, hsh)
      else
        $queue << msg # put back message
      end
    end
  end

  def correlate(msg, hsh)

    puts "received message from device #{hsh['device']}"

    correlations = @context.storage.get_engine_variable('_correlations') || []

    p [ :in, correlations ]

    correlation = correlations.find { |cor| cor.first == hsh['device'] }

    if correlation
      correlations.delete(correlation)
      @context.storage.put_engine_variable('_correlations', correlations)
      wi = workitem(correlation[1])
      reply_to_engine(wi) if wi
        # ignore 'unrelated' msgs
    else
      return # discard
      #$queue << msg # re-queue
        # this version simply discards unexpected messages
        # re-queueing... why not, could make the system busy...
    end
  end
end

class Devices

  def initialize
    @thread = Thread.new { listen }
  end

  def join
    @thread.join
  end

  protected

  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :devices, hsh ]
      next if hsh == nil
      case hsh['type']
      when 2
        puts "device #{hsh['device']} received message..."
        $queue << Rufus::Json.encode(hsh.merge('type' => 3))
      else
        $queue << msg # put back message
      end
    end
  end
end

$engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
$queue = Queue.new
$receiver = AmqpReceiver.new($engine)
$engine.register_participant :device, AmqpParticipant
$devices = Devices.new

$engine.noisy = true
  # displays all the engine activity

$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })

$devices.join
--->8---

It simulates AMQP with a Ruby Queue class. The correlation data is placed in an engine variable.

The queue is shared by the participants, the receiver and the devices...

It's tested against ruote 2.1.12 (edge), but it should work with ruote 2.1.11.


I hope this will help, sorry again,

John Mettraux

Jan 26, 2011, 9:15:00 PM
to openwfe...@googlegroups.com
Hello Mario,

and here is a much better version, which avoids race conditions on the correlation data.

https://gist.github.com/797940
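The content of that gist is not reproduced here. As a hypothetical illustration of the race itself: the sample above reads the '_correlations' engine variable, modifies it and writes it back from both the participant and the receiver threads, so two concurrent updates can lose one another. Within a single Ruby process, holding one Mutex across the whole read-modify-write avoids that. This `SafeCorrelations` class is an assumption, not necessarily what the gist does, and it does not help across several processes sharing one storage:

```ruby
# Hypothetical thread-safe correlation store, illustrating the race only:
# one Mutex is held across each read-modify-write, so concurrent updates
# from participant and receiver threads cannot overwrite each other.
class SafeCorrelations
  def initialize
    @lock = Mutex.new
    @list = [] # [device, sid] pairs, in insertion order
  end

  def add(device, sid)
    @lock.synchronize { @list << [device, sid] }
  end

  # Remove and return the oldest [device, sid] pair for this device,
  # or nil if none is waiting.
  def take(device)
    @lock.synchronize do
      i = @list.index { |dev, _sid| dev == device }
      i && @list.delete_at(i)
    end
  end
end

store = SafeCorrelations.new
threads = (1..50).map { |n| Thread.new { store.add(n, "sid-#{n}") } }
threads.each(&:join)
p store.take(7) # [7, "sid-7"]
p store.take(7) # nil
```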


Best regards,

Mario Camou

Jan 27, 2011, 8:58:17 AM
to openwfe...@googlegroups.com
Thanks John. I'll have a look at it. Also, no worries on the "short fuse", I've had days like that and the last thing I want is for some clueless (or nearly-clueless) newbie to pepper me with questions :-). Again, thanks for the reply!

Cheers,
-Mario.