Ruote to orchestrate time-expensive computer processes?


Nicola

Feb 28, 2012, 11:55:03 AM
to openwfe...@googlegroups.com
Hello,
I am pretty new to Ruote and I would like some suggestions/guidelines about how
to develop an architecture for the scenario I am going to describe. Pointers to
existing code would also be appreciated.

A lab needs to run automatic and computationally expensive data analyses (which
can last days or even weeks); each such task is the execution of one or more
command-line tools, currently launched manually from a shell. Some tasks must
run sequentially and others may run concurrently. When a task is over,
typically some manual inspection of the output must be carried out to decide
whether the workflow can proceed to the next (time-expensive) analysis.
Sometimes (more often than not), tasks are not completed because an error occurs
(for example, the task hits a memory limit and it is killed by the operating
system), so a human agent must decide what to do (relaunch, cancel, start a
different workflow, etc...). The reason I'd like to use Ruote for this instead
of some job scheduler is that there is a non-linear mix of computer and human
tasks to be performed, and a job scheduler is not flexible enough (in
particular, it does not handle the human part).

The main problem I am facing is: what is the best way to execute the computer
tasks in separate (Ruby) processes, and start/stop them and track their status,
say, from a web interface à la ruote-kit? As far as I can see, Ruote can spawn a
new thread when it hands a workitem to a participant, but not a new (Ruby)
process. Who should be responsible for spawning a separate Ruby process? Should
it be done in the participant? Or in the “main” program? Or should I use some
client-server architecture?

I have a feeling that workers play a role here, but how they... work (ehm) is
not that clear to me yet. I have read both the Ruote-Kit Readme and the blog
post about Ruote 2.1, but what code like this does

storage = Ruote::FsStorage.new('ruote_work')
worker = Ruote::Worker.new(storage)
worker.run
# current thread is now running worker

and whether it is self-contained is still a mystery to me (how does this know
what to look for in the storage? What does it “run”?). When I run it, a script
like the above either gets stuck or it gives an error like “no JSON backend
found” (in the case of ruote-kit). I would be glad if someone could help me
clear my mind on these issues.


And if I am allowed to abuse your patience a bit more, I have also a couple of
specific, and probably naive, questions:

1) if I define a participant by subclassing Ruote::StorageParticipant, do I
still need to mixin Ruote::LocalParticipant?

2) In many examples of process definitions, participants are passed a :task
parameter, which makes me wonder: does it have a special meaning in Ruote? I
have never seen an example of a participant implementation making use of the
:task parameter, so I have always assumed that it is a name like any other and
have used it as follows:

class MyParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    case workitem.params[:task]
    when 'do this' then dothis(workitem)
    when 'do that' then dothat(workitem)
    end
  end

  private
  def dothis(wi)
    [...]
  end

  def dothat(wi)
    [...]
  end
end

Is this the intended usage pattern?

Nicola

John Mettraux

Feb 28, 2012, 3:26:47 PM
to openwfe...@googlegroups.com

On Tue, Feb 28, 2012 at 05:55:03PM +0100, Nicola wrote:
>
> (...)

>
> The main problem I am facing is: what is the best way to execute the computer
> tasks in separate (Ruby) processes, and start/stop them and track their status,
> say, from a web interface à la ruote-kit? As far as I can see, Ruote can spawn a
> new thread when it hands a workitem to a participant, but not a new (Ruby)
> process. Who should be responsible for spawning a separate Ruby process? Should
> it be done in the participant? Or in the “main” program? Or should I use some
> client-server architecture?

Hello Nicola,

Not everybody needs ruote to fork a new Ruby process for a participant
execution; most of the use cases don't require that.

In a way, since ruote has a worker architecture, as soon as you have multiple
workers, you are actually executing tasks in separate Ruby processes.

If I read correctly, you want to run commands via ruote. A very naive
implementation:

---8<---
class SpawnerParticipant
  include Ruote::LocalParticipant

  def on_workitem
    workitem.fields['result'] = `#{workitem.fields['command']}`
    reply
  end

  def on_cancel
    # well, can't do much... :-(
  end
end
--->8---

A better implementation would probably use something like

https://github.com/ahoward/open4

or

https://github.com/kschiess/procrastinate

and store the PIDs somehow (along with the mapping between a workitem id and a
PID) for cancel attempts...

Alternatively, you could store an identifier in the command itself so that the
cancel code can skim the "ps" output to retrieve the PID to send the signal to...
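
A sketch of the first approach (untested; it assumes the on_workitem /
on_cancel accessor style above and keeps the fei => PID map in the worker
process' memory, so the cancel has to be handled by the same Ruby process that
spawned the command):

---8<---
require 'ruote'

class SpawningParticipant
  include Ruote::LocalParticipant

  PIDS = {} # fei storage id => child PID, local to this worker process
            # (not thread-safe, good enough for a sketch)

  def on_workitem
    pid = Process.spawn(workitem.fields['command'])
    PIDS[workitem.fei.to_storage_id] = pid
    Process.wait(pid)
      # blocks this dispatch thread (not the whole worker)
    workitem.fields['exit_status'] = $?.exitstatus
    reply
  ensure
    PIDS.delete(workitem.fei.to_storage_id)
  end

  def on_cancel
    if pid = PIDS.delete(fei.to_storage_id)
      Process.kill('TERM', pid) rescue nil # best effort
    end
  end
end
--->8---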


> I have a feeling that workers play a role here, but how they... work (ehm) is
> not that clear to me yet. I have read both the Ruote-Kit Readme and the blog
> post about Ruote 2.1, but what code like this does
>
> storage = Ruote::FsStorage.new('ruote_work')
> worker = Ruote::Worker.new(storage)
> worker.run
> # current thread is now running worker
>
> and whether it is self-contained is still a mystery to me (how does this know
> what to look for in the storage? What does it “run”?). When I run it, a script
> like the above either gets stuck or it gives an error like “no JSON backend
> found” (in the case of ruote-kit). I would be glad if someone could help me
> clear my mind on these issues.

A detailed issue report would help us help you.

When it gets "stuck", I guess it simply loops for work and since there is
nothing to do, it appears to be stuck.

Doing something like "require 'rufus-json/automatic'" can alleviate the "no
JSON backend found".

.

Generally the code above comes as

# self-contained dashboard + worker + storage

dashboard = Ruote::Dashboard.new(
  Ruote::Worker.new(
    Ruote::FsStorage.new('ruote_work')))

Useful for testing.

# dashboard only (no ruote process execution takes place here, front-end)

dashboard = Ruote::Dashboard.new(
  Ruote::FsStorage.new('ruote_work'))

# worker only (no control, pure execution, back-end)

worker = Ruote::Worker.new(
  Ruote::FsStorage.new('ruote_work'))
worker.run

The trick is that, when a Dashboard is passed a worker at initialization, it
will call its #run method, so the last bit can be rewritten as:

dashboard = Ruote::Dashboard.new(
  Ruote::Worker.new(
    Ruote::FsStorage.new('ruote_work')))

and be equivalent (well, there is visibly one more object, an instance of
Dashboard, but it's not very heavy).

Granted, it may look byzantine, but I've settled on this "encapsulation"
mechanism to embody the composition of dashboard / worker / storage and their
variants. It's byzantine enough that people stop and start thinking about the
implications.
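
To make the front-end / back-end split concrete, a hypothetical launcher
script could look like this (a separate worker-only process pointed at the
same 'ruote_work' storage does the actual execution):

---8<---
require 'rufus-json/automatic'
require 'ruote'

# dashboard only: it can launch processes and read their state,
# but nothing gets executed in this Ruby process

dashboard = Ruote::Dashboard.new(Ruote::FsStorage.new('ruote_work'))

wfid = dashboard.launch(Ruote.process_definition { participant 'alpha' })

p dashboard.process(wfid)
  # may still be nil until a worker has picked up the launch
--->8---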

.

Now about the "running" thing. When a worker runs, it polls the storage for
messages (msgs) that are pieces of ruote process execution. Handling a msg
generally involves fetching a flow expression, or instantiating a new one, and
doing something with it.

When a msg is handled, it is discarded, but it has probably resulted in a new
msg being pushed to the storage (that new msg will get consumed by the first
worker that puts its hands on it).

Note that when a "concurrence" expression is applied, one msg per concurrent
branch is issued.

.

Here is what the #run method looks like:

https://github.com/jmettraux/ruote/blob/147bad157e4a6ef387fe812a8da53a74c530af6a/lib/ruote/worker.rb#L88-94
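
In very rough terms, it amounts to something like this (a conceptual sketch,
not the actual source; get_msgs is the storage call the worker polls, the rest
is simplified):

---8<---
loop do
  msgs = storage.get_msgs   # pending pieces of process execution

  msgs.each { |msg| process(msg) }
    # applying / replying to flow expressions, which usually
    # places new msgs into the storage

  sleep(0.1) if msgs.empty? # nothing to do right now, don't hammer the storage
end
--->8---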


> And if I am allowed to abuse your patience a bit more, I have also a couple of
> specific, and probably naive, questions:
>
> 1) if I define a participant by subclassing Ruote::StorageParticipant, do I
> still need to mixin Ruote::LocalParticipant?

No,

---8<---
module M
  def x
    p :x
  end
end

class A
  include M
  def y
    p :y
  end
end

class B < A
end

b = B.new
b.x # => :x
b.y # => :y
--->8---

> 2) In many examples of process definitions, participants are passed a :task
> parameter, which makes me wonder: does it have a special meaning in Ruote? I
> have never seen an example of a participant implementation making use of the
> :task parameter, so I have always assumed that it is a name like any other and
> have used it as follows:
>
> class MyParticipant
>   include Ruote::LocalParticipant
>
>   def consume(workitem)
>     case workitem.params[:task]
>     when 'do this' then dothis(workitem)
>     when 'do that' then dothat(workitem)
>     end
>   end
>
>   private
>   def dothis(wi)
>     [...]
>   end
>
>   def dothat(wi)
>     [...]
>   end
> end
>
> Is this the intended usage pattern?

It's a possible usage pattern.

Most of the time, the :task attribute is used as a "title" for the workitem
handed to a human participant. So it's more of a convention.

You are totally free to follow the pattern you described.
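
For instance, with a definition like this (participant name made up), the
participant receives 'do this' / 'do that' via its params, exactly as in your
snippet:

---8<---
pdef = Ruote.process_definition do
  sequence do
    participant 'my_participant', :task => 'do this'
    participant 'my_participant', :task => 'do that'
  end
end
--->8---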


Questions are welcome. Best regards,

--
John Mettraux - http://lambda.io/processi

marsbomber

Feb 28, 2012, 5:37:49 PM
to ruote
Hi Nicola,

I'm working on a similar system to the one you described above. I'm using
RabbitMQ to offload all computation to discrete and standalone processors.

Architecture-wise, I'm doing the following:

1, A Rails-backed RESTful API allows clients to launch jobs.
2, A received job gets translated into a Ruote-specific process definition
   (AST).
3, I run Ruote with a Redis-backed storage, which allows me to attach multiple
   workers.
4, RuoteAMQP remote participants are heavily used, so that a Ruote worker picks
   up an expression and dispatches the job to RabbitMQ.
5, A number of discrete/standalone processors (implemented using DaemonKit)
   subscribe to designated RabbitMQ queues. They do their work (which could
   take minutes/hours/even longer, it doesn't matter) and return the finished
   workitem to Ruote, so that the process can continue.
6, Any error that occurs raises an exception within Ruote, so that the whole
   job goes into on_error and is paused.
7, I built an admin interface (my human participant) to monitor all the logged
   and launched jobs. It lets me see which remote participant failed and gives
   me a chance to fix the participant and re_apply the errored job (see the
   rough sketch after this list).
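
A rough sketch of that re_apply step (assuming `dashboard` is a
Ruote::Dashboard hooked to the same storage; errors / replay_at_error are the
dashboard calls I rely on, double-check the names against your ruote version):

ps = dashboard.process(wfid)     # status of one stalled process instance

ps.errors.each do |err|
  p err                          # inspect what went wrong
  dashboard.replay_at_error(err) # once the participant is fixed, retry from there
end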

Nicola, hope this helps you in some way. Unfortunately, what I'm building is
closed source, so I cannot disclose any more details.

And a question for myself: John, does what I describe make sense to you at
all? Am I over-complicating things? Or are there aspects of Ruote, which I
simply missed, that could simplify things a little? Thanks, John!

John Mettraux

Feb 28, 2012, 11:39:19 PM
to openwfe...@googlegroups.com

On Tue, Feb 28, 2012 at 02:37:49PM -0800, marsbomber wrote:
>
> Nicola, hope this helps you in some way. Unfortunately, what I'm building is
> closed source, so I cannot disclose any more details.

Hello Mars,

can you tell us for which domain you developed your solution?

> And a question for myself: John, does what I describe make sense to you at
> all? Am I over-complicating things? Or are there aspects of Ruote, which I
> simply missed, that could simplify things a little?

This "RabbitMQ between ruote and the hard-working participants" pattern seems
to be used by lots of people.

I sometimes wonder if a simpler queue, like a Redis queue, wouldn't be
sufficient, but I guess people are used to deploying and managing RabbitMQ and
they like to have fancy topologies, so it makes a lot of sense.

I also wonder whether a 0mq setup might be nice. We could also use 0mq inside
of a ruote storage (might be very fun).

I have to say I'm impressed: we first talked 39 days ago (according to my
Twitter client) and since then you have built such a nice system, barely
asking any questions.

BTW, we should discuss whether your deep-merge is a good candidate for the
master branch.


Keep up the great work!

Farrel Lifson

Feb 29, 2012, 2:09:43 AM
to openwfe...@googlegroups.com
On Wednesday, 29 February 2012 06:39:19 UTC+2, John Mettraux wrote:

> This "RabbitMQ between ruote and the hard-working participants" pattern seems
> to be used by lots of people.


I've noticed that as well, and I'm wondering whether the implementation I am working on might be a bit misguided...

I have all the heavy lifting done in the participants themselves, with (possibly) multiple Ruote worker daemons (possibly on different machines) all communicating via a shared MongoDB storage acting as the 'queue'. My initial experiments running on my laptop were successful but I'm worried I might be a bit too optimistic as to whether this is the right way to go.

I can't see any obvious downsides, and it cuts out another piece of software to manage (RabbitMQ), but if I'm straying down the wrong path let me know!

Farrel

John Mettraux

Feb 29, 2012, 3:15:54 AM
to openwfe...@googlegroups.com

On Tue, Feb 28, 2012 at 11:09:43PM -0800, Farrel Lifson wrote:
>
> I have all the heavy lifting done in the participants themselves, with
> (possibly) multiple Ruote worker daemons (possibly on different machines)
> all communicating via a shared MongoDB storage acting as the 'queue'. My
> initial experiments running on my laptop were successful but I'm worried I
> might be a bit too optimistic as to whether this is the right way to go.
>
> I can't see any obvious downsides, and it cuts out another piece of software
> to manage (RabbitMQ), but if I'm straying down the wrong path let me know!

Hello Farrel,

it should be OK, that's how I'd do it too.

If we look a bit further, placing a queue between ruote and some of the
participants has an interesting advantage: it's not only ruote that can queue
work for those non-ruote workers. Ruote becomes just a[n orchestration] client
among other clients that can place work orders.

Of course, if you don't need that level of refinement, don't go for it (but
remember that you can switch participants later, once you have accumulated
enough experience and metrics).

Taking some time to look at RubyAMQP and co is worth it:

http://rubyamqp.info/

Some of those patterns can be implemented with queues other than AMQP ones.


I hope others will chime in, cheers,

marsbomber

Feb 29, 2012, 5:45:15 AM
to ruote
Hi John,

I'm working on a hosting orchestration application. Ruote is the core of this
"gluing" system. A fundamental business requirement is to make sure the whole
system is resilient: any and every part of it can fail at any given time, and
we need to be able to recover it somehow... We also need to be able to scale
out the bottleneck parts easily... hence my setup. Is it overkill or not? I
honestly don't know, but now you know what objective I'm aiming at! (I am
actually considering whether a federated messaging architecture is needed to
make the system truly resilient.)

Yeah, it's been about 39 days of banging my head against the wall :) I didn't
ask many questions, I guess, because I was so focused on getting the
architecture out with a proof-of-concept workflow run. I'm sure I will bug you
more often once I need to dig into the details more!

As for the deep_merge, you can see my hacky commit in my fork of ruote. It's
nothing but a change of your 2.3 union merge implementation from Ruby's merge!
to ActiveSupport's deep_merge!. Take it anytime! I'm happy to discuss with you
whether it's needed in the first place, though!
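
To illustrate why it matters (field names made up): Ruby's Hash#merge replaces
nested hashes wholesale, while ActiveSupport's deep_merge merges them key by
key, which is what you want when concurrent branches each fill in different
parts of the same nested field:

require 'active_support/core_ext/hash/deep_merge'

a = { 'server' => { 'cpu' => 2 } }
b = { 'server' => { 'ram' => 8 } }

a.merge(b)      # => { 'server' => { 'ram' => 8 } }             ('cpu' is lost)
a.deep_merge(b) # => { 'server' => { 'cpu' => 2, 'ram' => 8 } } (both kept)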

marsbomber

Feb 29, 2012, 5:52:16 AM
to ruote

> If we look a bit further, placing a queue between ruote and some of the
> participants has an interesting advantage: not only ruote can queue work for
> those non-ruote workers. Ruote becomes just a[n orchestration] client among
> other clients that can place work orders.

Yes, I see that as the true advantage of taking Ruote into an SOA
environment. Ruote dictates the workflow (my understanding is that's what
Ruote is designed for, but John, you'll have a bigger say, coz you're the
author :p), and every true processing unit just worries about its own
discrete task.

I'm actually a bit confused about your work on ruote-swf. I'm not sure where
SWF fits in the puzzle now... My guess is that if one single job
step/expression/participant is doing a hell of a lot and needs to break its
sub-tasks down into sub-sub-tasks, that's when you need SWF?

I realise this is a bit off-topic from what Nicola asked in this thread. My
apologies!

Nicola

Feb 29, 2012, 6:05:21 AM
to openwfe...@googlegroups.com
Thanks for your nice and prompt replies! I see that there are several things I
have not looked at yet (like Redis, AMQP, receivers, etc...), so thanks for
pointing me in that direction. After reading your posts, I have realized that I
am confused about the way workers operate beyond the simplest pattern (when they
are passed as an argument to the engine), which is the way I have used them so
far. Browsing through the source code, as suggested, unfortunately didn't help
me much. To make things very “practical” (and at a very basic level - please
bear with me), consider this pair of scripts:

# Script 1
engine = Ruote::Engine.new(Ruote::FsStorage.new('ruote_test'))

engine.register_participant '.+' do |workitem|
  puts workitem.params['msg']
end

pdef = Ruote.process_definition do
  sequence do
    participant :alpha, :msg => 'Running alpha'
    participant :beta, :msg => 'Running beta'
  end
end

wfid = engine.launch(pdef)

-----------------

# Script 2
storage = Ruote::FsStorage.new('ruote_test')


worker = Ruote::Worker.new(storage)
worker.run

Ideally, I would like script 1 to launch a process instance and exit, and script
2 to “pick up” that instance from the storage and execute it (by running the two
scripts one after another and on the same machine). Essentially, I would like
script 2 to output:

Running alpha
Running beta

Script 1 seems to serve the purpose (in fact, after running it, the storage
contains a message), but script 2 apparently does not (it probably loops because
it does not find any work to do, as you have already told me). How do I modify
the scripts to get what I want? My attempts so far (adding a storage
participant, turning participants into storage participants, ...) have been
miserably unsuccessful. I am clearly misinterpreting some basic concepts. Btw,
does FsStorage support multiple workers (I am on OS X)?

Nicola

marsbomber

Feb 29, 2012, 6:14:23 AM
to openwfe...@googlegroups.com
You can look into RuoteKit. It provides you with a ruote rake task for doing just what you want.

Nicola

Feb 29, 2012, 6:29:17 AM
to openwfe...@googlegroups.com
In article <32558475.5.1330514064000.JavaMail.geo-discussion-forums@pbcjk1>,
marsbomber <jimj...@gmail.com> wrote:

> You can look into RuoteKit. It provides you with a ruote rake task for
> doing just what you want

Sorry, I forgot to mention: I have also tried with RuoteKit. The task looks like
this:

task :ruote_kit_worker do
  RuoteKit.run_worker(Ruote::FsStorage.new('ruote_test'))
end

At first, this was giving me a “no JSON backend found”, which I have fixed as
John has suggested. Now, it behaves exactly like my custom-made example.

To be clear:
1) I launch a process through RuoteKit's web interface. I see that the process
instance is in the process list.
2) I execute 'rake ruote_kit_worker'
3) The rake task stays running indefinitely, but I do not see any change at all
(if I reload the process list or the workitems list in the web interface, it is
exactly the same as before running the rake task - and also no errors, no
schedules).

I have double-checked that the storage is named correctly. I do not know what I
am doing wrong.

Nicola

marsbomber

Feb 29, 2012, 6:46:25 AM
to openwfe...@googlegroups.com
Not sure if you read one of my blog posts that John linked. You might fish out something useful there: http://marsbomber.github.com/2012/01/20/ruote-with-rabbitmq-part1/

John Mettraux

Feb 29, 2012, 7:00:17 AM
to ruote
Hello Nicola,

I've repackaged your two scripts in a git repository, with, hopefully, a
valid readme:

https://github.com/jmettraux/for_nicola

It works well.

John Mettraux

unread,
Feb 29, 2012, 7:10:13 AM2/29/12
to openwfe...@googlegroups.com

On Wed, Feb 29, 2012 at 02:52:16AM -0800, marsbomber wrote:
>
> Yes, I see that as the true advantage of taking Ruote into an SOA
> environment. Ruote dictates the workflow (my understanding is that's what
> Ruote is designed for, but John, you'll have a bigger say, coz you're the
> author :p), and every true processing unit just worries about its own
> discrete task.

Yes, ruote orchestrates.

> I'm actually a bit confused about your work on ruote-swf. I'm not sure where
> SWF fits in the puzzle now... My guess is that if one single job
> step/expression/participant is doing a hell of a lot and needs to break its
> sub-tasks down into sub-sub-tasks, that's when you need SWF?

OK, staying off-topic with you.

SWF is Amazon's Simple Workflow service. Ruote-swf is a "storage" implementation
for ruote built on top of it.

You do something like:

storage = Ruote::Swf::Storage.new(
  SWF_ACCESS_KEY_ID,
  SWF_SECRET_ACCESS_KEY,
  'swf_domain' => 'xyz')

Ruote::Dashboard.new(
  Ruote::Swf::DecisionWorker.new(
    Ruote::Swf::ActivityWorker.new(
      storage)))

And you then run your usual ruote workflows, but the back-end is SWF.

The Amazon engineers do the maintenance of whatever iron and software runs SWF.
Not you.

It's not as fast as a local Redis ruote storage, and it doesn't support a
process listening to the activity of another, but apart from that, it's vanilla
ruote.

I'm waiting for my employer's green light to switch the repository from
private to public.


That's it, kind regards,

Hartog De Mik

Feb 29, 2012, 7:27:08 AM
to openwfe...@googlegroups.com
2012/2/29 John Mettraux <jmet...@gmail.com>:

> Hello Nicola,
>
> I've repackaged your two scripts in a git repository, with, hopefully, a
> valid readme:
>
>  https://github.com/jmettraux/for_nicola
>

gh> This "answer" is released under the MIT license.

:-D

Nicola

Feb 29, 2012, 2:36:26 PM
to openwfe...@googlegroups.com
In article <20120229120...@sanma.local>,
John Mettraux <jmet...@gmail.com> wrote:

> Hello Nicola,
>
> I've repackaged your two scripts in a git repository, with, hopefully, a
> valid readme:
>
> https://github.com/jmettraux/for_nicola
>
> It works well.

Well, I don't know what to say… It works well for me, too :o
Maybe, in my countless experiments, I screwed up something with bundler,
or I was using the wrong storage location… Starting from a clean directory has
helped, apparently.

Thanks to all who have spared some time to reply. Your help has been invaluable
and I have learned many things.

Nicola

Nicola

Feb 29, 2012, 2:45:39 PM
to openwfe...@googlegroups.com
In article <26419656.32.1330515985542.JavaMail.geo-discussion-forums@pbje9>,
marsbomber <jimj...@gmail.com> wrote:

> Not sure if you read one of my blog post linked by John. You might fish out
> something useful
> there http://marsbomber.github.com/2012/01/20/ruote-with-rabbitmq-part1/

That's a good tutorial! I've gone through it and all was fine, except that when
I check RabbitMQ's queues I see two queues (email_job and ldap_job) instead of
four (no processes are listed under the ruotekit app as expected).

At first glance, I am positively impressed by RabbitMQ, so I think the next
step will be to go deeper into that AMQP thing.

Thanks,
Nicola

John Mettraux

Mar 2, 2012, 4:20:43 PM
to openwfe...@googlegroups.com
Hi list,

adding a quick note to this thread.

So, AMQP is popular for buffering work between ruote and participants (thus
AMQP workers).

There are lower-profile alternatives to AMQP:

http://redis.io/topics/data-types#lists
https://github.com/defunkt/resque (builds on redis)
https://github.com/mperham/sidekiq (builds on redis, learns from resque)
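
For the Redis list flavour, the core of the "queue in the middle" idea is tiny
(an illustrative sketch, queue name made up; handing the finished workitem
back to ruote, via a receiver for instance, is left out):

---8<---
require 'json'
require 'redis'

redis = Redis.new

# producer side (called from a participant, say): enqueue a job
redis.lpush('jobs', JSON.dump('command' => 'ls -l'))

# consumer side (a standalone heavy-worker process): block until a job shows up
_queue, raw = redis.brpop('jobs')
job = JSON.parse(raw)
puts "working on: #{job['command']}"
--->8---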


Cheers,
