[ruote:2187] external participants and related questions


Olle

Apr 19, 2010, 8:09:51 AM
to ruote
Hello,


1. Ruote-kit works under “_ruote” prefix, but ruote-kit-client
connects to the root of web-server. I tried to set connect address in
several forms: http://localhost:8001/_ruote, http://localhost:8001
with no success. It looks like I missed something obvious…

2. I’ve implemented an external participant class, which creates a task
in an external task system. For some reason, I may not store the whole
workitem in that task system, so I just send the wfid and expid to the
external system. To get information about completion of the task, I
adjusted ruote-kit to expose the “engine.reply” method under
“_ruote/engine/reply”. The implementation is quite naïve (code quality may
be poor, sorry: I’m not experienced in Ruby yet): http://gist.github.com/370868
. This web-service method is called from outside. I also modified
ruote-kit-client accordingly. It works well, but I’m wondering
whether this approach is OK and whether it will work without side effects.

3. Related to 2) suggestion. In my case, results of human tasks arrive
raw and unstructured and should be transformed before using in the
process. This could be done by additional step in the process like
this:
-----
sequence do
  external_human_participant
  process_results
end
------
But this way I need “process_results” after every external
human task, and it looks artificial. What do you think about this
idea: the external participant implements an “on_reply” method and the
engine calls this method upon receiving a “reply” message? This would keep
the process definition compact and move the processing of results into the
participant class, which knows how to process the results in the context
of the process.

4. This question/suggestion also related to external participants.
Please consider two slightly different scenarios:

a. Participant creates a task in the external task system.
Also, it should annoy the task performer with periodic (let’s say, every
day) e-mails like this: “Please read and complete the task…”. If the user
doesn’t respond for 3 days, the frequency becomes 2x: two times per day
the user receives a message “Oh, please, please read and complete the
task…”. And so on. After a week, the task gets cancelled. This behavior
may be implemented in the process definition by some combination of
“:timeout” participants and maybe the “every” expression. But maybe
it’s possible for the participant to use an internal scheduler to schedule
its own re-applying from “consume”? Better, the participant may return
some “schedule” or "next run time/interval" object from the “consume”
method, which is processed by the engine/worker.

b. Participant works with an external resource. If the resource is
locked, the participant should retry every 10 minutes for 2 hours and raise
an error if the resource is still locked.
The above scenarios could be defined in the process definition,
but the process becomes watered down and less readable. I'm almost sure
that it should be the responsibility (at least in my particular
case) of the participant, not the process definition.

Best regards,
Oleg

--
you received this message because you are subscribed to the "ruote users" group.
to post : send email to openwfe...@googlegroups.com
to unsubscribe : send email to openwferu-use...@googlegroups.com
more options : http://groups.google.com/group/openwferu-users?hl=en

John Mettraux

Apr 19, 2010, 9:17:41 AM
to openwfe...@googlegroups.com
On Mon, Apr 19, 2010 at 9:09 PM, Olle <foe...@gmail.com> wrote:
>
> 1.      Ruote-kit works under “_ruote” prefix, but ruote-kit-client
> connects to the root of web-server. I tried to set connect address in
> several forms: http://localhost:8001/_ruote, http://localhost:8001
> with no success. It looks like I missed something obvious…

Hello,

not sure if I understood the question / issue, but I have notified
Torsten and Kenneth about your email.

> 2.      I’ve implemented an external participant class, which creates task
> in external task system.  For some reason, I may not store the whole
> workitem in that task system, so I just send wfid an expid to the
> external system. To get information about completion of the task, I
> adjusted ruote-kit and expose “engine.reply” method under “_ruote/
> engine/reply”. Implementation is quite naïve(code quality may be poor,
> sorry: I’m not experienced in Ruby yet):  http://gist.github.com/370868
> . This web-service method is called from outside. Also I modified
> ruote-kit-client respectively. It works good, but I’m just wondering
> is such approach OK and would It works without side-effects ?

In those cases I would have gone for an extension of the
StorageParticipant class. It could look like this :

---8<---
class SemiExternalParticipant < Ruote::StorageParticipant
  def consume(workitem)
    notify_external_participant(workitem.fei)
    super
  end
end
--->8---

Then for the "reply", a PUT on /workitems/20100419-babaraca!!0_0 could
do the trick. And the workitem is waiting for you in the storage
participant, no need to call engine.process(wfid).

But you're the final judge. I don't see anything wrong with your approach.
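
As a rough sketch of the PUT reply mentioned above: assuming ruote-kit is mounted under the "_ruote" prefix and accepts a JSON body with a "fields" key (both are assumptions to check against your ruote-kit version, and build_reply_request is a made-up helper name), the external system could prepare the request like this. The request is only built here, not sent.

```ruby
require 'net/http'
require 'json'

# Hypothetical helper: builds (but does not send) the PUT that hands a
# workitem back to ruote-kit. The "/_ruote" prefix and the
# { "fields" => ... } payload shape are assumptions, not confirmed API.
def build_reply_request(fei, fields, prefix = '/_ruote')
  req = Net::HTTP::Put.new("#{prefix}/workitems/#{fei}")
  req['Content-Type'] = 'application/json'
  req.body = JSON.generate('fields' => fields)
  req
end

req = build_reply_request('20100419-babaraca!!0_0', { 'approved' => true })
puts req.method
puts req.path
puts req.body

# To actually send it (host and port are placeholders):
#   Net::HTTP.start('localhost', 8001) { |http| http.request(req) }
```

The external system only needs to remember the identifier it was given at dispatch time; the workitem itself stays in the storage participant until the PUT arrives.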

> 3.      Related to 2) suggestion. In my case, results of human tasks arrive
> raw and unstructured and should be transformed before using in the
> process. This could be done by additional step in the process like
> this:
> -----
> sequence do
>        external_human_participant
>        process_results
> end
> ------
> But using this way I need to “process_results” after every external
> human task and it looks artificial. What do you think about such an
> idea: external participant implements “on_reply” method and engine
> calls this method upon receiving a “reply” message? This let to
> compact process definition and move proceeding of results into the
> participant class, which knows how to proceed the results in context
> of process.

In order to simplify the process definition, you could leverage a
subprocess. Something like :

---8<---
Ruote.process_definition 'main' do

  sequence do
    external_call
  end

  # ...

  define 'external_call' do
    sequence do
      external_human_participant
      process_results
    end
  end
end
--->8---

even hiding the subprocess definition in an engine variable (easy
re-use by all process definitions) :

---8<---
sequence do
  external_call
end

# ...

engine.variables['external_call'] = Ruote.process_definition do
  sequence do
    external_human_participant
    process_results
  end
end
--->8---

You can also call subprocesses with subprocess :ref =>
'http://pdefs.example.com/pdefs/external.rb'

But I understand the pain.

I will think about your on_reply idea. It makes lots of sense, but I'd
like to sleep on it for a while.

http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L345-346

There is one thing. Since we now favour participants instantiated for
each dispatch over participants instantiated at register time, it
may/will not be the 'same' participant doing the on_reply as the
one that did the dispatch.

If I get back to your question #2, you could do the "on_reply" in
there. But granted, this feels not right.

> 4.      This question/suggestion also related to external participants.
> Please consider two slightly different scenarios:
>
>         a.     Participant creates task  in the external task system.
> Also, It should annoy task performer with periodical (let’s say, every
> day) e-mails like this: “Please read and complete the task…”. If user
> doesn’t response for 3 days, frequency becames 2x: two times per day
> user receives a message “Oh, please, please read and complete the
> task…”. And so on. After a week, task gets cancelled. This behavior
> may be implemented in the process definition by some combination of
> “:timeout” participants and maybe “every” expression.   But may be
> it’s possible for participant to use internal scheduler to schedule
> it’s own re-applying from “consume”? Better, participant may return
> some “schedule” or "next run time/interval" object from “consume”
> method which is processed by the engine/worker.

It's possible.

Here is a Rails example : ( http://gist.github.com/370998 )

---8<---
#
# config/initializers/ruote.rb

RuoteKit.configure do |c|

  c.run_worker = true unless $RAKE_TASK

  #c.set_storage(...

  c.register do

    participant 'daily' do |workitem|
      Toto::Operations.trigger_inbox_reminders
    end

    catchall Ruote::StorageParticipant
  end
end

if $RAKE_TASK != true && Rails.env != 'test'

  path = Rails.root.join(*%w[ tmp pids ruote_cron_process.wfid ]).to_s

  wfid = File.read(path).strip rescue nil

  RuoteKit.engine.cancel_process(wfid) if wfid

  wfid = RuoteKit.engine.launch(Ruote.define(:name => 'cron_process') do
    cron '0 6 * * *' do
      daily
    end
  end)

  File.open(path, 'wb') { |f| f.puts(wfid) }
end


#
# lib/toto/operations.rb

module Toto
  module Operations

    # Fires reminders for the workitems of the inbox participant.
    #
    def self.trigger_inbox_reminders

      workitems = RuoteKit.storage_participant.by_field('remind')

      users = workitems.inject({}) do |h, workitem|

        if (Time.now - workitem.last_reminder) > workitem.remind_frequency

          # ||= stores the new array in the hash (a plain || would drop it)
          (h[workitem.user] ||= []) << workitem
        end

        h
      end

      users.each do |user, items|
        ::NotificationMailer.reminder(items)
      end
    end
  end
end
--->8---
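
The grouping step of trigger_inbox_reminders can be tried out without Rails or ruote. A minimal sketch, with a Struct standing in for the stored workitem (the Struct and due_reminders are illustrative names, not part of ruote). Note the `||=`, which is what makes the per-user array stick to the hash.

```ruby
# Stand-in for a stored workitem; the field names mirror the example
# above but are otherwise hypothetical.
Workitem = Struct.new(:user, :last_reminder, :remind_frequency)

# Returns { user => [workitems due for a reminder] }.
def due_reminders(workitems, now = Time.now)
  workitems.each_with_object({}) do |wi, by_user|
    next unless (now - wi.last_reminder) > wi.remind_frequency
    (by_user[wi.user] ||= []) << wi # ||= stores the new array in the hash
  end
end

now = Time.at(1_000_000)
day = 24 * 3600
items = [
  Workitem.new('alice', now - 2 * day, day), # overdue
  Workitem.new('alice', now - 3 * day, day), # overdue
  Workitem.new('bob',   now - 3600,    day)  # reminded an hour ago
]

due = due_reminders(items, now)
puts due.keys.inspect   # only alice has overdue workitems
puts due['alice'].size
```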

There is a dedicated cron process that runs every day at 6 in the
morning and sends reminder to users if necessary.

I think it could be adapted to your case. I use a block participant
(mea culpa), you could use a custom participant and place all the
reminder logic in it (instead of my Toto::Operations helper module).
You could place reminder frequency data in the workitem itself (since
the user hasn't touched it yet).

The global "gets cancelled after 1 week" thing can be left to the
participant timeout attribute. The cron simply won't send reminders
for workitems that are not present (it doesn't keep track of them).

On the other hand, the hardcore "ruote way" could look like :

---8<---
concurrence :count => 1 do

  the_real_participant :timeout => '1w'

  repeat do
    sleep '${time_before_next_reminder}'
    send_reminder_to_real_participant
  end
end
--->8---

As soon as "the_real_participant" exits the reminding loop gets
cancelled (thanks to :count => 1 which tells the concurrence to expect
only one branch to reply (and then cancel)).

As you would say "it's watered down". It could be wrapped in a subprocess.
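
For the escalating reminders of scenario 4a, the value behind ${time_before_next_reminder} could come from a small helper like the one below. It is a plain-Ruby sketch: the method name and the thresholds (daily for 3 days, twice a day afterwards, nothing after a week) are simply taken from Oleg's description and are not ruote features.

```ruby
DAY = 24 * 3600

# Seconds to wait before the next reminder, given how long the task has
# been open. Returns nil once the one-week deadline has passed.
def time_before_next_reminder(elapsed)
  return nil if elapsed >= 7 * DAY   # task gets cancelled after a week
  elapsed < 3 * DAY ? DAY : DAY / 2  # daily at first, then twice a day
end

# Walk through the week and collect the moments (in days) a reminder fires.
elapsed = 0
reminders = []
while (wait = time_before_next_reminder(elapsed))
  elapsed += wait
  reminders << elapsed / DAY.to_f if elapsed < 7 * DAY
end

puts reminders.inspect
```

A participant (or the process itself) would store the returned interval in a workitem field before each pass through the reminder loop.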

>         b.     Participant works with external resource. If resource is
> locked, participant should try every 10 minutes for 2 hours and raise
> error if the resource still locked.
>         Above scenarios could be defined in the process definition,
> but process becomes watered down and less readable. I'm almost sure
> that it is should be the responsibility (at least in my particular
> case) of the participant, not the process definition.

Those retry techniques belong to the participant IMHO.
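
One way to read this: the participant wraps its call to the external resource in its own retry loop. Below is a minimal sketch in plain Ruby. ResourceLocked and with_retries are hypothetical names, and the sleeper is injectable so the example runs instantly. In a real participant the same loop would block the worker thread while it waits, which matters at the 10-minute / 2-hour scale.

```ruby
class ResourceLocked < StandardError; end

# Calls the block; on ResourceLocked waits `interval` seconds and tries
# again, until `max_wait` seconds of waiting have been used up. Then the
# last ResourceLocked error is re-raised.
def with_retries(interval:, max_wait:, sleeper: method(:sleep))
  waited = 0
  begin
    yield
  rescue ResourceLocked
    raise if waited + interval > max_wait
    sleeper.call(interval)
    waited += interval
    retry
  end
end

attempts = 0
result = with_retries(interval: 600, max_wait: 7200, sleeper: ->(_s) {}) do
  attempts += 1
  raise ResourceLocked, 'still locked' if attempts < 3 # locked twice
  :done
end

puts result.inspect
puts attempts
```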

Thanks for the excellent discussion ! I am learning a lot.

I'm trying to show you various techniques, I know you'll pick the
right one for your case or come up with some innovation.


Best regards,

--
John Mettraux - http://jmettraux.wordpress.com

Torsten Schoenebaum

Apr 19, 2010, 9:33:13 AM
to openwfe...@googlegroups.com
John Mettraux wrote:

> On Mon, Apr 19, 2010 at 9:09 PM, Olle <foe...@gmail.com> wrote:

>> 1. Ruote-kit works under “_ruote” prefix, but ruote-kit-client
>> connects to the root of web-server. I tried to set connect address in
>> several forms: http://localhost:8001/_ruote, http://localhost:8001
>> with no success. It looks like I missed something obvious…
>
> not sure if I understood the question / issue, but I have notified
> Torsten and Kenneth about your email.

ruote-kit-client is a bit outdated, sorry. I hope to fix that really
soon, but it may take a couple of days as I'm really busy atm.

On the other questions: There's no use to write the same things as John
has done again -- I would have written nearly the same.

Yours,
Torsten

Olle

Apr 19, 2010, 12:23:52 PM
to ruote
Hello, John!

Thank you for detailed response and samples, very useful!

Some preface to my comments below: 99.9999999999 )) percent of our
custom participants will work with an external CMS to store/retrieve/
process information. It's also used as a task system and file
storage. The design of that system is provided "as is", so we can't modify
it. The "locked resource" case from my question above is our usual headache
and we need to take care of it. We can receive such a reply from that system
even while creating a task for a user. This is the main reason why I
asked all those questions, and it's very important for us. I don't want
to discuss why we can't use another system: it's not only a technical but
also a political issue, so we just try to do our best with it and make
the life of our customers simpler.
The workflow is also part of that system, but I hope it could be replaced
with ruote. Currently we've integrated almost everything we need to
write and run some "typical" approval process using ruote in
conjunction with that system. We already ran some simplified tests and
ruote works excellently! At the moment we need to finalize our tests and
run them massively to reach the final decision.

> In those cases I would have gone for an extension of the
> StorageParticipant class. It could look like this :
>
> ---8<---
> class SemiExternalParticipant < Ruote::StorageParticipant
>   def consume (workitem)
>     notify_external_participant(workitem.fei)
>     super
>   end
> end
> --->8---
>
> Then for the "reply", a PUT on /workitems/20100419-babaraca!!0_0 could
> do the trick. And the workitem is waiting for you in the storage
> participant, no need to call engine.process(wfid).
>
> But you're the final judge. I don't see anything wrong with your approach.
>

Hmmm, I didn't think about extending StorageParticipant. That's a good
remark, thank you! What do you mean by "no need to call
engine.process(wfid)": is it "heavy" for the engine?

>
> But I understand the pain.
>
> I will think about your on_reply idea. It makes lots of sense, but I'd
> like to sleep on it for a while.
>
>  http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L345-346
>
> There is one thing. Since we now favour participants instantiated for
> each dispatch over participants instantiated at register time, it
> may/will not be the the 'same' participant doing the on_reply as the
> one that did the dispatch.

I'm not sure what the problem is here. The participant is stateless;
any needed "state" information may be saved in the workitem. The user just
provides a class name with some options, and it will have the desired
behavior in any worker acquainted with this class. I planned to
implement a basic participant and subclass it per "type" of task.
It will parse and transform results as needed. Usually we have 10-20
different types of tasks per workflow instance, used for all the
processes.

>
> If I get back to your question #2, you could do the "on_reply" in
> there. But granted, this feels not right.
>
> > 4.      This question/suggestion also related to external participants.

>
> It's possible.
>
> Here is a Rails example :  (http://gist.github.com/370998)
>
I agree, this is a good variant for rare reminders. I'm already
thinking of it, but not in terms of ruote, you gave me an almost ready-
to-use example, thanks!

>
> >         b.     Participant works with external resource. If resource is
> > locked, participant should try every 10 minutes for 2 hours and raise
> > error if the resource still locked.
> >         Above scenarios could be defined in the process definition,
> > but process becomes watered down and less readable. I'm almost sure
> > that it is should be the responsibility (at least in my particular
> > case) of the participant, not the process definition.
>
> Those retry techniques belong to the participant IMHO.

Sorry, I'm not sure what you mean here. The reminder technique described
above? Usually we have 3000-6000 concurrent processes. Most of
them wait on a parallel approval step containing human activities and
before/after processing (3-10 concurrent branches). So we will have
9000-60000 workitems in the storage. The nature of the before/after
processing doesn't require it to subclass the storage participant:
it just does something with the information in the CMS, but occasionally we
can receive "resource is locked" in those processing steps (as I
described in the preface). To handle it in the manner shown we would need to
subclass StorageParticipant for every processing step. And to retry every
minute, it would read and process all of the workitems in the storage
every minute, am I correct?

>
> Thanks for the excellent discussion ! I am learning a lot.
>
> I'm trying to show you various techniques, I know you'll pick the
> right one for your case or come up with some innovation.
>

John, thank you for your patience! )) I'm not sure which of us is really
learning a lot ).

Oh, there are so many "I'm not sure" from me above...


Best regards,
Oleg

Olle

Apr 19, 2010, 5:25:41 PM
to ruote
Some additional remarks.

>
> > But I understand the pain.
>
> > I will think about your on_reply idea. It makes lots of sense, but I'd
> > like to sleep on it for a while.
>
> >  http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L345-346
>
> > There is one thing. Since we now favour participants instantiated for
> > each dispatch over participants instantiated at register time, it
> > may/will not be the the 'same' participant doing the on_reply as the
> > one that did the dispatch.
>
> I'm not sure what is the problem here. Participant is stateless,
> needed "state" information may be saved in the workitem. User just
> provide a classname with some options, and it will have desired
> behavior in any worker aquainted with this class. I planned to
> implement some basic participant and subcluss it for "type" of tasks.
> It will parse and transfrom results as needed. Usually we have 10-20
> different type of tasks per workflow instance, used for all the
> processes.
>


This also requires a way to return a modified workitem to the
engine without reply_to_engine in the "consume" method. AFAIK, there
is no way to do this currently, correct?


>
> > >         b.     Participant works with external resource. If resource is
> > > locked, participant should try every 10 minutes for 2 hours and raise
> > > error if the resource still locked.
> > >         Above scenarios could be defined in the process definition,
> > > but process becomes watered down and less readable. I'm almost sure
> > > that it is should be the responsibility (at least in my particular
> > > case) of the participant, not the process definition.
>
> > Those retry techniques belong to the participant IMHO.
>
> Sorry, I'm not sure what you meant here. Described above technique of
> reminders? Usually we have 3000-6000 concurrent processes. Most of
> them waits on parallel approval step contained human activities and
> before/after processing (3-10 concurrent branches). So we will have
> 9000-60000 worktems in the storage. The nature of before\after
> processing doesn't requre it to subclass of the storage participant -
> it just do something with the information in the CMS, but rarely we
> can receive "resource is locked" in that processing steps (as I
> described in preface). To handle it in shown manner we will need to
> subclass StorageParticipant for every processing step. To retry every
> 1 minute it will read and process all of the workitems in the storage
> every minute, am I correct?
>

I also tried to use the :on_error attribute to achieve the desired
functionality. I raised a specialised exception from "consume". But
it looks like :on_error doesn't work for participant expressions, only
for flow control expressions. Is it by design?
So this doesn't work as shown, only if I move :on_error into the
"sequence" definition:
---
sequence do
  external_work :on_error => 'handle_redo_error'
end
---
My idea was to do something like this:
----
pdef = Ruote.process_definition :name => 'test' do
  sequence do
    ...
    # participant implementation raises RedoException < Exception if
    # it needs to be reapplied after a while.
    # it should also set "f:redo_wait_interval", but I have no idea
    # how to pass it to the engine without reply_to_engine
    external_work :on_error => 'handle_redo_error'
    ...
  end

  define 'handle_redo_error' do
    wait "f:redo_wait_interval" # can't set this field in the participant above
    _redo # or some specialized participant which knows where to get
          # back for re-applying, for example if it knows the fei, it can
          # easily utilize the engine.re_apply method.
  end
end
----
As you can see, here I have two problems:
1. I don't know how to pass the desired wait interval to "wait" from
the participant. Maybe process variables could be used for this?
2. I don't know how to get information about the intercepted exception in
the error handler subprocess/participant. Is it possible?

John Mettraux

Apr 19, 2010, 7:07:14 PM
to openwfe...@googlegroups.com
On Tue, Apr 20, 2010 at 1:23 AM, Olle <foe...@gmail.com> wrote:
>
> Workflow is also part of that system, but I hope it could be replaced
> with ruote. Currently we've integrated almost everything we need for
> write and run some "typical" approval process using ruote in
> conjunction with that system. We already ran some simplified tests and
> ruote works excellent ! At the moment we need to finalize our tests and
> run them massively to elaborate the final decision.

Hi Olle,

looking forward to the result of your tests. If you don't use ruote,
please take the time to tell me what went wrong with it so that I can
make it better (if possible).


>> In those cases I would have gone for an extension of the
>> StorageParticipant class. It could look like this :
>>
>> ---8<---
>> class SemiExternalParticipant < Ruote::StorageParticipant
>>   def consume (workitem)
>>     notify_external_participant(workitem.fei)
>>     super
>>   end
>> end
>> --->8---
>>
>> Then for the "reply", a PUT on /workitems/20100419-babaraca!!0_0 could
>> do the trick. And the workitem is waiting for you in the storage
>> participant, no need to call engine.process(wfid).
>>
>> But you're the final judge. I don't see anything wrong with your approach.
>>
>
> Hmmm, I didn't think about extending StorageParticipant. That a good
> remark, thank you! What do you mean saying "no need to call
> engine.process(wfid)", is at a "heavy" for the engine?

It's not that heavy, but heavier than fetching the right workitem.
Though, if you know the fei, you could directly write

wi = Ruote::FlowExpression.fetch(engine.context, fei).h.applied_workitem

It's also a direct hit.

(I could wrap that in a method for advanced users...)

>> But I understand the pain.
>>
>> I will think about your on_reply idea. It makes lots of sense, but I'd
>> like to sleep on it for a while.
>>
>>  http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L345-346
>>
>> There is one thing. Since we now favour participants instantiated for
>> each dispatch over participants instantiated at register time, it
>> may/will not be the the 'same' participant doing the on_reply as the
>> one that did the dispatch.
>
> I'm not sure what is the problem here. Participant is stateless,
> needed "state" information may be saved in the workitem. User just
> provide a classname with some options, and it will have desired
> behavior in any worker aquainted with this class. I planned to
> implement some basic participant and subcluss it for "type" of tasks.
> It will parse and transfrom results as needed. Usually we have 10-20
> different type of tasks per workflow instance, used for all the
> processes.

OK. If the participant is in charge of handing the workitem to the
"external participant" and fetching back the answer, all from its
consume method, then you have an on_reply, already. Easy.

If the workitem comes back to the engine via a listener, should I
re-instantiate the participant and trigger its on_reply ?

These are the things I have to think about. I like your idea, but I'm
a bit busy right now, I hope that by the end of the week, the ideas
will have gotten clearer and the best implementation will appear.
Sorry for my slowness.


>> >         b.     Participant works with external resource. If resource is
>> > locked, participant should try every 10 minutes for 2 hours and raise
>> > error if the resource still locked.
>> >         Above scenarios could be defined in the process definition,
>> > but process becomes watered down and less readable. I'm almost sure
>> > that it is should be the responsibility (at least in my particular
>> > case) of the participant, not the process definition.
>>
>> Those retry techniques belong to the participant IMHO.
>
> Sorry, I'm not sure what you meant here. Described above technique of
> reminders? Usually we have 3000-6000 concurrent processes. Most of
> them waits on parallel approval step contained human activities and
> before/after processing (3-10 concurrent branches). So we will have
> 9000-60000 worktems in the storage. The nature of before\after
> processing doesn't requre it to subclass of the storage participant -
> it just do something with the information in the CMS, but rarely we
> can receive "resource is locked" in that processing steps (as I
> described in preface). To handle it in shown manner we will need to
> subclass StorageParticipant for every processing step. To retry every
> 1 minute it will read and process all of the workitems in the storage
> every minute, am I correct?

You are right. The cron technique I described is indeed not appropriate.

(now heading to your next email)


--
John Mettraux - http://jmettraux.wordpress.com

Olle

Apr 20, 2010, 5:09:25 AM
to ruote
Hi John,

>
> >> There is one thing. Since we now favour participants instantiated for
> >> each dispatch over participants instantiated at register time, it
> >> may/will not be the the 'same' participant doing the on_reply as the
> >> one that did the dispatch.
>
> > I'm not sure what is the problem here. Participant is stateless,
> > needed "state" information may be saved in the workitem. User just
> > provide a classname with some options, and it will have desired
> > behavior in any worker aquainted with this class. I planned to
> > implement some basic participant and subcluss it for "type" of tasks.
> > It will parse and transfrom results as needed. Usually we have 10-20
> > different type of tasks per workflow instance, used for all the
> > processes.
>
> OK. If the participant is in charge of handing the workitem to the
> "external participant" and fetching back the answer, all from its
> consume method, then you have an on_reply, already. Easy.
>
> If the workitem comes back to the engine via a listener, should I
> re-instantiate the participant and trigger its on_reply ?


Yes. It's exactly my case. There is an external service, which
receives task results from the CMS and puts them into ruote (it does POST
_ruote/engine/reply?merge_workitem currently).

The participant.on_reply method needs to receive the workitem and be able to
change it. If such an approach affects performance, the engine/worker could
check participant.respond_to?(:on_reply) during the first instantiation
and flag the participant expression, so there would be no need to
re-instantiate the participant if it doesn't provide an "on_reply" handler.

The participant class knows how to "tune" and post a task to the user and
how to parse and transform the received results in the context of the
process. The listener/receiver is a detached intermediate component. It
knows nothing about the process logic, but knows how to get "raw" results
from the CMS and pass them to ruote. That was my idea.
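
To make the proposal concrete, here is a plain-Ruby sketch of the flow, with no ruote involved. ParticipantRegistry, ApprovalTask, PlainTask and the field names are invented for illustration; ruote had no on_reply hook at the time of this thread. The receiver only knows the participant name and the raw fields. The registry re-instantiates the participant class and calls on_reply only if the class defines it, so classes without the hook cost nothing extra.

```ruby
# Toy registry: participant name => [class, options]. Instances are
# created per call, so participants must stay stateless.
class ParticipantRegistry
  def initialize
    @entries = {}
  end

  def register(name, klass, opts = {})
    @entries[name] = [klass, opts]
  end

  # Called when a reply arrives; returns the (possibly transformed) fields.
  def handle_reply(name, fields)
    klass, opts = @entries.fetch(name)
    return fields unless klass.method_defined?(:on_reply)
    klass.new(opts).on_reply(fields)
  end
end

class ApprovalTask
  def initialize(opts)
    @opts = opts
  end

  # Turns the raw answer from the task system into a clean field.
  def on_reply(fields)
    raw = fields['raw_result'].to_s.strip.downcase
    fields.merge('approved' => (raw == 'yes'))
  end
end

# A participant without an on_reply hook: replies pass through untouched.
class PlainTask
  def initialize(opts); end
end

registry = ParticipantRegistry.new
registry.register('approval', ApprovalTask)
registry.register('plain', PlainTask)

puts registry.handle_reply('approval', { 'raw_result' => ' YES ' }).inspect
puts registry.handle_reply('plain', { 'raw_result' => ' YES ' }).inspect
```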

Best regards,
Oleg

John Mettraux

Apr 20, 2010, 5:15:03 AM
to openwfe...@googlegroups.com
On Tue, Apr 20, 2010 at 6:09 PM, Olle <foe...@gmail.com> wrote:
>>
>> OK. If the participant is in charge of handing the workitem to the
>> "external participant" and fetching back the answer, all from its
>> consume method, then you have an on_reply, already. Easy.
>>
>> If the workitem comes back to the engine via a listener, should I
>> re-instantiate the participant and trigger its on_reply ?
>
> Yes. It's exactly my case. There is an external service, which
> receives task results from CMS and puts it into ruote (it does POST
> _ruote/engine/reply?merge_workitem currently).
>
> participant.on_reply method needs to receive workitem and be able to
> change it. If such approach affects performance, engine/worker could
> check {participant.responde_to? "on reply"} during first instantiating
> and flag participant expression, so there would be no need to re-
> instantiate participant if it doesn't provide "on_reply" handler.
>
> Participant class knows how to "tune" and post task to the user and
> how to parse and transform received results in context of process.
> Listener/receiver is an off-sided intermediate component. It knows
> nothing about the process logic, but knows how to get "raw" results
> from CMS and pass it to ruote. That was my idea.

Hi Oleg,

Makes sense.

--
John Mettraux - http://jmettraux.wordpress.com

John Mettraux

Apr 20, 2010, 5:19:28 AM
to openwfe...@googlegroups.com
On Mon, Apr 19, 2010 at 02:25:41PM -0700, Olle wrote:
>
> > > I will think about your on_reply idea. It makes lots of sense, but I'd
> > > like to sleep on it for a while.
> >
> > > http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L345-346
> >
> > > There is one thing. Since we now favour participants instantiated for
> > > each dispatch over participants instantiated at register time, it
> > > may/will not be the the 'same' participant doing the on_reply as the
> > > one that did the dispatch.
> >
> > I'm not sure what is the problem here. Participant is stateless,
> > needed "state" information may be saved in the workitem. User just
> > provide a classname with some options, and it will have desired
> > behavior in any worker aquainted with this class. I planned to
> > implement some basic participant and subcluss it for "type" of tasks.
> > It will parse and transfrom results as needed. Usually we have 10-20
> > different type of tasks per workflow instance, used for all the
> > processes.
>
> This also requires a possibility to return modified workitem to the
> engine without reply_to_engine in the "consume" method. AFAIK, there
> is no way to do this currently, correct?

Hello Oleg,

nothing prevents you from extracting the functionality of reply_to_engine out of local participant.

Ruote has the concept of 'listener'. Some participants may be 'emit only', their consume method never replies to the engine. That calls for listeners.

For example, you could have an AmqpParticipant coupled to an AmqpListener.

I haven't brought back yet the listener concept from ruote 0.9 to ruote 2.1, but, as a first step, I have created a core Listener module, similar to LocalParticipant here :

http://github.com/jmettraux/ruote/commit/5c0789af2fb13634e75b90803e50717489119da6
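
The emit-only participant / listener pairing can be pictured with the plain-Ruby sketch below. Nothing here is ruote API: FakeEngine, EmitOnlyParticipant and QueueListener are invented stand-ins, and two in-process Queue objects play the role of the message broker.

```ruby
# Stand-in for the engine: just records the workitems it gets back.
class FakeEngine
  attr_reader :replies

  def initialize
    @replies = []
  end

  def reply(workitem)
    @replies << workitem
  end
end

# Emits the workitem and returns without replying to the engine.
class EmitOnlyParticipant
  def initialize(outbox)
    @outbox = outbox
  end

  def consume(workitem)
    @outbox << workitem
  end
end

# Picks answers up from the inbox and hands them to the engine.
class QueueListener
  def initialize(inbox, engine)
    @inbox = inbox
    @engine = engine
  end

  def receive_one
    @engine.reply(@inbox.pop)
  end
end

outbox = Queue.new
inbox = Queue.new
engine = FakeEngine.new

EmitOnlyParticipant.new(outbox).consume({ 'wfid' => '20100419-babaraca' })

# The "external system": takes the task, adds a result, sends it back.
task = outbox.pop
inbox << task.merge('result' => 'done')

QueueListener.new(inbox, engine).receive_one
puts engine.replies.inspect
```

The point of the split is that consume returns immediately, and the reply reaches the engine later through a separate component that knows nothing about the process definition.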

> > > > b. Participant works with external resource. If resource is
> > > > locked, participant should try every 10 minutes for 2 hours and raise
> > > > error if the resource still locked.
> > > > Above scenarios could be defined in the process definition,
> > > > but process becomes watered down and less readable. I'm almost sure
> > > > that it is should be the responsibility (at least in my particular
> > > > case) of the participant, not the process definition.
> >
> > > Those retry techniques belong to the participant IMHO.
> >
> > Sorry, I'm not sure what you meant here. The reminder technique
> > described above? Usually we have 3000-6000 concurrent processes. Most of
> > them wait on a parallel approval step containing human activities and
> > before/after processing (3-10 concurrent branches). So we will have
> > 9000-60000 workitems in the storage. The nature of the before/after
> > processing doesn't require it to subclass the storage participant -
> > it just does something with the information in the CMS, but occasionally we
> > can receive "resource is locked" in those processing steps (as I
> > described in the preface). To handle it in the manner shown we would need to
> > subclass StorageParticipant for every processing step. To retry every
> > minute it would read and process all of the workitems in the storage
> > every minute, am I correct?
> >
>
> I tried also to use :on_error attribute to achieve desired
> functionality. I raised some specialised exception from "consume". But
> it looks like :on_error doesn't work for participant expression, only
> for flow control expressions. Is it by design?

Sorry, it's not by design. Thanks for reporting, I fixed it at :

http://github.com/jmettraux/ruote/commit/d0f47c365e4a0a85a33fdbeaee4d1cfef27f3b2e

> So this doesn't work as shown, only if I move :on_error into
> "sequence" definition:
>
> My idea was to do something like this:
>
> ----
> pdef = Ruote.process_definition :name => 'test' do
> sequence do
> ...
> # participant implementation raises RedoException < Exception if
> # it needs to be reapplied after a while.
> # it should also set "f:redo_wait_interval", but I have no idea
> # how to pass it to the engine without reply_to_engine
> external_work :on_error => 'handle_redo_error'
> ...
> end
>
> define 'handle_redo_error' do
> wait "f:redo_wait_interval"
> # can't set this field in the participant above
> _redo
> # or some specialized participant which knows where to get
> # back for re-applying, for example if it knows fei, it can easily
> # utilize engine.re_apply method.
> end
> end
> ----
>
> As you can see, here I have two problems:
>
> 1. Don't know how to pass the desired wait interval to "wait" from
> "participant". Maybe process variables could be utilized for this
> goal?
> 2. Don't know how to get information about the intercepted exception in
> the error handler subprocess/participant. Is it possible?

It was not really possible, but I have added that for you (for all the ruote users) :

http://github.com/jmettraux/ruote/commit/00588816dc8d0221e1f1f7ca58202908a621e099


I hope to release an official ruote 2.1.10 soon.


Thanks again !

--
John Mettraux - http://jmettraux.wordpress.com

Olle

Apr 20, 2010, 7:29:47 AM4/20/10
to ruote
John,


> >  1. Don't know how to pass the desired wait interval to "wait" from
> > "participant". Maybe process variables could be utilized for this
> > goal?
> >  2. Don't know how to get information about the intercepted exception in
> > the error handler subprocess/participant. Is it possible?
>
> It was not really possible, but I have added that for you (for all the ruote users) :
>
>  http://github.com/jmettraux/ruote/commit/00588816dc8d0221e1f1f7ca5820...
>

John, thank you! This potentially solves my request.
I've built a quick test (http://gist.github.com/372307):
---
pdef = Ruote.process_definition :name => 'test' do

  sequence do
    external_work :tag => 'tag1', :on_error => 'handle_error'
    echo 'exiting...'
  end

  define 'handle_error' do
    trace
    echo 'sleeping for ${f:sleep_for}..'
    wait "${f:sleep_for}"
    echo 'redoing...'
    _redo :ref => 'tag1'
  end

end
--
This process should re-apply the desired expression after sleeping, but
it didn't. Here is my output:
--
External work started...
#<Ruote::Workitem:0x99114d8 @h={"fields"=>{"counter"=>1,
"sleep_for"=>"2s", "__error__"=>[{"engine_id"=>"engine",
"wfid"=>"20100420-bejutatzumu", "sub_wfid"=>nil, "expid"=>"0_1_0"},
"2010-04-20 11:21:22.605815 UTC", "RedoException", "RedoException"],
"params"=>{"ref"=>"trace"}, "dispatched_at"=>"2010-04-20
11:21:22.649120 UTC"}, "fei"=>{"engine_id"=>"engine",
"wfid"=>"20100420-bejutatzumu", "sub_wfid"=>"037942411000",
"expid"=>"0_0_0"}, "participant_name"=>"trace"}>
sleeping for 2s..
redoing...
exiting...
--
What could be wrong here?


Regarding the other things, let me get back to you later; I have a
heavy workload right now.

Best regards,
Oleg

John Mettraux

Apr 20, 2010, 9:51:26 AM4/20/10
to openwfe...@googlegroups.com
> What could be wrong here?

Hello Oleg,

it can't redo since the tag 'tag1' no longer exists when the error gets handled.

Placing the :tag => 'tag1' on the sequence should do it.

.

But perhaps this is cleaner (and 'handle' is re-usable) :

---8<---
pdef = Ruote.process_definition do
  sequence do
    handle { external_work }
  end
  define 'handle' do
    repeat do
      apply :on_error => 'undo'
      _break :unless => '${f:time_to_retry}'
      sleep '${f:time_to_retry}'
    end
  end
end
--->8---

:on_error => 'undo' vaporizes the error and lets the flow continue. This example assumes you've set the field 'time_to_retry' right before raising in the external_work participant (since it's now OK).

Note : apply is a bit like a ruby 'yield'.
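
To illustrate the 'yield' analogy, here is a rough plain-Ruby equivalent of the 'handle' subprocess. It is not ruote code: the `handle` method, its `pause` hook and the way the block clears 'time_to_retry' on success are all assumptions made for the sketch.

```ruby
# Plain-Ruby analogy of the 'handle' subprocess; not ruote code.
# `pause` is a hypothetical hook standing in for the 'sleep' expression.
def handle(fields, pause: ->(_duration) {})
  loop do
    begin
      yield fields            # like `apply`: run the caller's block
    rescue StandardError
      # like :on_error => 'undo': swallow the error and carry on
    end
    delay = fields['time_to_retry']
    break unless delay        # like `_break :unless => '${f:time_to_retry}'`
    pause.call(delay)         # like `sleep '${f:time_to_retry}'`
  end
  fields
end

attempts = 0
pauses = []
result = handle({}, pause: ->(d) { pauses << d }) do |fields|
  attempts += 1
  if attempts < 3
    fields['time_to_retry'] = '10m' # set the field right before raising
    raise 'resource is locked'
  end
  fields.delete('time_to_retry')    # success: clear it so the loop ends
end
```

Here the block fails twice, asks for a retry each time, and succeeds on the third attempt. Note that the loop only ends if the field is absent after a successful run.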

.

Another alternative that we discussed : placing all the retry logic inside of the participant and hiding it from the 'business process'. Still worth a try. Maybe I could provide you with a way for the local participant to re_apply itself after a while :

---8<---
class OlegParticipant

  include Ruote::LocalParticipant

  def consume (workitem)
    # ...
    if something_went_wrong
      re_apply(workitem, :in => '3m')
    else
      reply_to_engine(workitem)
    end
  end

  def cancel (flavour)
    # have to unschedule the queued re_applies if any...
  end
end
--->8---

Let me think about it...


I will look at the on_reply very soon.


Best regards,

--
John Mettraux - http://jmettraux.wordpress.com

John Mettraux

Apr 20, 2010, 10:38:21 AM4/20/10
to openwfe...@googlegroups.com
On Tue, Apr 20, 2010 at 10:51 PM, John Mettraux <jmet...@openwfe.org> wrote:
>
> Another alternative that we discussed : placing all the retry logic inside of the participant and hide it from the 'business process'. Still worth a try. Maybe I could provide you with a way for the local participant to re_apply itself after a while :
>
> ---8<---
>  class OlegParticipant
>
>    include Ruote::LocalParticipant
>
>    def consume (workitem)
>      # ...
>      if something_went_wrong
>        re_apply(workitem, :in => '3m')
>      else
>        reply_to_engine(workitem)
>      end
>    end
>
>    def cancel (flavour)
>      # have to unschedule the queued re_applies if any...
>    end
>  end
> --->8---
>
> Let me think about it...

Phase 1 is in :

http://github.com/jmettraux/ruote/commit/91fac72da25af53cb6fbf6d0d9b77300681f413a

I will enable re_apply(workitem, :in => 'x') and re_apply(workitem,
:at => 'y') tomorrow.

Please let me know what you think.

John Mettraux

Apr 21, 2010, 2:11:16 AM4/21/10
to openwfe...@googlegroups.com

On Tue, Apr 20, 2010 at 11:38 PM, John Mettraux <jmet...@openwfe.org> wrote:
>
> Phase 1 is in :
>
>  http://github.com/jmettraux/ruote/commit/91fac72da25af53cb6fbf6d0d9b77300681f413a
>
> I will enable re_apply(workitem, :in => 'x') and re_apply(workitem,
> :at => 'y') tomorrow.

Done.

Here's what you can do with ruote "trunk" :

---8<---
class RetryParticipant
  include Ruote::LocalParticipant

  def initialize (opts)
    @opts = opts
  end

  def consume (workitem)
    begin
      do_the_job
      reply(workitem)
    rescue
      re_dispatch(workitem, :in => @opts['delay'] || '1s')
    end
  end

  def cancel (fei, flavour)
    unschedule_re_dispatch(fei)
  end
end
--->8---
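
The participant above retries forever. For the "try every 10 minutes for 2 hours, then raise" scenario from earlier in the thread, the attempt count has to be tracked somewhere. Below is a plain-Ruby sketch of one way to do that. BoundedRetry, ResourceLocked and the 'attempts' field are hypothetical names, nothing here is ruote API, and how the counter would actually travel between re-dispatches is left open.

```ruby
# Sketch only: BoundedRetry and ResourceLocked are hypothetical names.
class ResourceLocked < StandardError; end

class BoundedRetry
  def initialize(max_attempts:, delay:)
    @max_attempts = max_attempts
    @delay = delay
  end

  # Returns [:done] or [:retry, delay]; re-raises once attempts run out.
  # The counter lives in the fields hash (an assumption of this sketch).
  def run(fields)
    fields['attempts'] = fields.fetch('attempts', 0) + 1
    yield
    [:done]
  rescue ResourceLocked
    raise if fields['attempts'] >= @max_attempts
    [:retry, @delay]
  end
end

# Every 10 minutes for 2 hours is 12 attempts.
policy = BoundedRetry.new(max_attempts: 12, delay: '10m')
fields = {}
outcomes = []
begin
  loop { outcomes << policy.run(fields) { raise ResourceLocked } }
rescue ResourceLocked
  outcomes << [:failed]
end
```

In a participant, the [:retry, delay] outcome would correspond to the re_dispatch call and [:done] to the reply, while the final raise would be left to whatever error handling the process defines.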

Olle

Apr 21, 2010, 3:48:15 AM4/21/10
to ruote
John,

I'm very impressed! This issue was my biggest concern, and it's gone
now. It's also a strong confirmation of ruote's flexibility and
excellent design. I very much appreciate your help and advice, thank
you! I learned a lot about ruote over the last couple of days and you
answered all my questions.

Regarding the results of our tests: sorry, I forgot to answer. Of course
I'll present them here. I really hope we will choose ruote and be able
to help the project in any possible way.


Best regards,
Oleg

Olle

Apr 21, 2010, 7:25:41 AM4/21/10
to ruote
John, after all my experiments with ruote I have received a lot of new
information and my head is a bit of a mess, so maybe I missed
something.

My external participant needs to store some result of an external call
without replying to the engine and use the stored information in
"cancel".

Below is a simple example, which uses a somewhat questionable approach
for that. It works, but I'm not sure about it - it doesn't feel right to
me. Tried on ruote's "trunk".

------ Participant
def consume( workitem )
  # some external work returns an "id"
  id = some_external_call()

  # now need to store the received id for further use in "cancel"
  # without replying. The reply will be done by the external service.
  exp = fetch_flow_expression( workitem )
  exp.h.applied_workitem[ 'received_id'] = id
  exp.persist_or_raise
end

def cancel(fei, flavour)

  wi = Ruote::Exp::FlowExpression.fetch(@context,
    fei.to_h).h.applied_workitem
  cancel_prev_external_call( wi['received_id'] )
end

--------------------

John Mettraux

Apr 21, 2010, 9:14:34 AM4/21/10
to openwfe...@googlegroups.com

On Wed, Apr 21, 2010 at 04:25:41AM -0700, Olle wrote:
>
> Below is a simple example, which uses some questionable approach for
> that. It works, but I'm not sure - it feels not good for me. Tried on
> the ruote's "trunk".
>
> ------ Participant
> def consume( workitem )
> #some external work returns an "id"
> id = some_external_call()
>
> # now need to store the received id for further use in "cancel"
> # without replying. The reply will be done by the external service.
> exp = fetch_flow_expression( workitem )
> exp.h.applied_workitem[ 'received_id'] = id
> exp.persist_or_raise
> end
>
> def cancel(fei, flavour)
>
> wi = Ruote::Exp::FlowExpression.fetch(@context,
> fei.to_h).h.applied_workitem
> cancel_prev_external_call( wi['received_id'] )
> end
> --------------------

Hello Oleg,

may I suggest you store the mapping fei <-> 'external id' on the external side ?

---8<---
def consume (workitem)
some_external_call('consume', workitem.fei.to_storage_id)
end

def cancel (fei, flavour)
some_external_call('cancel', fei.to_storage_id)
end
--->8---

If that's not possible, let me add some helper methods to LocalParticipant for storing information in the expression, exactly like you did, but in a "blessed" way.

What do you think ?

--
John Mettraux - http://jmettraux.wordpress.com

Olle

Apr 21, 2010, 9:34:38 AM4/21/10
to ruote

>
> may I suggest you store the mapping fei <-> 'external id' on the external side ?
>
> ---8<---
>   def consume (workitem)
>     some_external_call('consume', workitem.fei.to_storage_id)
>   end
>
>   def cancel (fei, flavour)
>     some_external_call('cancel', fei.to_storage_id)
>   end
> --->8---

That's possible, but it is expected to perform very poorly because of
the design constraints of our "external side" (the CMS, as I described
above). It is very slow when searching by side-info.

> If that's not possible, let me add some helper methods to LocalParticipant for storing information in the expression, exactly like you did, but in a "blessed" way.
>

That's the best choice for me. But you are the final judge here. If you
decide not to do it, I'll use the first approach.

Best regards,
Oleg

John Mettraux

Apr 21, 2010, 10:55:39 AM4/21/10
to openwfe...@googlegroups.com

On Wed, Apr 21, 2010 at 06:34:38AM -0700, Olle wrote:
>
> > If that's not possible, let me add some helper methods to LocalParticipant for storing information in the expression, exactly like you did, but in a "blessed" way.
> >
>
> That's the best choice for me. But you are the final judge here. If you
> decide not to do it, I'll use the first approach.

Hello Oleg,

it's in :

http://github.com/jmettraux/ruote/commit/366f958de05e5d4b109acac680d200f765b06777

Here is the test case, I hope it shows how it works :

---8<---
BLACKBOARD = {}

class StashingParticipant
  include Ruote::LocalParticipant
  def initialize (opts)
  end
  def consume (workitem)
    put(workitem.fei, 'token' => workitem.params['token'])
  end
  def cancel (fei, flavour)
    BLACKBOARD['token'] = get(fei, 'token')
  end
end

def test_stash

  BLACKBOARD.clear

  pdef = Ruote.process_definition do
    alpha :token => 'of esteem'
  end

  @engine.register_participant :alpha, StashingParticipant

  #noisy

  wfid = @engine.launch(pdef)
  wait_for(:alpha)

  ps = @engine.process(wfid)
  fexp = ps.expressions.find { |e| e.fei.expid == '0_0' }

  assert_equal({ 'token' => 'of esteem' }, fexp.h.stash)

  @engine.cancel_process(wfid)
  wait_for(wfid)

  assert_equal 'of esteem', BLACKBOARD['token']
end
--->8---

The blackboard thing is only here for me to test.

You can see the communication between the consume and the cancel phases.

As of now 'put' can only get called once; since it can set multiple keys at once, it's OK. Maybe I will fix that later.

I hope it helps.
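
For the "store the external id in consume, use it in cancel" case discussed above, the same idea can be tried out without an engine. The sketch below imitates put/get with an in-memory Hash. Stash and ExternalTaskParticipant are hypothetical names and the external id is made up, so this only shows the shape of the interaction, not how ruote stores the stash.

```ruby
# Sketch only: Stash imitates the put/get idea with an in-memory Hash;
# it is not how ruote persists the stash.
class Stash
  def initialize
    @by_fei = {}
  end

  # Store several keys for one fei in a single call.
  def put(fei, hash)
    (@by_fei[fei] ||= {}).merge!(hash)
  end

  def get(fei, key)
    @by_fei.fetch(fei, {})[key]
  end
end

class ExternalTaskParticipant
  attr_reader :cancelled

  def initialize(stash)
    @stash = stash
    @cancelled = []
  end

  # No reply here: the external service is expected to reply later.
  def consume(fei)
    external_id = "task-#{fei}" # pretend the external system returned this
    @stash.put(fei, 'received_id' => external_id, 'created' => true)
  end

  # Uses the id remembered at consume time.
  def cancel(fei)
    @cancelled << @stash.get(fei, 'received_id')
  end
end

participant = ExternalTaskParticipant.new(Stash.new)
participant.consume('0_0')
participant.cancel('0_0')
```

Everything that cancel needs is written in one put call, which fits the "only once, but multiple keys" restriction mentioned above.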


Best regards, thanks for all,

--
John Mettraux - http://jmettraux.wordpress.com

John Mettraux

Apr 26, 2010, 1:51:24 AM4/26/10
to openwfe...@googlegroups.com

On Tue, Apr 20, 2010 at 10:51 PM, John Mettraux <jmet...@openwfe.org> wrote:
>
> I will look at the on_reply very soon.

Hello Oleg,

the on_reply is in :

http://github.com/jmettraux/ruote/commit/4d02d5eaf6a9fa231978ad59c64cbde4d4d33470

The tests cover vanilla on_reply and failing on_reply.

It's indeed nice to gather consume and on_reply logic under the same roof.
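
As a rough picture of what that buys, here is a plain-Ruby sketch in which a participant cleans up raw human-task results in on_reply before the process sees them. The Dispatcher class, the 'raw_result' and 'approved' fields and the on_reply signature used here are assumptions for the illustration; the commit above is the reference for the real behaviour.

```ruby
# Sketch only: Dispatcher is a hypothetical stand-in for the engine side.
class HumanTaskParticipant
  def consume(workitem)
    # would create the task in the external system; no reply here
  end

  # Turn the raw, unstructured answer into something the process can use.
  def on_reply(workitem)
    raw = workitem['fields'].delete('raw_result').to_s
    workitem['fields']['approved'] = raw.strip.casecmp?('yes')
    workitem
  end
end

class Dispatcher
  def initialize(participant)
    @participant = participant
  end

  # Called when the external side replies; gives the participant a chance
  # to post-process the workitem if it implements on_reply.
  def receive(workitem)
    if @participant.respond_to?(:on_reply)
      @participant.on_reply(workitem)
    else
      workitem
    end
  end
end

dispatcher = Dispatcher.new(HumanTaskParticipant.new)
item = dispatcher.receive('fields' => { 'raw_result' => "  YES \n" })
```

With this arrangement the process definition no longer needs a separate process_results step after every human task.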


Many thanks,