[ruote:2223] Problem with ruote 2.1, ruote-amqp and daemon-kit: messages not making it to agent

3 views
Skip to first unread message

threetee

unread,
Apr 30, 2010, 7:48:26 PM4/30/10
to ruote
Hi everyone,

I'm relatively new to Ruby, and I'm very new to BPM/Ruote. I'm
currently considering using Ruote in conjunction with AMQP to manage
multiple distributed systems, so I'm trying to get Rails, ruote, ruote-
kit, ruote-amqp and a daemon-kit-based Ruote agent to all play nicely.
I'm working on getting a working AMQP participant going with a daemon-
kit-based ruote agent, but I'm having problems.

I'm using the following versions of ruote, ruote-amqp, etc.:
ruote (2.1.7)
ruote-amqp (2.1.5)
ruote-kit (2.1.7)
daemon-kit (0.1.8pre) - I'm using daustin's version, patched to
support Ruote 2.1 (http://github.com/daustin/daemon-kit)

Ruote agent starts up fine (pastie of the below is at http://pastie.org/940997):
----------
$ bin/ruote
[daemon-kit]: DaemonKit (0.1.8pre) booting in development mode
[daemon-kit]: Setting up trap for USR1
[daemon-kit]: Setting up trap for USR2
[daemon-kit]: Setting up trap for HUP
[daemon-kit]: Setting up trap for INT
[daemon-kit]: Setting up trap for TERM
[daemon-kit]: DaemonKit (0.1.8pre) booted, now running ruote
[daemon-kit]:
AMQP.start({:pass=>"guest", :host=>"rabbitmq", :user=>"guest", :vhost=>"/"})
[daemon-kit]: Subscribing to work1 for workitems
----------

However, when I try to run the following code (also pastied at
http://pastie.org/940996), the agent sees nothing. I have verified via
tcpdump that traffic is flowing to the rabbitmq server, but I'm not
seeing any queues being created (I do see the work1 queue as created
by the agent, but none of the expected ruote-workitems, etc.).

So my questions are:
1. Should the below code work?
2. How can I get a better view into what is going on behind the scenes
so that I can troubleshoot this myself in the future?

Any help is greatly appreciated, thank you in advance!

----------
#!/usr/bin/env ruby

require 'rubygems'
require 'ruote-amqp'
require 'ruote-amqp/participant'
require 'ruote'
require 'ruote/storage/fs_storage'

engine = Ruote::Engine.new(
Ruote::Worker.new(
Ruote::FsStorage.new('ruote_work')
)
)

AMQP.settings[:host] = 'rabbitmq'
# AMQP.settings[:logging] = true
puts AMQP.settings.inspect

engine.register_participant(:amqp,
RuoteAMQP::Participant.new(:reply_by_default => true )) do |workitem|
puts workitem.inspect
end

pdef = Ruote.process_definition :name => 'print_quote' do
sequence do
amqp :queue => 'work1', :command => '/sample/quote'
end
end

wfid = engine.launch(pdef)

engine.wait_for(wfid)
----------

Output from the above script:
----------
{:vhost=>"/", :pass=>"guest", :ssl=>false, :host=>"rabbitmq", :logging=>false, :timeout=>nil, :user=>"guest", :port=>5672}
#<Ruote::Workitem:0x101de46b0 @h={"participant_name"=>"amqp",
"fields"=>{"params"=>{"command"=>"/sample/quote", "queue"=>"work1",
"ref"=>"amqp"}}, "fei"=>{"sub_wfid"=>nil, "wfid"=>"20100430-
bidoyokudu", "engine_id"=>"engine", "expid"=>"0_0_0"}}>
----------

Again, thank you for any help!

--
you received this message because you are subscribed to the "ruote users" group.
to post : send email to openwfe...@googlegroups.com
to unsubscribe : send email to openwferu-use...@googlegroups.com
more options : http://groups.google.com/group/openwferu-users?hl=en

John Mettraux

unread,
Apr 30, 2010, 9:52:57 PM4/30/10
to openwfe...@googlegroups.com
Hello ThreeTee,

at first a quick disclaimer, I'm ruote's main author, but I haven't much experience with ruote-amqp.

RuoteAMQP::Participant, as of ruote-amqp 2.1.5, isn't supposed to accept blocks, it's fun that it works and displays the workitem.

If I understood correctly your ruby example isn't emitting anything to the AMQP queue, is that right ?

How does

---8<---
engine.register_participant(:amqp, RuoteAMQP::Participant, :reply_by_default => true)
--->8---

work though ?

Ruote 2.1 prefers participants registered this way.


Cheers,

--
John Mettraux - http://jmettraux.wordpress.com

threetee

unread,
Apr 30, 2010, 10:24:32 PM4/30/10
to ruote
Excellent, that did the trick! Now I just have to figure out how to
get ruote-amqp to grab the item from the ruote_workitems queue; it's
just sitting there at the moment:

-----
$ amqp-peek ruote_workitems
(ruote_workitems)
Header:
:message_count => 1
:delivery_mode => 1
:delivery_tag => 1
:routing_key => "ruote_workitems"
:redelivered => false
:content_type => "application/octet-stream"
:exchange => ""
:priority => 0
Message: "{\"participant_name\":\"amqp\",\"fields\":{\"success
\":true,\"dispatched_at\":\"2010-05-01 02:01:59.969869 UTC\",\"quote\":
\"Ecclesiastes 10:8-9\\n8. Whoever digs a pit may fall into it;
whoever breaks through a wall may be \\nbitten by a snake.\\n9.
Whoever quarries stones may be injured by them; whoever splits logs
may be \\nendangered by them.\\n\\n[bible]
http://iheartquotes.com/fortune/show/11913\\n\",\"params\":{\"command\":\"/sample/quote\",\"ref\":\"amqp\",\"queue\":\"work1\",\"reply_queue\":\"ruote_workitems\"}},\"fei\":{\"sub_wfid\":null,\"wfid\":\"20100501-bopehiyade\",\"engine_id\":\"engine\",\"expid\":\"0_0_0\"}}"
-----

I'll plat with it some more, I suspect it's just my ignorance of how
ruote-amqp is intended to be used. I'll post again if I figure it out.

Thanks again for the quick response!

-----
$ bin/ruote
[daemon-kit]: DaemonKit (0.1.8pre) booting in development mode
[daemon-kit]: Setting up trap for USR1
[daemon-kit]: Setting up trap for USR2
[daemon-kit]: Setting up trap for HUP
[daemon-kit]: Setting up trap for INT
[daemon-kit]: Setting up trap for TERM
[daemon-kit]: DaemonKit (0.1.8pre) booted, now running ruote
[daemon-kit]:
AMQP.start({:pass=>"guest", :host=>"localhost", :user=>"guest", :vhost=>"/"})
[daemon-kit]: Subscribing to work1 for workitems
[daemon-kit]: Received workitem: "{\"participant_name\":\"amqp\",
\"fields\":{\"dispatched_at\":\"2010-05-01 02:01:59.969869 UTC\",
\"params\":{\"command\":\"/sample/quote\",\"reply_queue\":
\"ruote_workitems\",\"queue\":\"work1\",\"ref\":\"amqp\"}},\"fei\":
{\"sub_wfid\":null,\"wfid\":\"20100501-bopehiyade\",\"engine_id\":
\"engine\",\"expid\":\"0_0_0\"}}"
[daemon-kit]: Replying to engine via AMQP with
#<DaemonKit::RuoteWorkitem:0x101505a30
@workitem={"participant_name"=>"amqp", "fields"=>{"success"=>true,
"dispatched_at"=>"2010-05-01 02:01:59.969869 UTC",
"quote"=>"Ecclesiastes 10:8-9\n8. Whoever digs a pit may fall into it;
whoever breaks through a wall may be \nbitten by a snake.\n9. Whoever
quarries stones may be injured by them; whoever splits logs may be
\nendangered by them.\n\n[bible] http://iheartquotes.com/fortune/show/11913\n",
"params"=>{"command"=>"/sample/quote", "ref"=>"amqp",
"queue"=>"work1", "reply_queue"=>"ruote_workitems"}},
"fei"=>{"sub_wfid"=>nil, "wfid"=>"20100501-bopehiyade",
"engine_id"=>"engine", "expid"=>"0_0_0"}}>
[daemon-kit]: Processed workitem.
-----
> John Mettraux -http://jmettraux.wordpress.com

threetee

unread,
May 2, 2010, 12:32:28 PM5/2/10
to ruote
I've made some progress, but have run into another snag: I've added a
WorkitemListener to the engine, and it is consuming messages from the
ruote_workitems queue. However, it seems like it is not passing them
correctly back into Ruote.

Latest version of my test script (also pastied at http://pastie.org/942518):
----------
#!/usr/bin/env ruby

require 'rubygems'
require 'ruote-amqp'
require 'ruote-amqp/participant'
require 'ruote'
require 'ruote/storage/fs_storage'

AMQP.settings[:logging] = false
puts AMQP.settings.inspect

engine = Ruote::Engine.new(
Ruote::Worker.new(
Ruote::FsStorage.new('ruote_work')
)
)

listener = RuoteAMQP::WorkitemListener.new(engine)

pdef = Ruote.process_definition :name => 'print_quote' do
sequence do
amqp :queue => 'work1', :command => '/sample/quote'
end
end

engine.register_participant(:amqp, RuoteAMQP::Participant)

fei = engine.launch(pdef)

engine.wait_for(fei)
----------

Script output (also pastied at http://pastie.org/942522)
----------
$ ruby ruote-amqp_test.rb
{:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
/Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
47:in `reply': undefined method `storage' for nil:NilClass
(NoMethodError)
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp/
workitem_listener.rb:67:in `initialize'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/mq/queue.rb:391:in
`call'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/mq/queue.rb:391:in
`receive'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/mq.rb:171:in
`process_frame'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/amqp/client.rb:9:in
`process_frame'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/amqp/client.rb:117:in
`receive_data'
from /Library/Ruby/Gems/1.8/gems/eventmachine-0.12.10/lib/
eventmachine.rb:256:in `run_machine'
from /Library/Ruby/Gems/1.8/gems/eventmachine-0.12.10/lib/
eventmachine.rb:256:in `run'
from /Library/Ruby/Gems/1.8/gems/amqp-0.6.7/lib/amqp.rb:79:in `start'
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp.rb:
43:in `start!'
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp.rb:
41:in `initialize'
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp.rb:
41:in `new'
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp.rb:
41:in `start!'
from /Library/Ruby/Gems/1.8/gems/ruote-amqp-2.1.5/lib/ruote-amqp/
workitem_listener.rb:60:in `initialize'
from ruote-amqp_test.rb:26:in `new'
from ruote-amqp_test.rb:26

daemon-kit agent output:
$ bin/ruote
[daemon-kit]: DaemonKit (0.1.8pre) booting in development mode
[daemon-kit]: Setting up trap for USR1
[daemon-kit]: Setting up trap for USR2
[daemon-kit]: Setting up trap for HUP
[daemon-kit]: Setting up trap for INT
[daemon-kit]: Setting up trap for TERM
[daemon-kit]: DaemonKit (0.1.8pre) booted, now running ruote
[daemon-kit]:
AMQP.start({:pass=>"guest", :host=>"localhost", :user=>"guest", :vhost=>"/"})
[daemon-kit]: Subscribing to work1 for workitems
[daemon-kit]: Received workitem: "{\"participant_name\":\"amqp\",
\"fields\":{\"dispatched_at\":\"2010-05-02 16:22:35.893107 UTC\",
\"params\":{\"command\":\"/sample/quote\",\"reply_queue\":
\"ruote_workitems\",\"queue\":\"work1\",\"ref\":\"amqp\"}},\"fei\":
{\"sub_wfid\":null,\"wfid\":\"20100502-bigapetami\",\"engine_id\":
\"engine\",\"expid\":\"0_0_0\"}}"
[daemon-kit]: Replying to engine via AMQP with
#<DaemonKit::RuoteWorkitem:0x1015047c0
@workitem={"participant_name"=>"amqp", "fields"=>{"success"=>true,
"dispatched_at"=>"2010-05-02 16:22:35.893107 UTC", "quote"=>"Garbage
In -- Gospel Out.\n\n[fortune] http://iheartquotes.com/fortune/show/44430\n",
"params"=>{"command"=>"/sample/quote", "ref"=>"amqp",
"queue"=>"work1", "reply_queue"=>"ruote_workitems"}},
"fei"=>{"sub_wfid"=>nil, "wfid"=>"20100502-bigapetami",
"engine_id"=>"engine", "expid"=>"0_0_0"}}>
[daemon-kit]: Processed workitem.
----------

Versions:
----------
ruote (2.1.10)
ruote-amqp (2.1.5)
ruote-kit (2.1.7)
ruby 1.8.7 (2009-06-08 patchlevel 173) [universal-darwin10.0]
----------

John Mettraux

unread,
May 2, 2010, 10:48:14 PM5/2/10
to openwfe...@googlegroups.com

On Sun, May 02, 2010 at 09:32:28AM -0700, threetee wrote:
>
> $ ruby ruote-amqp_test.rb
> {:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
> /Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
> 47:in `reply': undefined method `storage' for nil:NilClass
> (NoMethodError)

Mea culpa.

http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L352

Let me fix that quickly, stay tuned,

--
John Mettraux - http://jmettraux.wordpress.com

John Mettraux

unread,
May 2, 2010, 11:40:51 PM5/2/10
to openwfe...@googlegroups.com

On Mon, May 3, 2010 at 11:48 AM, John Mettraux <jmet...@openwfe.org> wrote:
>
> On Sun, May 02, 2010 at 09:32:28AM -0700, threetee wrote:
>>
>> $ ruby ruote-amqp_test.rb
>> {:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
>> /Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
>> 47:in `reply': undefined method `storage' for nil:NilClass
>> (NoMethodError)
>
> Mea culpa.
>
>  http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L352
>
> Let me fix that quickly, stay tuned,

Hello,

http://github.com/jmettraux/ruote/commit/3119291792e33df0acfcd4d3b3f470b7a01e0288

should hopefully fix the issue. Please tell me if you run into other issues.


Best regards,

threetee

unread,
May 3, 2010, 11:22:38 AM5/3/10
to ruote
Thanks for the reply. Unfortunately, this behavior is still occurring:

----------
$ ruby ruote-amqp_test.rb
{:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
/Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
47:in `reply': undefined method `storage' for nil:NilClass
(NoMethodError)
from ruote-amqp_test.rb:20:in `new'
from ruote-amqp_test.rb:20
----------

Please let me know if there is any additional information I can
provide which might be helpful.

On May 2, 8:40 pm, John Mettraux <jmettr...@openwfe.org> wrote:
> On Mon, May 3, 2010 at 11:48 AM, John Mettraux <jmettr...@openwfe.org> wrote:
>
> > On Sun, May 02, 2010 at 09:32:28AM -0700, threetee wrote:
>
> >> $ ruby ruote-amqp_test.rb
> >> {:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
> >> /Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
> >> 47:in `reply': undefined method `storage' for nil:NilClass
> >> (NoMethodError)
>
> > Mea culpa.
>
> >  http://github.com/jmettraux/ruote/blob/ruote2.1/TODO.txt#L352
>
> > Let me fix that quickly, stay tuned,
>
> Hello,
>
>  http://github.com/jmettraux/ruote/commit/3119291792e33df0acfcd4d3b3f4...

John Mettraux

unread,
May 3, 2010, 9:44:34 PM5/3/10
to openwfe...@googlegroups.com

On Mon, May 03, 2010 at 08:22:38AM -0700, threetee wrote:
> Thanks for the reply. Unfortunately, this behavior is still occurring:
>
> ----------
> $ ruby ruote-amqp_test.rb
> {:timeout=>nil, :pass=>"guest", :user=>"guest", :vhost=>"/", :ssl=>false, :host=>"127.0.0.1", :logging=>false, :port=>5672}
> /Library/Ruby/Gems/1.8/gems/ruote-2.1.10/lib/ruote/receiver/base.rb:
> 47:in `reply': undefined method `storage' for nil:NilClass
> (NoMethodError)

Hello,

The initializer for ruote-amqp's listener looks like

---8<---
def initialize( engine_or_storage, queue = nil )

@storage = engine_or_storage.respond_to?( :storage ) ? engine_or_storage.storage : engine_or_storage

self.class.queue = queue if queue

RuoteAMQP.start!

MQ.queue( self.class.queue, :durable => true ).subscribe do |message|
if AMQP.closing?
# Do nothing, we're going down
else
workitem = decode_workitem( message )
reply( workitem )
end
end
end
--->8---

should be replaced with

---8<---
def initialize( engine_or_storage, queue = nil )

super( engine_or_storage )

self.class.queue = queue if queue

RuoteAMQP.start!

MQ.queue( self.class.queue, :durable => true ).subscribe do |message|
if AMQP.closing?
# Do nothing, we're going down
else
workitem = decode_workitem( message )
reply( workitem )
end
end
end
--->8---

That makes a patch for Kenneth.

Tell me how it goes.


My best regards,

Kenneth Kalmer

unread,
May 4, 2010, 1:46:14 PM5/4/10
to openwfe...@googlegroups.com
Accepted and pushed. Working on the daemon-kit side with daustin's fork and then I'll bump versions on both ends.

Thanks a lot !

Best 


--
Kenneth Kalmer
kenneth...@gmail.com
http://opensourcery.co.za
@kennethkalmer

Three Tee

unread,
May 4, 2010, 2:10:51 PM5/4/10
to openwfe...@googlegroups.com
Excellent! By the way, I can confirm that this patch fixed the issue for me, and ruote/ruote-amqp is now talking to my daemon-kit ruote agent. :)

- E
Reply all
Reply to author
Forward
0 new messages