A lab needs to run automatic and computationally expensive data analyses (which
can last days or even weeks); each such task is the execution of one or more
command-line tools, currently launched manually from a shell. Some tasks must
run sequentially and others may run concurrently. When a task is over,
typically some manual inspection of the output must be carried out to decide
whether the workflow can proceed to the next (time-expensive) analysis.
Often (more often than not, in fact), tasks do not complete because an error
occurs (for example, the task hits a memory limit and is killed by the operating
system), so a human agent must decide what to do (relaunch, cancel, start a
different workflow, etc.). The reason I'd like to use Ruote for this instead
of some job scheduler is that there is a non-linear mix of computer and human
tasks to be performed, and a job scheduler is not flexible enough (in
particular, it does not handle the human part).
The main problem I am facing is: what is the best way to execute the computer
tasks in separate (Ruby) processes, and start/stop them and track their status,
say, from a web interface à la ruote-kit? As far as I can see, Ruote can spawn a
new thread when it hands a workitem to a participant, but not a new (Ruby)
process. Who should be responsible for spawning a separate Ruby process? Should
it be done in the participant? Or in the “main” program? Or should I use some
client-server architecture?
I have a feeling that workers play a role here, but how they... work (ehm) is
not that clear to me yet. I have read both the Ruote-Kit Readme and the blog
post about Ruote 2.1, but what code like this does
storage = Ruote::FsStorage.new('ruote_work')
worker = Ruote::Worker.new(storage)
worker.run
# current thread is now running worker
and whether it is self-contained is still a mystery to me (how does this know
what to look for in the storage? What does it “run”?). When I run it, a script
like the above either gets stuck or it gives an error like “no JSON backend
found” (in the case of ruote-kit). I would be glad if someone could help me
clear my mind on these issues.
And if I am allowed to abuse your patience a bit more, I have also a couple of
specific, and probably naive, questions:
1) if I define a participant by subclassing Ruote::StorageParticipant, do I
still need to mixin Ruote::LocalParticipant?
(2) In many examples of process definitions, participants are passed a :task
parameter. Which makes me wonder: does that have a special meaning in Ruote? I
have never seen an example of a participant implementation making use of the
:task parameter, so I have always assumed that it is a name like another and
have used it as follows:
class MyParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    case workitem.params[:task]
    when 'do this' then dothis(workitem)
    when 'do that' then dothat(workitem)
    end
  end

  private

  def dothis(wi)
    [...]
  end

  def dothat(wi)
    [...]
  end
end
Is this the intended usage pattern?
Nicola
Hello Nicola,
Not everybody needs ruote to fork a new Ruby process for a participant
execution; most of the use cases don't require that.
In a way, since ruote has a worker architecture, as soon as you have multiple
workers, you are actually executing tasks in separate Ruby processes.
If I read correctly, you want to run commands via ruote. A very naive
implementation:
---8<---
class SpawnerParticipant
  include Ruote::LocalParticipant

  def on_workitem
    workitem.fields['result'] = `#{workitem.fields['command']}`
    reply
  end

  def on_cancel
    # well, can't do much... :-(
  end
end
--->8---
A better implementation would probably use something like
https://github.com/ahoward/open4
or
https://github.com/kschiess/procrastinate
and store the PIDs somehow (keeping the mapping between a workitem id and a PID)
for cancel attempts...
You could store the identifier in the command itself so that the cancel code
can skim the "ps" output to retrieve the PID to signal to...
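To make the PID-tracking idea concrete, here is a minimal, hypothetical sketch
(class and method names are made up, and the Ruote::LocalParticipant plumbing is
left out): spawn each command in its own OS process with Process.spawn, remember
the PID per workitem identifier, and signal that PID on cancel.

```ruby
# Hypothetical sketch of a PID-tracking spawner (not ruote API; a real
# participant would include Ruote::LocalParticipant and reply to the
# engine once the command is done).

class TrackingSpawner

  def initialize
    @pids = {} # maps a workitem identifier to the spawned PID
  end

  # spawn the command in a separate OS process, remember the PID
  def consume(wi_id, command)
    @pids[wi_id] = Process.spawn(command)
  end

  # block until the spawned process exits, return its exit status
  def wait(wi_id)
    _, status = Process.wait2(@pids.delete(wi_id))
    status
  end

  # cancel attempt: signal the remembered PID, if any
  def cancel(wi_id)
    pid = @pids.delete(wi_id)
    Process.kill('TERM', pid) if pid
    pid
  end
end
```

The same wi_id-to-PID mapping could be persisted (in the workitem fields or a
small table) so that a different process can perform the cancellation.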
> I have a feeling that workers play a role here, but how they... work (ehm) is
> not that clear to me yet. I have read both the Ruote-Kit Readme and the blog
> post about Ruote 2.1, but what code like this does
>
> storage = Ruote::FsStorage.new('ruote_work')
> worker = Ruote::Worker.new(storage)
> worker.run
> # current thread is now running worker
>
> and whether it is self-contained is still a mystery to me (how does this know
> what to look for in the storage? What does it “run”?). When I run it, a script
> like the above either gets stuck or it gives an error like “no JSON backend
> found” (in the case of ruote-kit). I would be glad if someone could help me make
> my mind clear on these issues.
A detailed issue report would help us help you.
When it gets "stuck", I guess it simply loops, looking for work, and since there
is nothing to do, it appears to be stuck.
Adding "require 'rufus-json/automatic'" should alleviate the "no JSON backend
found" error.
.
Generally the code above comes as
# self-contained dashboard + worker + storage

dashboard = Ruote::Dashboard.new(
  Ruote::Worker.new(
    Ruote::FsStorage.new('ruote_work')))
Useful for testing.
# dashboard only (no ruote process execution takes place here, front-end)

dashboard = Ruote::Dashboard.new(
  Ruote::FsStorage.new('ruote_work'))

# worker only (no control, pure execution, back-end)

worker = Ruote::Worker.new(
  Ruote::FsStorage.new('ruote_work'))
worker.run
The trick is that, when a Dashboard is passed a worker at initialization, it
will call its #run method, so the last bit can be rewritten as:
dashboard = Ruote::Dashboard.new(
  Ruote::Worker.new(
    Ruote::FsStorage.new('ruote_work')))
and be equivalent (well, there is visibly one more object, an instance of
Dashboard, but it's not very heavy).
Granted, it may look byzantine, but I've settled on this "encapsulation"
mechanism to embody the composition of dashboard / worker / storage and its
variants. It's byzantine enough that people stop and start thinking about the
implications.
.
Now about the "running" thing. When a worker runs, it polls the storage for
msgs, which are pieces of ruote process executions. Handling a msg generally
involves fetching a flow expression (or instantiating a new one) and doing
something with it.
When a msg has been handled, it is discarded, but handling it has probably
pushed a new, resulting msg to the storage (that new msg will get consumed by
the first worker to put its hands on it).
Note that when a concurrence expression is applied, one msg per concurrent
branch is issued.
.
In essence, the #run method is that loop: fetch the next msg from the storage,
process it, repeat.
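A much simplified, hypothetical model of such a polling loop (this is not the
actual ruote source; a plain Array stands in for the storage's msg collection):

```ruby
# Toy model of a worker's polling loop: take msgs out of the "storage",
# process each one; processing may push follow-up msgs back, e.g. one
# msg per branch when a concurrence is applied.

def run_toy_worker(msgs, handled)
  until msgs.empty?
    msg = msgs.shift                 # fetch (and discard) the next msg
    handled << msg['action']         # "process" it
    (msg['branches'] || []).each do |branch|
      msgs << { 'action' => branch } # emit one follow-up msg per branch
    end
  end
  handled
end
```

Feeding it a single msg carrying two 'branches' yields three handled actions,
mirroring how applying a concurrence fans out into one msg per branch.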
> And if I am allowed to abuse your patience a bit more, I have also a couple of
> specific, and probably naive, questions:
>
> 1) if I define a participant by subclassing Ruote::StorageParticipant, do I
> still need to mixin Ruote::LocalParticipant?
No,
---8<---
module M
  def x
    p :x
  end
end

class A
  include M
  def y
    p :y
  end
end

class B < A
end

b = B.new
b.x # => :x
b.y # => :y
--->8---
> (2) In many examples of process definitions, participants are passed a :task
> parameter. Which makes me wonder: does that have a special meaning in Ruote? I
> have never seen an example of a participant implementation making use of the
> :task parameter, so I have always assumed that it is a name like another and
> have used it as follows:
>
> class MyParticipant
> include Ruote::LocalParticipant
>
> def consume(workitem)
> case workitem.params[:task]
> when 'do this' then dothis(workitem)
> when 'do that' then dothat(workitem)
> end
> end
>
> private
> def dothis(wi)
> [...]
> end
>
> def dothat(wi)
> [...]
> end
> end
>
> Is this the intended usage pattern?
It's a possible usage pattern.
Most of the time, the :task attribute is used as a "title" for the workitem
handed to a human participant. So it's more of a convention.
You are totally free to follow the pattern you described.
Questions are welcome. Best regards,
--
John Mettraux - http://lambda.io/processi
Hello Mars,
can you tell us for which domain you developed your solution?
> As something for myself, John, does what I describe make sense to you
> at all? Am I over-complicating things?
> Or there are aspects from Ruote, which I simply missed, that could
> simplify things a little?
This "RabbitMQ between ruote and the hard-working participants" pattern seems
to be used by lots of people.
I sometimes wonder if a simpler queue, like a Redis queue, wouldn't be
sufficient, but I guess people are used to deploying and managing RabbitMQ, and
they like to have fancy topologies, so it makes a lot of sense.
I also wonder if a 0mq setting might not be nice. We could also use 0mq
inside of a ruote storage (might be very fun).
I have to say I'm impressed: we first talked 39 days ago (according to my
Twitter client) and since then you have built such a nice system, barely asking
any questions.
BTW, we should discuss if your deep-merge is a good candidate for the master
branch.
Keep up the great work!
Hello Farrel,
it should be OK, that's how I'd do it too.
If we look a bit further, placing a queue between ruote and some of the
participants has an interesting advantage: not only can ruote queue work for
those non-ruote workers, ruote also becomes just one (orchestrating) client
among other clients that can place work orders.
Of course, if you don't need that level of refinement, then don't go for it
(but remember that you can switch participants later, once you have accumulated
enough experience and metrics).
Taking some time to look at RubyAMQP and co is worth it.
Some of those patterns can be implemented with queues other than AMQP ones.
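As a hedged illustration of that "queue between ruote and the hard workers"
pattern, here is a sketch where Ruby's thread-safe Queue stands in for the
broker (with RabbitMQ or Redis, the participant would publish to the broker and
a separate worker process would consume; the helper names are made up):

```ruby
require 'json'

# Sketch: ruote (or any other client) serializes a work order and pushes
# it onto a queue; a worker elsewhere pops and deserializes it. Swapping
# the stdlib Queue for a RabbitMQ exchange or a Redis list keeps the
# shape identical.

JOBS = Queue.new

# producer side: enqueue a JSON work order
def enqueue_job(id, command)
  JOBS << JSON.generate('id' => id, 'command' => command)
end

# consumer side: dequeue and parse one work order
def dequeue_job
  JSON.parse(JOBS.pop)
end
```

This is also John's point above: any client able to push a JSON document onto
the queue can place work orders, ruote being just one of them.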
I hope others will chime in, cheers,
# Script 1

engine = Ruote::Engine.new(Ruote::FsStorage.new('ruote_test'))

engine.register_participant '.+' do |workitem|
  puts workitem.params['msg']
end

pdef = Ruote.process_definition do
  sequence do
    participant :alpha, :msg => 'Running alpha'
    participant :beta, :msg => 'Running beta'
  end
end

wfid = engine.launch(pdef)
-----------------
# Script 2

storage = Ruote::FsStorage.new('ruote_test')
worker = Ruote::Worker.new(storage)
worker.run
Ideally, I would like script 1 to launch a process instance and exit, and script
2 to “pick up” that instance from the storage and execute it (by running the two
scripts one after another and on the same machine). Essentially, I would like
script 2 to output:
Running alpha
Running beta
Script 1 seems to serve the purpose (in fact, after running it, the storage
contains a message), but script 2 apparently does not (it probably loops because
it does not find any work to do, as you have already told me). How do I modify
the scripts to get what I want? My attempts so far (adding a storage
participant, turning participants into storage participants, ...) have been
miserably unsuccessful. I am clearly misinterpreting some basic concepts. Btw,
does FsStorage support multiple workers (I am on OS X)?
Nicola
> You can look into RuoteKit. It provides you with a ruote rake task for
> doing just what you want
Sorry, I forgot to mention: I have also tried with RuoteKit. The task looks like
this:
task :ruote_kit_worker do
  RuoteKit.run_worker(Ruote::FsStorage.new('ruote_test'))
end
At first, this was giving me a “no JSON backend found”, which I have fixed as
John has suggested. Now, it behaves exactly as my custom-made example.
To be clear:
1) I launch a process through RuoteKit's web interface. I see that the process
instance is in the process list.
2) I execute 'rake ruote_kit_worker'
3) The rake task stays running indefinitely, but I do not see any change at all
(if I reload the process list or the workitem list in the web interface, it is
exactly the same as before running the rake task; also, no errors and no
schedules).
I have double-checked that the storage is named correctly. I do not know what I
am doing wrong.
Nicola
I've repackaged your two scripts in a git repository, with, hopefully, a
valid readme:
https://github.com/jmettraux/for_nicola
It works well.
Yes, ruote orchestrates.
> I actually get a bit confused about your work on ruote-swf. I'm not
> sure where swf fits in the puzzle now ... My guess is if that one
> single job step/expression/participant is doing a hell lot and need to
> break down the sub task into sub-sub tasks, that's when you need swf?
OK, staying off-topic with you.
SWF is Amazon's Simple Workflow service. Ruote-swf is a "storage" implementation
for ruote, built on top of it.
You do something like:
storage = Ruote::Swf::Storage.new(
  SWF_ACCESS_KEY_ID,
  SWF_SECRET_ACCESS_KEY,
  'swf_domain' => 'xyz')

Ruote::Dashboard.new(
  Ruote::Swf::DecisionWorker.new(
    Ruote::Swf::ActivityWorker.new(
      storage)))
And you then run your usual ruote workflows, but the back-end is SWF.
The Amazon engineers do the maintenance of whatever hardware and software runs
SWF. Not you.
It's not as fast as a local Redis ruote storage, and it doesn't support one
process listening to the activity of another, but apart from that, it's vanilla
ruote.
I'm waiting for my employer's green light to switch the repository from private
to public.
That's it, kind regards,
gh> This "answer" is released under the MIT license.
:-D
> Hello Nicola,
>
> I've repackaged your two scripts in a git repository, with, hopefully, a
> valid readme:
>
> https://github.com/jmettraux/for_nicola
>
> It works well.
Well, I don't know what to say… It works well for me, too :o
Maybe, in my countless experiments, I screwed something up with bundler, or I
was using the wrong storage location… Starting from a clean directory has
helped, apparently.
Thanks to all who have spared some time to reply. Your help has been invaluable
and I have learned many things.
Nicola
> Not sure if you read one of my blog post linked by John. You might fish out
> something useful
> there http://marsbomber.github.com/2012/01/20/ruote-with-rabbitmq-part1/
That's a good tutorial! I've gone through it and all was fine, except that when
I check RabbitMQ's queues I see two queues (email_job and ldap_job) instead of
four (no processes are listed under the ruotekit app, as expected).
At a first glance, I am positively impressed by RabbitMQ, so I think the next
step will be to go deeper into that AMQP thing.
Thanks,
Nicola
Adding a quick note to this thread.
So, AMQP is popular for buffering work between ruote and participants (hence
AMQP workers).
There are lower-profile alternatives to AMQP:
http://redis.io/topics/data-types#lists
https://github.com/defunkt/resque (builds on redis)
https://github.com/mperham/sidekiq (builds on redis, learns from resque)
Cheers,