Suggestions for waiting and resuming with concurrency


I. E. Smith-Heisters

Jul 19, 2010, 6:19:23 PM
to ruote
Hi all,

I posted some code a while back that lets me pause my main thread
while the Ruote worker thread does work. The Ruote worker wakes the
main thread up when it's done. The point being that in certain very
important places I can simulate synchronicity with Ruote so human
users can be assured that they'll see the results of their (HTTP)
requests.

The code I posted (below) relied on subclassing StorageParticipant and
overloading the #consume method. Now that we've added concurrency to
our process definition this approach has an unforeseen (but now
obvious) shortcoming. When we're in a concurrent branch, the
completion of one branch does not send the workitem to another
participant (since we're waiting on the other branch), and #consume is
never called.

I have a couple possible fixes up my sleeve: overriding
Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
(hacky), or not waiting in the first place if we're in a concurrent
branch (inconsistent behavior). However, I was wondering if someone
might have some suggestions that are less error-prone and better
supported by the existing interfaces (ie. no monkey-patching).

Thanks,
Ian

--
I've edited this some to omit extraneous details, so it's untested as
shown:


module Participants
  class Patient < Ruote::StorageParticipant

    class NestedWaitForError < StandardError; end

    cattr_accessor :waiting

    class << self

      # run in the server thread
      # => timeout worker response after a few seconds so users don't see
      #    hanging when an error happens
      def wait_for signal=:next, &block
        raise NestedWaitForError, "nesting wait_for is not currently supported" if self.waiting
        begin
          self.waiting = [Thread.current, signal]
          caller_result = yield
          if waiting
            begin
              Timeout::timeout(timeout) { Thread.stop }
            rescue Timeout::Error # worker thread didn't call +continue+ before the timeout expired
              Rails.logger.info 'Timeout while waiting for worker'
              Thread.current['__patience_result__'] = :timeout
            end
          end
          Thread.current['__patience_result__']
        ensure
          self.waiting = nil
        end
      end

      # run in the worker thread
      def continue workitem
        return unless waiting
        thread, waiting_for = *waiting
        signals = [:next, workitem.participant_name]
        if signals.include? waiting_for
          continue! workitem
        end
      end

      def continue! result
        return unless waiting
        thread = waiting.first
        thread['__patience_result__'] = result
        self.waiting = nil
        thread.wakeup
      end

      # seconds to wait for the worker to consume the workitem
      def timeout
        10
      end
    end

    def consume workitem
      super workitem
      self.class.continue workitem.dup
    end
  end
end

John Mettraux

Jul 19, 2010, 8:59:59 PM
to openwfe...@googlegroups.com

On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote:
>
> I posted some code a while back that lets me pause my main thread
> while the Ruote worker thread does work. The Ruote worker wakes the
> main thread up when it's done. The point being that in certain very
> important places I can simulate synchronicity with Ruote so human
> users can be assured that they'll see the results of their (HTTP)
> requests.
>
> The code I posted (below) relied on subclassing StorageParticipant and
> overloading the #consume method. Now that we've added concurrency to
> our process definition this approach has an unforeseen (but now
> obvious) shortcoming. When we're in a concurrent branch, the
> completion of one branch does not send the workitem to another
> participant (since we're waiting on the other branch), and #consume is
> never called.
>
> I have a couple possible fixes up my sleeve: overriding
> Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
> (hacky), or not waiting in the first place if we're in a concurrent
> branch (inconsistent behavior). However, I was wondering if someone
> might have some suggestions that are less error-prone and better
> supported by the existing interfaces (ie. no monkey-patching).

Hello,

maybe you could have a look at Participant#on_reply. It was recently enhanced to work with "instantiated participants" (participants instantiated at registration time (I have the impression that it's what you use)).

The test case is :

http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43_participant_on_reply.rb

Basically, it's a callback triggered when the workitem reaches back the participant.
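
In its simplest form it could look like this (just a sketch, the puts stands for whatever wake-up mechanism you use) :

---8<---
class PatientParticipant < Ruote::StorageParticipant

  # callback triggered when the workitem reaches back the participant
  # (sketch : a real implementation would wake up the waiting thread here)
  def on_reply (workitem)
    puts "reply reached #{workitem.participant_name}"
  end
end
--->8---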

I'm not sure I fully understand your case. It seems that if there are multiple workitems in your engine, any of them will wake up the waiting thread when reaching a storage participant.

Or, some speculation : what about a participant that explicitly wakes up your thread ?

---8<---
pdef = Ruote.process_definition do
  sequence do
    participant 'patient'
    do_the_work
    participant 'waker'
  end
end

module Bedroom

  @guests = {}

  def self.sleep (label)
    @guests[label] = Thread.current
    Thread.stop
      # Thread.stop (class method) puts the current thread to sleep
  end

  def self.wakeup (label)
    t = @guests.delete(label)
    t.wakeup
  end
end

class PatientParticipant < Ruote::StorageParticipant

  def reply (workitem)
    label = workitem.fei.to_s
    workitem.fields['sleeper'] = label
    super(workitem)
    Bedroom.sleep(label)
  end
end

class WakerParticipant
  include Ruote::LocalParticipant

  def consume (workitem)
    Bedroom.wakeup(workitem.fields.delete('sleeper'))
    reply_to_engine(workitem)
  end
end
--->8---

(This assumes that the worker is running in the same ruby process as the web application).

When the patient participant replies to the engine, the replying thread is put to sleep. It's woken up a bit later by a specific 'waker' participant.

You could also rely on something inside "do_the_work" to wake up the thread.
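
For example (just a sketch, with a hypothetical DoTheWorkParticipant reusing the Bedroom module above) :

---8<---
class DoTheWorkParticipant
  include Ruote::LocalParticipant

  def consume (workitem)
    # ... the actual work happens here ...

    # wake the waiting thread as soon as the work is done
    Bedroom.wakeup(workitem.fields.delete('sleeper'))

    reply_to_engine(workitem)
  end
end
--->8---

(the 'waker' participant then becomes unnecessary)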

Sorry if I'm completely off.


Best regards,

--
John Mettraux - http://jmettraux.wordpress.com

I. E. Smith-Heisters

Jul 19, 2010, 9:06:59 PM
to ruote
Did some more looking into this, and found more dead-ends. I managed
to overload StorageParticipant#reply which fixes some limited cases
when the workitem being replied with is at the end of a concurrence
branch. However, it doesn't work in situations where there are
branches with more than one step, eg.

sequence do
  concurrence do
    alpha
    cursor do
      bravo
      charlie
    end
  end
  delta
end

The behavior is that when you reply to bravo, execution continues but
charlie hasn't been dispatched. So, you can only hack #reply if you
can determine whether the current participant is at the end of a
branch of execution. Lots of playing with FlowExpression#parent and
#children didn't yield anything useful there.

Other possibilities: wakeup the sleeping thread periodically and see
if the worker thread needs more time (not sure how to do that, seems
inefficient), maybe something with loggers?

Considering sending up the white flag and just letting the sleep
timeout in these cases...

-ISH

John Mettraux

Jul 19, 2010, 9:42:25 PM
to openwfe...@googlegroups.com

On Mon, Jul 19, 2010 at 06:06:59PM -0700, I. E. Smith-Heisters wrote:
>
> Did some more looking into this, and found more dead-ends. I managed
> to overload StorageParticipant#reply which fixes some limited cases
> when the workitem being replied with is at the end of a concurrence
> branch. However, it doesn't work in situations where there are
> branches with more than one step, eg.
>
> sequence do
> concurrence do
> alpha
> cursor do
> bravo
> charlie
> end
> end
> delta
> end

Hello Ian,

you want the thread to wake up only when it's been put to sleep at bravo and it then reaches charlie ?

What is the rule for threads put to sleep at alpha and delta ?

I. E. Smith-Heisters

Jul 20, 2010, 1:00:46 AM
to ruote
On Jul 19, 5:59 pm, John Mettraux <jmettr...@openwfe.org> wrote:
> On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote:
>
> > I posted some code a while back that lets me pause my main thread
> > while the Ruote worker thread does work. The Ruote worker wakes the
> > main thread up when it's done. The point being that in certain very
> > important places I can simulate synchronicity with Ruote so human
> > users can be assured that they'll see the results of their (HTTP)
> > requests.
>
> > The code I posted (below) relied on subclassing StorageParticipant and
> > overloading the #consume method. Now that we've added concurrency to
> > our process definition this approach has an unforeseen (but now
> > obvious) shortcoming. When we're in a concurrent branch, the
> > completion of one branch does not send the workitem to another
> > participant (since we're waiting on the other branch), and #consume is
> > never called.
>
> > I have a couple possible fixes up my sleeve: overriding
> > Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
> > (hacky), or not waiting in the first place if we're in a concurrent
> > branch (inconsistent behavior). However, I was wondering if someone
> > might have some suggestions that are less error-prone and better
> > supported by the existing interfaces (ie. no monkey-patching).
>
> Hello,
>
> maybe you could have a look at Participant#on_reply. It was recently enhanced to work with "instantiated participants" (participants instantiated at registration time (I have the impression that it's what you use)).

Nope. I use `register_participant 'foo', Foo`.

>
> The test case is :
>
>  http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43...
>
> Basically, it's a callback triggered when the workitem reaches back the participant.

on_reply looks functionally identical to what I did by overloading
StorageParticipant#reply in a subclass. It's nicer, so I can switch to
that, but it still won't solve my problem (see below).

>
> I'm not sure I fully understand your case. It seems that if there are multiple workitems in your engine, any of them will wake up the waiting thread when reaching a storage participant.

*All* my code is currently run in response to user requests, and all
my participants are storage participants. There's one user request per
ruby process, so there's no chance of the waiting thread being woken
by a workitem from another ruote process.

>
> Or, some speculation : what about a participant that explicitly wakes up your thread ?

Hmmm... this seems promising.
You're not off at all--this is exactly the fresh perspective I need.
Sorry I was unclear--I was too deep in the code to explain it well. Let
me try again.

Let's take this example pdef, where all participants are storage
participants:

sequence do
  concurrence do
    alpha
    cursor do
      bravo
      charlie
    end
  end
  delta
end

Now, simplified, my user-request code might look like this:

# user POST /<wfid>/alpha
def process_user_request wfid, pname
  # I've ensured that there can't be two workitems for 'alpha' on the same wfid
  wi = Workitems.by_wfid_and_pname(wfid, pname)
  Participants::Patient.wait_for do
    Workitems.reply wi
  end
end

The ::wait_for code, as seen in my first post, basically puts the
current thread to sleep, and causes calls to
StorageParticipant#consume to wake it up. So, the following test
works:

Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
process_user_request '<wfid>', 'bravo'
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie']

So, the wait_for ensures not only that the #reply has been processed,
but that the next participant has been reached (ie. #consume has been
called).

Now, take the failing test:

Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
process_user_request '<wfid>', 'alpha' # difference: replying to the alpha participant
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo'] # FAIL!

This fails--the ruby process hangs on the call to
#process_user_request. This is because it's waiting for #consume, but
bravo's #consume has already been called!

Ok, we can add #on_reply to the mix:

class PatientParticipant
  def on_reply wi
    self.class.continue(wi) if fetch_flow_expression(wi).parent.class.to_s =~ /Concurrence/i
  end
end

Ok, so this makes the test above pass even though it's obviously dumb.
It generally relies on #consume, but in the case of #alpha, will
continue the sleeping thread. So even this longer test works:

Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
process_user_request '<wfid>', 'alpha'
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo']
process_user_request '<wfid>', 'bravo'
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['charlie']
process_user_request '<wfid>', 'charlie'
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['delta']

BUUUT, if you go the other way by replying to bravo first, it fails of
course:

Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
process_user_request '<wfid>', 'bravo'
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie']
process_user_request '<wfid>', 'charlie' # BONK!
Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha']

It stalls forever. delta won't receive a dispatch until alpha is done,
alpha has already received a dispatch and won't continue the thread,
and I have no way to tell that charlie was the last node in a
concurrent branch.

Clearer? TMI?

#on_reply won't work in the general case, when there's sequential
execution, because it will wake the sleeping thread before the next
participant has received their workitem. So, maybe a WakeupParticipant
would work (I like its simplicity)--but it will double the size of all
our pdef code (oh well). Another option that's looking increasingly
attractive is just to poll the worker to see if it's done. The ideal
solution would be to detect when you're at the end of a branch, but
that there are other branches pausing execution. The
`concurrence :count => x` feature, though, could make that even more
difficult...

Sorry for writing the Great American Novel. Thanks for your advice and
help.

-Ian

I. E. Smith-Heisters

Jul 20, 2010, 3:02:35 PM
to ruote
Been playing with a "wakeup" participant to no avail. It suffers from
the same problem as #on_reply in that it resumes the waiting thread
before the next storage participant has received the workitem.

Polling doesn't work because I can't find a good interface for asking
the worker "do you have more work to do?". I had thought
Worker#inactive? would do this, but it will return false as long as
there are wfids in the DB.

-ISH

I. E. Smith-Heisters

Jul 20, 2010, 6:13:18 PM
to ruote
Ok, I finally got something working, but I'm afraid it will be
brittle. I figured that using a service and notification events would
be more flexible, produce code with looser coupling with Ruote, and
provide a path toward better asynchronicity in the future. So, I
removed the wait code from my participants and added it to a static
module. I then created a service called "PatientNotifier" like this:

# Wakes a thread that is stopped using `Patient::wait_for`. It will wake the
# thread in the following circumstances:
#
# * when a "dispatch" event is received that has a workitem that matches the
#   criteria passed to `Patient::wait_for`
# * when a "reply" event is received, and no further events are received for
#   up to `listen_for` seconds.
#
# This latter circumstance is potentially problematic if some messages take
# more than `listen_for` seconds to be processed, but it's the only way to
# detect situations where a "dispatch" won't be sent until something else
# happens, eg. when one branch of one or more concurrent branches has ended.
# This is unavoidable given Ruote's design of never knowing what's going to
# happen next.
class PatientNotifier < ApplicationNotifier

  class << self
    def subscribe_to; %w(reply dispatch); end
    def listen_for; 1; end
  end

  def initialize *args
    super(*args)
    @lock = Mutex.new
  end

  def dispatches; @lock.synchronize { @dispatches }; end
  def dispatches=(new_d); @lock.synchronize { @dispatches = new_d }; end
  def listen_thread; @lock.synchronize { @listen_thread }; end
  def listen_thread=(new_d); @lock.synchronize { @listen_thread = new_d }; end

  def notify msg
    send "handle_#{msg["action"]}", msg
  end

  def handle_reply msg
    start_listening msg
  end

  def handle_dispatch msg
    self.dispatches << msg if dispatches
    continue msg
  end

  def continue msg
    wi = Ruote::Workitem.new(msg["workitem"])
    ::Patient.continue wi
  end

  def start_listening msg
    listen_thread.kill if listen_thread
    self.listen_thread = Thread.new do
      listen msg
    end
  end

  def listen msg
    self.dispatches = []
    sleep self.class.listen_for
    continue msg if self.dispatches.empty?
  ensure
    self.listen_thread, self.dispatches = nil, nil
  end
end

I then call this on my engine: `add_service 'patient_notifier',
'notifiers', 'Workflow::PatientNotifier'`

The rest of the code is pretty similar to the code I posted before
with some slight refactoring to remove it from Participants and put it
into a singleton Patient module (sans #consume method, of course).
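
For reference, a condensed (untested) sketch of that Patient module -- same
shape as the code in my first post, details omitted:

require 'timeout'

module Patient
  extend self

  attr_accessor :waiting

  # run in the server (request) thread
  def wait_for signal=:next
    self.waiting = [Thread.current, signal]
    yield
    Timeout::timeout(10) { Thread.stop } if waiting
    Thread.current['__patience_result__']
  rescue Timeout::Error
    :timeout
  ensure
    self.waiting = nil
  end

  # called by PatientNotifier from the worker side
  def continue workitem
    return unless waiting
    thread, waiting_for = *waiting
    return unless [:next, workitem.participant_name].include?(waiting_for)
    thread['__patience_result__'] = workitem
    self.waiting = nil
    thread.wakeup
  end
end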

I'm interested to hear your thoughts on this approach.

John Mettraux

Jul 20, 2010, 7:42:56 PM
to openwfe...@googlegroups.com
On Wed, Jul 21, 2010 at 4:02 AM, I. E. Smith-Heisters <i...@0x09.com> wrote:
> Been playing with a "wakeup" participant to no avail. It suffers from
> the same problem as #on_reply in that it resumes the waiting thread
> before the next storage participant has received the workitem.
>
> Polling doesn't work because I can't find a good interface for asking
> the worker "do you have more work to do?". I had thought
> Worker#inactive? would do this, but it will return false as long as
> there are wfids in the DB.

Hello,

you could do something like

def idle?
  7.times do
    Thread.pass
    return false if @context.storage.get_msgs.size > 0
  end
  true
end

But I'm afraid it's a bit brittle (there could be schedules kicking in
a bit later).
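
Used from the request thread it could look like this (again, just a sketch) :

require 'timeout'

# poll the worker until it looks idle
# (`worker` being the Ruote::Worker instance, with the idle? above mixed in)
def wait_until_idle (worker, max_seconds=10)
  Timeout::timeout(max_seconds) do
    sleep 0.1 until worker.idle?
  end
end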

John Mettraux

Jul 20, 2010, 8:00:12 PM
to openwfe...@googlegroups.com

> --------------------------------------------------------------------------------------------- <


>  # This is unavoidable given Ruote's design of never knowing what's going to
>  # happen next.

> --------------------------------------------------------------------------------------------- <

Framing that one :-)

"embrace the asynchronous"

Ouch, I have trouble re-formatting that each time. It looks painful.

> I then call this on my engine: `add_service 'patient_notifier',
> 'notifiers', 'Workflow::PatientNotifier'`
>
> The rest of the code is pretty similar to the code I posted before
> with some slight refactoring to remove it from Participants and put it
> into a singleton Patient module (sans #consume method, of course).
>
> I'm interested to hear your thoughts on this approach.

I read at some point that you'd like to know your position in the flow
in order to not sleep. Maybe this could help :

( gist at http://gist.github.com/483796 )

---8<---
require 'rubygems'
require 'ruote'

engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))

pdef = Ruote.define do
  concurrence do
    alpha
    sequence do
      bravo
      charlie
    end
    delta
  end
end

class MyParticipant
  include Ruote::LocalParticipant

  def consume (workitem)
    p [
      workitem.participant_name,
      :parent_type, parent_type(workitem),
      :last?, last?(workitem)
    ]
    reply(workitem)
  end

  # Returns the name of the parent expression (like 'sequence' or
  # 'concurrence').
  #
  def parent_type (workitem)
    fetch_flow_expression(workitem).parent.name
  end

  # Returns true if the participant expression with this workitem is the
  # last in the list of children of the parent expression
  # (warning, it doesn't mean that much when in a concurrence expression).
  #
  def last? (workitem)
    fexp = fetch_flow_expression(workitem)
    fexp.fei.child_id == fexp.parent.tree[2].length - 1
  end
end

engine.register_participant '.+', MyParticipant

wfid = engine.launch(pdef)

engine.wait_for(wfid)
--->8---

I hope it helps.

I. E. Smith-Heisters

Jul 22, 2010, 1:21:56 PM
to ruote
Sorry Google Groups was munging my code formatting. Here's my
refactored half-solution nicely formatted: http://gist.github.com/486273

I say half-solution because it may need to be iterated on a little,
since it appears to be unstable when the `listen` functionality is
triggered. The `listen` is the only way I could get it to resume at
the end of a concurrent branch.

The idea with this approach is that you can swap out Patient for
something like Comet and push events directly to a user over HTTP.
Having the HTTP server thread do the waiting is just a cheap
substitute for serving the user a page that waits for Ruote events.

-ISH

John Mettraux

Jul 22, 2010, 9:04:06 PM
to openwfe...@googlegroups.com
On Fri, Jul 23, 2010 at 2:21 AM, I. E. Smith-Heisters <i...@0x09.com> wrote:
> Sorry Google Groups was munging my code formatting. Here's my
> refactored half-solution nicely formatted: http://gist.github.com/486273
>
> I say half-solution because it may need to be iterated on a little,
> since it appears to be unstable when the `listen` functionality is
> triggered. The `listen` is the only way I could get it to resume at
> the end of a concurrent branch.
>
> The idea with this approach is that you can swap out Patient for
> something like Comet and push events directly to a user over HTTP.
> Having  the HTTP server thread do the waiting is just a cheap
> substitute for serving the user a page that waits for Ruote events.

Thanks for sharing !

It looks good.
