Design questions

Ben Lovell

Mar 5, 2013, 2:54:54 PM
to cellulo...@googlegroups.com
Hey,

I have a bunch of pipelines (think pipes & filters) that take an individual message from an AMQP queue, transform it through several steps (filters), and eventually publish it to a different queue. Most of the steps are computational, but some are responsible for fetching further data (from services, databases, or other I/O) to mutate or supplement the message.

I'm trying to determine how best to leverage Celluloid and allow the filters to execute in parallel. I hacked together something with notifications whereby the pipeline was responsible for wiring up each filter to subscribe to events published by the previous filter once its work was complete. This worked, but it didn't feel particularly smart and could get quite complex as the number of filters increases. It felt as though I was somehow abusing Celluloid's notification mechanics.

What is the best approach to orchestrating pipelines as groups of filters and queuing their input and output messages? My guess is that a filter is an actor and the pipeline a supervisor, but I'm struggling to understand the mechanics of passing the messages around and the techniques for directing messages on to the next logical step in the pipeline.

Of course, I could just treat the pipeline as a single unit of work and pool at that level, but I feel that would be a missed opportunity.

Any suggestions would be gratefully received.

Thanks,
Ben
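For reference, the fallback mentioned above (treat each pipeline as a single unit of work and pool at that level) might look like the minimal sketch below. It assumes plain Ruby threads and `Queue` objects as stand-ins for a Celluloid pool and the AMQP queues; `Pipeline`, `start_pool`, and the three filters are hypothetical names, not part of any library.

```ruby
# Sketch: a fixed pool of threads, each running the complete filter chain
# on one message at a time. Ruby Queues stand in for the AMQP queues (assumption).

# Hypothetical filters: each takes a message hash and returns a new one.
PARSE  = ->(msg) { msg.merge(body: msg[:raw].strip) }
ENRICH = ->(msg) { msg.merge(length: msg[:body].length) } # stand-in for a service/DB lookup
SHOUT  = ->(msg) { msg.merge(body: msg[:body].upcase) }

class Pipeline
  def initialize(*filters)
    @filters = filters
  end

  # Run the filters in order; each one's output is the next one's input.
  def call(msg)
    @filters.reduce(msg) { |acc, filter| filter.call(acc) }
  end
end

# Start `size` workers. Each blocks on the input queue, runs the whole
# pipeline on the message, and pushes the result; a :stop sentinel ends a worker.
def start_pool(pipeline, input, output, size: 4)
  Array.new(size) do
    Thread.new do
      while (msg = input.pop) != :stop
        output << pipeline.call(msg)
      end
    end
  end
end

pipeline = Pipeline.new(PARSE, ENRICH, SHOUT)
input    = Queue.new
output   = Queue.new
workers  = start_pool(pipeline, input, output)

%w[hello world foo].each { |word| input << { raw: " #{word} " } }
workers.size.times { input << :stop } # one sentinel per worker
workers.each(&:join)

results = Array.new(output.size) { output.pop }
```

Each message still passes through its filters one after another; the parallelism comes only from processing several messages at once, and result order across workers is not guaranteed.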

AJ Christensen

Mar 5, 2013, 2:57:48 PM
Hey Ben,

Have you seen Batsir? [0]

I think it could be a nice fit for the pattern you're describing and
at the very least could offer some implementation details around how
they've built stage-based pipelines.

Cheers,

AJ

[0] https://github.com/jwkoelewijn/batsir

Ben Lovell

Mar 5, 2013, 3:08:02 PM
zomg! I'm not sure how I missed that when digging around. Anyway, thanks. That should be plenty of inspiration for now.

Cheers,
Ben

Josh Adams

Mar 5, 2013, 8:24:53 PM
Ben,

I've got a FilterChain object in use in http://github.com/knewter/probably_worth_watching/ (it still needs to be extracted from there). It does some pretty fun stuff w/r/t stream processing and something like pipes/filters.

So it's not explicitly Celluloid, but in the examples dir I use it with Celluloid to great effect. I was able to saturate CPU and network across 1000 threads on a tiny laptop (I actually managed to use my entire home broadband capacity from a single MRI process, because MRI releases the GVL during blocking I/O, which was a nice surprise).

No idea if it's relevant to your project, but it's certainly fun for me to work on, and I hope to extract the filter_chain bit out into another gem.

Ben Lovell

Mar 6, 2013, 3:33:29 AM
Hey Josh,

Thanks, that looks great. Just digging through now. I have a nice implementation of pipes and filters running synchronously at the moment, but can definitely see the benefit of a Celluloid implementation with buffering pipes and async filters. I'm hoping to extract as much as I can once I've got something running.

Cheers,
Ben

Ben Lovell

Mar 6, 2013, 4:46:23 AM
So both of those suggested implementations (thanks AJ & Josh) are nice, but they still treat execution of the complete chain as a unit of work, rather than executing the filters asynchronously and buffering between them.

I'm still struggling to find a sane way of buffering between filters without mind-bending pub/sub, which doesn't feel particularly smart. As I understand it, the mailbox isn't really a published API as such, and shouldn't be read from or written to outside of an actor's normal duties. Can anyone give me a quick jump start on how to approach this?

I'm just now putting together an example that passes a future to the next filter in the chain to see how that works out...

Tony Arcieri

Mar 6, 2013, 1:04:11 PM
On Wed, Mar 6, 2013 at 1:46 AM, Ben Lovell <benjami...@gmail.com> wrote:
> I'm just now putting together an example that passes a future to the next filter in the chain to see how that works out...

That's probably what you should be doing.

--
Tony Arcieri

Ben Lovell

Mar 6, 2013, 1:43:11 PM
Hey Tony,

So I hacked together an example using futures passed by the pipeline to the next filter, but just ran into the same problem: the next filter always requires the result of the filter before it, so blocking on the value before the next filter can do its work kinda negates the whole approach. I have one final question before I throw in the towel and stick to simple pools of workers executing whole pipelines and their filter chains: using Celluloid primitives, how best could I model queues between the filters? As I said previously, at first glance I imagine them as mailboxes, but I wonder whether I'd be better served by actors masquerading as queues between the filters (which are also actors).

I suppose I'll have to give this approach a try also :)
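The future-passing experiment described above can be illustrated with a small sketch. Here Ruby's `Thread#value`, which blocks until the thread finishes and returns its result, stands in for a Celluloid future (an assumption for illustration); `chain_with_futures` is a hypothetical helper. It shows why the approach disappoints: each stage must wait on the previous future's value, so a single message's filters still run in sequence.

```ruby
# Sketch: one "future" per filter, each handed to the next filter.
# Thread#value (blocks, then returns the thread's result) stands in for a
# Celluloid future here; this is an analogy, not Celluloid's API.
def chain_with_futures(filters, msg)
  # Seed the chain with a "future" that simply yields the input message.
  future = Thread.new { msg }
  filters.each do |filter|
    prev = future
    # The new future blocks on the previous one's value before its filter
    # can run, so the stages of one message still execute in order.
    future = Thread.new { filter.call(prev.value) }
  end
  future.value
end

double = ->(n) { n * 2 }
inc    = ->(n) { n + 1 }

# One message: 3 -> 6 -> 7. The threads exist, but each waits on the last.
single = chain_with_futures([double, inc], 3)

# Parallelism only shows up across messages: run several chains at once.
results = [1, 2, 3].map { |n| Thread.new { chain_with_futures([double, inc], n) } }.map(&:value)
```

In other words, futures help overlap independent messages, but they add no concurrency between the dependent steps of one message.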

Tony Arcieri

Mar 6, 2013, 4:29:06 PM
You can also use asynchronous messages to pass work between steps. The mailbox acts as a queue.
--
Tony Arcieri
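The suggestion above, letting each filter's mailbox act as the queue in front of it, can be sketched in plain Ruby. `FilterActor` and `tell` are hypothetical names; a `Queue` plays the mailbox, and `tell` returns immediately, roughly as an asynchronous call to a Celluloid actor does. This is an analogy under those assumptions, not Celluloid's implementation. Because each stage has its own thread, the second stage can work on one message while the first handles the next.

```ruby
# Sketch: each filter behaves like an actor with its own mailbox.
# FilterActor and tell are hypothetical; a Queue stands in for the mailbox.
class FilterActor
  def initialize(work, downstream)
    @work       = work       # callable that transforms one message
    @downstream = downstream # next FilterActor, or a Queue used as the sink
    @mailbox    = Queue.new
    @thread     = Thread.new { run }
  end

  # Asynchronous send: enqueue and return without waiting for the result.
  def tell(msg)
    @mailbox << msg
    self
  end
  alias_method :<<, :tell

  # Drain this mailbox, stop this stage, then stop the next one. The mailbox
  # is FIFO, so :stop is handled only after earlier messages are forwarded.
  def stop
    @mailbox << :stop
    @thread.join
    @downstream.stop if @downstream.respond_to?(:stop)
  end

  private

  def run
    while (msg = @mailbox.pop) != :stop # pop blocks while the mailbox is empty
      @downstream << @work.call(msg)
    end
  end
end

sink   = Queue.new
upcase = FilterActor.new(->(s) { s.upcase }, sink)
strip  = FilterActor.new(->(s) { s.strip }, upcase)

[" a ", " b ", " c "].each { |m| strip.tell(m) } # returns immediately
strip.stop                                       # drains and stops both stages
out = Array.new(sink.size) { sink.pop }
```

With one thread per stage and FIFO mailboxes, message order is preserved end to end.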

Ben Lovell

Mar 7, 2013, 4:27:33 AM
On 6 March 2013 21:29, Tony Arcieri <tony.a...@gmail.com> wrote:
> You can also use asynchronous messages to pass work between steps. The mailbox acts as a queue.

The issue here is that the next filter depends on the prior filter's work on the message being complete. The only sane approach I can see is sharing queues between "dependent" filters. The prior filter will do its thang and push the mutated message onto its outward queue. The next filter will read, process, and write to its outward queue until its inward queue is empty, at which point it performs a blocking read on that queue (or waits on some callback, whatever) until more work becomes available. That said, my experiments show that, because the majority of my filters do only teeny amounts of computation, the mechanics of this approach seem to carry more overhead than the filters themselves.
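The shared-queue design described above can be sketched with Ruby's `SizedQueue` as the pipe between neighbouring filters: a full pipe blocks the upstream filter and an empty one blocks the downstream filter, which gives buffering with back-pressure. `build_pipeline`, `fuse`, and the `DONE` sentinel are hypothetical names for illustration. `fuse` reflects the overhead observation: several tiny filters can share one stage so they don't each pay for a queue hop and a thread.

```ruby
# Sketch of buffered pipes and filters. Each pipe is a bounded SizedQueue
# shared by two neighbouring filters; all names here are hypothetical.
DONE = Object.new.freeze # end-of-stream sentinel

# Returns [input_pipe, output_pipe, threads], one thread per stage.
def build_pipeline(stages, buffer: 8)
  pipes = Array.new(stages.size + 1) { SizedQueue.new(buffer) }
  threads = stages.each_with_index.map do |stage, i|
    inbound, outbound = pipes[i], pipes[i + 1]
    Thread.new do
      # pop blocks on an empty pipe; << blocks on a full one (back-pressure).
      until (msg = inbound.pop).equal?(DONE)
        outbound << stage.call(msg)
      end
      outbound << DONE # propagate end-of-stream downstream
    end
  end
  [pipes.first, pipes.last, threads]
end

# Fuse several cheap filters into a single stage to avoid per-hop overhead.
def fuse(*filters)
  ->(msg) { filters.reduce(msg) { |acc, f| f.call(acc) } }
end

strip  = ->(s) { s.strip }
upcase = ->(s) { s.upcase }
enrich = ->(s) { "#{s}!" } # stand-in for an I/O-bound lookup step

input, output, threads = build_pipeline([fuse(strip, upcase), enrich], buffer: 2)
producer = Thread.new do
  [" x ", " y ", " z "].each { |m| input << m }
  input << DONE
end

# Read the output pipe until end-of-stream; reading before joining matters,
# since a full output pipe would otherwise block the last stage.
results = []
until (msg = output.pop).equal?(DONE)
  results << msg
end
producer.join
threads.each(&:join)
```

Whether the fused or fully split layout is faster depends on how expensive each filter is; splitting mainly pays off for the I/O-bound steps.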

Regardless, I think I have a nice implementation of buffered pipes & filters so I'll extract something and push it up soon for the heck of it.

Thanks for the help, chaps.

Ben