Building a RabbitMQ-style pubsub gem using concurrent-ruby actors, a few questions


Jason Ayre

Jul 9, 2015, 9:33:28 PM
to concurr...@googlegroups.com
So I just pushed an initial (working) prototype of a RabbitMQ-like pubsub gem (integrating mainly with Active Record at the moment). I have some questions, and I was hoping to get feedback on my library, particularly from whoever has written most of the actor code. (Well, that's actually probably only 50% of it; there are a bunch of questions about the lazy registry as well.)

https://github.com/jasonayre/action_pubsub
(ignore the subscriber at the root, the active_record subscriber / namespace is where the working code is)

So the initial goal was to replace the Rails observers library in my current application with an async, in-process publish/subscribe that solves the shortcomings of Rails observers (mutable singleton state, for one), while running inside the main application so as not to introduce another dependency. I seem to have all that working, and have moved on to a bigger problem: supporting concurrency with multiple subscribers, e.g.

When a new blog comment is published, I want to do something that will hit the database hard, let's say send emails to everyone who has commented on that post. So I create a subscriber, a comment is created on the front end, and it no longer blocks. Great.
Now I want my subscriber to be concurrent, running multiple copies of that subscriber at once, so that when a new comment is created I'm able to cut through the work with multiple workers. The tricky part, of course, is making sure that no two copies of the subscriber get the same message. That's where the RabbitMQ broker makes things easy: it delivers one copy of the message into each subscriber group's box. (Well, that's just one strategy; it of course supports a ton of different routing patterns.)
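
To make the "one message, one worker" requirement concrete, here is a minimal plain-Ruby sketch using only the stdlib Queue and Thread. It is an illustration of the idea, not how concurrent-ruby's Balancer is implemented, and worker_count, messages and handled are made-up names.

```ruby
require "thread"

# Plain-Ruby sketch (stdlib only, NOT concurrent-ruby's Balancer):
# several workers pull from one shared queue, so each message is
# handled by exactly one of them.
worker_count = 3 # hypothetical concurrency setting
messages = (1..10).map { |i| "comment-#{i}" }

queue = Queue.new
handled = Queue.new # thread-safe collector of [worker_id, message] pairs

workers = worker_count.times.map do |worker_id|
  Thread.new do
    # Queue#pop blocks until something is available; :stop ends the worker.
    while (message = queue.pop) != :stop
      handled << [worker_id, message]
    end
  end
end

messages.each { |m| queue << m }
worker_count.times { queue << :stop } # one stop marker per worker
workers.each(&:join)

# Drain the collector and keep only the message part of each pair.
handled_messages = []
handled_messages << handled.pop.last until handled.empty?
```

Which worker handles which message varies between runs, but every message shows up in handled_messages exactly once.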

So, I managed to implement the above as well, simply by subclassing ::Concurrent::Actor::Utils::Balancer. It almost blew my mind when I upped a subscriber's concurrency to 10 and saw it handling individual message delivery properly. Part of my reason for walking through all of this is that it still feels like it was too easy, so I want to make sure I'm not missing something :)

Next the problem was: OK, so I have this one CommentSubscriber, but now I want to subscribe to comments from a different module in my application. Let's say I want to run a sentiment analysis on the comment text via some 3rd-party API. If I subscribe to that same balancer it won't work, because it sees no distinction between its subscribers and only brokers out one copy of the message, regardless of whether the subscriber is a "Blog::CommentSubscriber" or an "Analysis::CommentSubscriber", when what I need is for one instance of each of those subscribers to get a copy of the message.

The way I solved the above was by having an "Exchange" class, which descends from LazyRegister and has a bunch of keys like /blog/comment/created, /blog/comment/destroyed, /analysis/comment/created; for each subscription path, the value is a descendant of Concurrent::Actor::Utils::Balancer. (So the full path to a particular subscriber queue ends up structured like /#publisher_namespace/#subscriber_name.)

Then when I broadcast the event (i.e. /blog/comment/created), I loop through all of the individual queues in the exchange and then do the actual delivering of one copy of the event to each queue.
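
The fan-out step described above can be sketched in plain Ruby as follows. This is an illustration under assumptions, not the gem's actual code: SketchExchange, bind and publish are hypothetical names, the registry is an ordinary Hash rather than LazyRegister, and each subscriber group gets a stdlib Queue where the gem would use a Balancer.

```ruby
# Hypothetical sketch: one queue per subscriber group, grouped by routing key.
class SketchExchange
  def initialize
    # routing key => { subscriber group name => Queue }
    @routes = Hash.new { |hash, key| hash[key] = {} }
    @lock = Mutex.new
  end

  # Registers (or returns the existing) queue for a subscriber group.
  def bind(routing_key, group_name)
    @lock.synchronize { @routes[routing_key][group_name] ||= Queue.new }
  end

  # Delivers one copy of the payload to every group bound to the key
  # and returns how many queues received it.
  def publish(routing_key, payload)
    # Hash#fetch does not trigger the default block, so unknown keys stay unregistered.
    queues = @lock.synchronize { @routes.fetch(routing_key, {}).values }
    queues.each { |queue| queue << payload }
    queues.size
  end
end

exchange = SketchExchange.new
blog_queue = exchange.bind("/blog/comment/created", "Blog::CommentSubscriber")
analysis_queue = exchange.bind("/blog/comment/created", "Analysis::CommentSubscriber")

delivered = exchange.publish("/blog/comment/created", { comment_id: 1 })
unrouted = exchange.publish("/blog/comment/destroyed", { comment_id: 1 })
```

Keeping the routes in a plain Hash behind a Mutex means they can be iterated directly, without reaching into another object's instance variables.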

That brings me to my questions.

1) Do you see a better way to do this or anything built in that I'm missing out on?
2) Any potential pitfalls of this structure that maybe I haven't thought through? Particularly with regard to the message passing?
I guess part of the reason I'm asking is https://github.com/jasonayre/action_pubsub/blob/master/lib/action_pubsub.rb#L48
(it kind of seems like I wasn't supposed to iterate the keys and values, since I have to instance_variable_get them).
3) Regarding the lazy registry again: in the application I'm integrating this into, the specs (RSpec) are currently failing for the subscribers. The lazy registry never seems to populate. Before I spend a ton of time debugging, I figured I'd check whether anyone knows why that would be happening, in case it's something obvious.

Main question:
4) Handling task failure
For context: https://github.com/jasonayre/action_pubsub/blob/master/lib/action_pubsub/active_record/subscriber.rb

In case the subscriber code is confusing: basically all I'm doing is creating the subscription separately when it's inherited, and I want that subscription to initialize a new copy of the subscriber every time an event propagates, to keep things less mutable and to allow for memoizing instance variables and whatnot in the subscriber class. So the subscription is actually doing the subscribing, and the subscriber is instantiated to handle the work.

This sets up the initial subscription - https://github.com/jasonayre/action_pubsub/blob/master/lib/action_pubsub/active_record/subscriber.rb#L47
Then here, you can see it rebinding after the event is processed - https://github.com/jasonayre/action_pubsub/blob/master/lib/action_pubsub/active_record/subscriber.rb#L47

(According to the docs I have to resubscribe.) Anyway, I actually have 2 questions.

A) I haven't tested this yet: if all subscribers are busy processing, will the queue hold on to new messages and still accept them into its envelope while the subscribers are working, and then, once one finishes and resubscribes, pop another message off the queue? (I think the docs implied this would happen, but in other places they say that if no one's listening to the queue it won't push more messages to it, so as not to run out of memory.) This is similar to RabbitMQ and its message acknowledgement (if it's not turned on, the queue will discard messages because no one's listening); however, in this case the subscriber may temporarily be working and not listening. So in that case, will the message be discarded, or is the balancer smart enough to know that the listeners are just working and will be listening again soon? (This is obviously pretty important to my use case :) )

B) Is there a built-in way (in the following class, which is a descendant of AdHoc) to tap into the success or failure of the subscription, say, to let a user define failure handling (for instance, to store an event in Sidekiq and rerun it at a later time)? That would be dope if so.

https://github.com/jasonayre/action_pubsub/blob/master/lib/action_pubsub/active_record/subscription.rb#L6

Oh also, should I be rebinding the subscription there or is there a better way to do that? Like if an error is thrown, does that mean my actor is going to be permanently unsubscribed? Also, is my understanding correct that all the return results (unless specified otherwise), are being wrapped in (future or promise) failures if an error is thrown? So the error just triggers the failed state on the future or promise?

Anyway, sorry for the length. Just wanted to make my questions (hopefully) clear. Super super awesome library so far; thanks to all the contributors for your hard work on it.

Petr Chalupa

Jul 14, 2015, 3:42:21 AM
to Jason Ayre, concurr...@googlegroups.com
Hello Jason,

I am glad to hear that you are using our gem and that you like it. I am excited to look at what you are building. I'll find some time to give you answers (where I can), but it may take a few (±5) days.

Cheers
Petr Chalupa


Petr Chalupa

Jul 19, 2015, 2:24:05 PM
to Jason Ayre, concurr...@googlegroups.com
Hi Jason,

let me try to answer your questions: 

1) There is also a Broadcast actor; it could be combined into a tree, with a Broadcast at the top and two subscribed Balancers, one for "Blog::CommentSubscriber" and one for "Analysis::CommentSubscriber". Not sure if I got that right.

2) The lazy-register was intended for mostly immutable data like services. Needing instance_variable_get is also quite a bad sign that it's not a good fit.

3) It seems like a timing issue, some code is asking for the data before it's populated. Very wild guess. 

4 A) Balancer never drops messages; if there is no subscriber, it keeps buffering the messages. It does not have any built-in throttling.
4 B) The AdHoc is intended only for experimenting; creating a normal actor might be better. You can add the hooks for success/failure into this new actor.

Yes, if you `actor.ask(message)` it returns a Future, which can be queried for success/failure (value/reason).
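
As a rough illustration of the success/failure hooks mentioned in 4 B, here is a plain-Ruby sketch. It deliberately does not use the actor API: HookedWorker, on_success and on_failure are hypothetical names, and in a real actor the same rescue-and-notify logic would presumably live in its message handler.

```ruby
# Hypothetical sketch: wrap a message handler with user-defined hooks.
class HookedWorker
  def initialize(on_success: ->(_message, _result) {},
                 on_failure: ->(_message, _error) {},
                 &handler)
    @handler = handler
    @on_success = on_success
    @on_failure = on_failure
  end

  # Returns [:ok, result] or [:error, exception]. Errors raised by the
  # handler are passed to on_failure instead of propagating, so one bad
  # message does not stop the worker from handling the next one.
  def call(message)
    begin
      result = @handler.call(message)
    rescue StandardError => error
      @on_failure.call(message, error)
      return [:error, error]
    end
    @on_success.call(message, result)
    [:ok, result]
  end
end

retry_later = [] # stand-in for e.g. pushing the event to a retry queue
record_failure = ->(message, error) { retry_later << [message, error.message] }

worker = HookedWorker.new(on_failure: record_failure) do |message|
  raise ArgumentError, "empty comment" if message.empty?
  message.upcase
end

ok = worker.call("hello")
failed = worker.call("")
```

The failure hook receives both the original message and the exception, which is enough to re-enqueue the event somewhere else for a later retry.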

I hope it'll help at least a little. Please do not hesitate to ask more questions either here or on https://gitter.im/ruby-concurrency/concurrent-ruby.

Best Regards,
Petr


Jason Ayre

Aug 5, 2015, 11:06:43 AM
to Concurrent Ruby, jaso...@gmail.com
Awesome, thanks Petr!