parallelism and performance of the consumer


Marc Limotte

6 Aug 2013, 12:33:00
to clojure-...@googlegroups.com
Hi,

I have a few related questions having to do with parallelism and performance of the consumer:

1. 
When using a prefetch of 1 (because I have a worker pool and I want fair load balancing), will the prefetch setting restrict each consumer to processing one message at a time, even if there are multiple threads in the subscriber pool?

I create a channel with prefetch 1:
  (lb/qos ch 1)

Declare a queue and bind it using that channel:
  (lq/declare ch my-q :exclusive false :auto-delete false)
  (lq/bind ch my-q "amq.direct" :routing-key my-q)

And subscribe a consumer:
  (lc/subscribe ch my-q my-handler)

I would expect that each thread in the executor pool used by subscribe would fetch one message, so that multiple messages may be processed on a single node at once.  I'm afraid that it's only processing one message per node, though.  What should I expect?

2. 
Looking through the source code for lc/subscribe, it looks like it uses a default executor pool for the consumers.  I'm not sure, but I believe the number of threads in the default pool is based on the number of cores on the machine.  Is it possible to influence the pool that is created for this?  In particular, I'd like to create more threads for one of my consumers, because it does some I/O and I believe we would get better throughput with more simultaneous requests.

Tracing through the code, I think I found what I want: the "executor" argument given to com.novemberain.langohr.Connection#init.  Am I on the right track?

3. 
Finally, I have a use case where nodes in a worker pool have some caching, so I'd like to preferentially route a message to a particular node in the worker pool based on a set of keys.  Are there any examples of the best approach to use for this scenario?  A naive approach might be to create one node per key/hash -- e.g. if I took the keys and hashed them together to come up with a single digit 0 - 9, I could then create 10 workers and statically route each of 0 - 9 to one of those 10 (sketched below).

But this has a lot of drawbacks: load balancing, failover, and the dev/ops effort to set up and coordinate.  I haven't totally thought it through, but I think I want some sort of affinity-based routing.  I'm hoping there is an example or best practice for how to do this with Rabbit.
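
For concreteness, the naive version I'm picturing would look roughly like this (an untested sketch; it assumes an open channel ch as above, and the queue names, bucket count, and sample key/payload are all made up):

  (require '[langohr.queue :as lq]
           '[langohr.basic :as lb])

  (def n-buckets 10)

  (defn bucket-for [k]
    ;; hash the cache key into one of 10 buckets; the bucket number is the routing key
    (str (mod (hash k) n-buckets)))

  ;; setup: one queue per bucket on amq.direct, one worker consuming each queue
  (doseq [b (range n-buckets)]
    (lq/declare ch (str "work.bucket." b) :exclusive false :auto-delete false)
    (lq/bind    ch (str "work.bucket." b) "amq.direct" :routing-key (str b)))

  ;; producer side: the same key always routes to the same worker's queue
  (lb/publish ch "amq.direct" (bucket-for "customer-42") (.getBytes "job payload"))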

What I'm using: 
  • Langohr 1.1.0
  • Ubuntu Lucid 
  • java version "1.6.0_27"
    OpenJDK Runtime Environment (IcedTea6 1.12.5) (6b27-1.12.5-0ubuntu0.10.04.1)
    OpenJDK 64-Bit Server VM (build 20.0-b12, mixed mode)

I hope this isn't too much all at once.  But thanks for your help and an excellent client library.

marc
The Climate Corporation

Alexander Kehayias

27 Dec 2013, 15:04:48
to clojure-...@googlegroups.com
Hi Marc,

Did you get an answer to this question? I'm wondering how to keep multiple consumers on a single machine running long-running tasks without blocking each other due to the shared executor pool. I've noticed that I cannot consume from two queues in parallel if one of the queues ends up getting tied up with long-running blocking calls.

AK

Marc Limotte

28 Dec 2013, 20:18:42
to clojure-...@googlegroups.com
Hi Alex,

I was able to get this working using a fixed thread pool executor supplied to the connection.  Something like this:

(import java.util.concurrent.Executors)
(mq/connect :executor (Executors/newFixedThreadPool n))

Where n is the number of threads.  You will also want to subscribe n channels.
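
Putting it together, something like this (a rough sketch; n is whatever you choose, and my-q / my-handler are the queue and handler from the earlier mail):

  (import java.util.concurrent.Executors)
  (require '[langohr.channel   :as lch]
           '[langohr.basic     :as lb]
           '[langohr.consumers :as lc])

  (let [n    4                                    ; worker threads on this node
        conn (mq/connect :executor (Executors/newFixedThreadPool n))]
    (dotimes [_ n]
      (let [ch (lch/open conn)]
        (lb/qos ch 1)                             ; fair dispatch: one unacked message per channel
        (lc/subscribe ch my-q my-handler))))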

Hope that helps.  And post if you have other specific questions.

marc

Michael Klishin

29 Dec 2013, 03:17:18
to clojure-...@googlegroups.com

2013/12/29 Marc Limotte <msli...@gmail.com>

> I was able to get this working using a fixed thread pool executor supplied to the connection.  Something like this:
>
> (import java.util.concurrent.Executors)
> (mq/connect :executor (Executors/newFixedThreadPool n))
>
> Where n is the number of threads.  You will also want to subscribe n channels.

Using multiple channels is not really necessary. All consumer work pools will share the executor
regardless of the number of channels.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

Marc Limotte

29 Dec 2013, 15:35:35
to clojure-...@googlegroups.com
I also have prefetch set to 1, as per question 1 in the original email.  As I understand it, a single channel with prefetch 1 will only process 1 message at a time, even if there are multiple threads in the pool.  Message processing in my case is long running and I prefer distribution across nodes over distribution across threads on the same node.

marc


Alexander Kehayias

2 Jan 2014, 09:51:56
to clojure-...@googlegroups.com
Is there a built-in way to segregate consumer thread pools per channel? I'm trying to solve for a scenario where a single box is consuming from multiple queues, but one of the queues has either long-running tasks or a high volume of messages compared to the other, and I'd like to process both as evenly as possible.

AK

Michael Klishin

2 Jan 2014, 11:39:31
to clojure-...@googlegroups.com

On 2 Jan 2014, at 18:51, Alexander Kehayias <alex.k...@gmail.com> wrote:

> Is there a built in way to segregate consumer thread pools per channel?

There isn't.
--
MK

Software Engineer, Pivotal/RabbitMQ

Alexander Kehayias

2 Jan 2014, 11:50:57
to clojure-...@googlegroups.com
As a quick hack, I suppose I could always launch multiple instances of my app on the same box or spin up some new servers.
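
Or, staying in one process, maybe two connections each with their own executor, along the lines of Marc's snippet above (a rough sketch; the pool sizes, queue names, and handlers are made up):

  (import java.util.concurrent.Executors)
  (require '[langohr.channel   :as lch]
           '[langohr.consumers :as lc])

  ;; one connection (and executor) per queue, so the slow queue's deliveries
  ;; can't saturate the pool that serves the other queue
  (def slow-conn (mq/connect :executor (Executors/newFixedThreadPool 2)))
  (def fast-conn (mq/connect :executor (Executors/newFixedThreadPool 2)))

  (lc/subscribe (lch/open slow-conn) "long.running.q" slow-handler)
  (lc/subscribe (lch/open fast-conn) "quick.q"        fast-handler)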

Is this a feature (independent executor pools per channel) that you think could be generally useful? It would prevent one queue consumer from blocking another due to the shared thread pool being saturated by tasks from a particular queue. 

AK

Michael Klishin

2 Jan 2014, 11:52:25
to clojure-...@googlegroups.com
On 2 Jan 2014, at 20:50, Alexander Kehayias <alex.k...@gmail.com> wrote:

> As a quick hack, I suppose I could always launch multiple instances of my app on the same box or spin up some new servers.
>
> Is this a feature (independent executor pools per channel) that you think could be generally useful? It would prevent one queue consumer from blocking another due to the shared thread pool being saturated by tasks from a particular queue.

This is how the RabbitMQ Java client is designed.

You can dispatch deliveries to other executors if there’s a clear way to group them.
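
Roughly like this (a sketch, not tested; io-pool, the queue name, and do-long-running-work are just placeholders):

  (import java.util.concurrent.Executors)
  (require '[langohr.consumers :as lc])

  ;; a dedicated pool for this group of work; the consumer callback only dispatches
  (def io-pool (Executors/newFixedThreadPool 8))

  (defn dispatching-handler
    [ch metadata ^bytes payload]
    ;; hand the long-running work to io-pool so the connection's consumer
    ;; executor is freed immediately for deliveries on other queues
    (.execute io-pool (fn [] (do-long-running-work payload))))

  (lc/subscribe ch "slow.q" dispatching-handler :auto-ack true)

With auto-ack you trade delivery guarantees for simplicity; acking manually after the work finishes needs a bit more care, since a channel should not be shared between threads without synchronisation.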

Alexander Kehayias

2 Jan 2014, 11:55:07
to clojure-...@googlegroups.com
The other way would be to use the “pull” method to get messages from a queue and use Clojure agents to handle the dispatching, so you could tie a thread to a queue.

Michael Klishin

2 Jan 2014, 11:57:56
to clojure-...@googlegroups.com

On 2 Jan 2014, at 20:55, Alexander Kehayias <alex.k...@gmail.com> wrote:

> The other way would be to use the “pull” method to get messages from a queue and use clojure agents to handle the dispatching so you could tie a thread to a queue.

You can use agents with send-via [1] or use a dynamically growing thread pool (which is what send-off uses).
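
For example, one agent per queue, each backed by its own executor (a rough sketch, assuming lb/get returns a [metadata payload] pair, or nil when the queue is empty; process is a placeholder for your work function):

  (import java.util.concurrent.Executors)
  (require '[langohr.basic :as lb])

  ;; the agent serialises work for this queue; send-via pins it to its own pool
  (def slow-q-pool  (Executors/newFixedThreadPool 2))
  (def slow-q-agent (agent nil))

  (defn poll-once
    "Pull one message and dispatch it onto the queue's own pool."
    [ch queue]
    (when-let [[_meta payload] (lb/get ch queue)]
      (send-via slow-q-pool slow-q-agent (fn [_] (process payload)))))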

Alexander Kehayias

2 Jan 2014, 12:01:41
to clojure-...@googlegroups.com
Thanks for your help! I may end up doing that since, in my case, I don’t care about the performance loss of having to poll the queue.