Questions regarding multi-threading in RabbitMQ Java Spring client

415 views
Skip to first unread message

Yonatan Alon

unread,
Apr 2, 2020, 11:24:05 AM4/2/20
to rabbitmq-users

I've got a few short questions about RabbitMQ + Spring , and i'd appreciate your help on this matter.

After reading the documentation
here (https://docs.spring.io/spring-amqp/reference/html/#connections)
and here (https://www.rabbitmq.com/api-guide.html)

it seems the api for the Java + Spring client has a class called CachingConnectionFactory.

The facts:

  1. CachingConnectionFactory has two members of intrest: PublisherConnectionFactory and RabbitConnectionFactory
  2. CachingConnectionFactory extends a parent class called AbstractConnectionFactory
  3. CachingConnectionFactory has a member called (ExecutorService channelsExecutor).
  4. AbstractConnectionFactory has a member called (ExecutorService executorService).
  5. AbstractConnectionFactory has a method called setExecutor(ExecutorService executorService), which sets it's own executorService and then invokes the same method on it's PublisherConnectionFactory
  6. RabbitConnectionFactory has 4 (!) different executors: sharedExecutor, shutdownExecutor, heartbeatExecutor, topologyRecoveryExecutor; It also has a member called nioParams, with another 2 executors: nioExecutor and connectionShutdownExecutor. In addition, it has a method called params, accepting another executor called consumerWorkServiceExecutor. That is 7 executors in total (!!)
  7. It seems the RabbitConnectionFactory, and the docs here (https://www.rabbitmq.com/api-guide.html again), suggest you can pass in any custom executor when creating a channel: rabbitConnectionFactory.newConnection(myExecutorService);

The questions:

  1. What is the difference, and the relationship, between CachingConnectionFactory, PublisherConnectionFactory and RabbitConnectionFactory?
  2. Why do we need so many different executor pools? What is the difference between them all?
  3. Do these classes share their thread pools / executors?
  4. When a connection is opened, to which thread pool is it allocated?
  5. When a channel is opened, to which thread pool is it allocated?
  6. Do publish and consume operations use different thread pools?
  7. How do I pass in my own executor implementation or otherwise configure, each of the aformentioned executors?
  8. Would it be a good idea to merge/unify some or all of the executors?

Thanks in advance

Yonatan Alon

unread,
Apr 2, 2020, 11:28:41 AM4/2/20
to rabbitmq-users
Our technology stack:
Java 11
Spring 5.2.5
Spring boot 2.2.5
Spring AMQP 2.2.5
RabbitMQ 3.8.3

Yonatan Alon

unread,
Apr 2, 2020, 11:31:05 AM4/2/20
to rabbitmq-users
These questions were raised as I was starting to work on a new project for my company
Following some bas experience with resource and thread management in one of our previous products,
I'm looking for the answers for my personal knowledge, hoping to save my team some trouble in the near future.



On Thursday, 2 April 2020 18:24:05 UTC+3, Yonatan Alon wrote:

Luke Bakken

unread,
Apr 2, 2020, 11:31:19 AM4/2/20
to rabbitmq-users
Hi Yonatan -


I'll add some additional information from our conversation in Slack (https://rabbitmq.slack.com/archives/C1EDN83PA/p1585838030114500):

Using:

Java 11
Spring 5.2.5
Spring boot 2.2.5
Spring AMQP 2.2.5
RabbitMQ 3.8.3

Some details about the use-case:

Our product might be deployed in sensitive environments and we might be tight on resources. We intend to offer several api's (http, amqp, bash cli etc...), and we are trying to control our resources as much as we can. This is after some bad experience we had with one of our previous products

Luke

Gary Russell

unread,
Apr 2, 2020, 12:09:50 PM4/2/20
to rabbitm...@googlegroups.com
I can answer the Spring Questions

What is the difference, and the relationship, between CachingConnectionFactory, PublisherConnectionFactory and RabbitConnectionFactory?

CCF is the main factory; by default. its single connection is shared by all components (but it has a cache mode supporting multiple connections).
PCF is an optional CCF that can be used for publishing (when RabbitTemplate.usePublisherConnection is true). This is recommended to allow consumers to continue to consume when publishers are blocked.

The CCF executorService (if present) is passed into RCF.newConnection().
The channelsExecutor is an internal executor that is only used if publisher confirms are enabled; it is used the defer a channel close request until the publisher confirm arrives (or a timeout).

Why do we need so many different executor pools?
What is the difference between them all?
Do these classes share their thread pools / executors?

The CCF executorService (if present) is passed into RCF.newConnection().

When a connection is opened, to which thread pool is it allocated?

The CCF executorService (if present) is passed into RCF.newConnection(). Otherwise the amqp-client uses its own executor.

When a channel is opened, to which thread pool is it allocated?
Do publish and consume operations use different thread pools?

If RabbitTemplate.usePublisherConnection is true.

How do I pass in my own executor implementation or otherwise configure, each of the aformentioned executors?

There are setter methods for the two CCF executors.

Would it be a good idea to merge/unify some or all of the executors?
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/06e81bd4-4fc7-4f61-aecb-f0937e3c5915%40googlegroups.com.

Arnaud Cogoluègnes

unread,
Apr 3, 2020, 7:56:02 AM4/3/20
to rabbitm...@googlegroups.com
I can answer the Java client questions.

>
> What is the difference, and the relationship, between CachingConnectionFactory, PublisherConnectionFactory and RabbitConnectionFactory?
> Why do we need so many different executor pools? What is the difference between them all?

There are different executor services for different purposes. The
Javadoc [1] documents their differences, but here is some additional
information:

- sharedExecutor: used to dispatch messages in consumers.
Connections created by the same connection factory will share it.
- shutdownExecutor: it is used in the AMQP closing sequence, see [2]
[3] for more details.
- heartbeat executor: the executor used by connections to schedule
heartbeat frames.
- topology recovery executor: to parallelize topology recovery (the
default is to not parallelize). This was a community contribution, see
[4].
- nioExecutor: the executor used for NIO loop (reading/writing) (the
thread factory is used if it is not set)
- nio shutdown executor: for internals (avoiding a deadlock when
closing the connection under specific circumstances) (the thread
factory is used if it is not set)
- params(consumerWorkServiceExecutor): this is the sharedExecutor.
I'm not the original author of this code, I don't know why this method
is public, maybe for testing, it's not meant to be used by application
developers.

All these settings have reasonable defaults, that work for most cases.
Developers can tune the client threading behavior by setting their own
executors. See [5] and [6] to see how our performance testing tool
makes use of these parameters to create thousands of connections with
a limited number of threads.

> Do these classes share their thread pools / executors?
> When a connection is opened, to which thread pool is it allocated?

If using blocking IO (the default), the thread factory is used to
create the thread that will read frames. If using NIO, the connection
IO operations happen in a thread allocated by the nioExecutor if it is
set or by the thread factory if the executor is not set.

> When a channel is opened, to which thread pool is it allocated?

Channels are not allocated to any pool. Inbound messages for a
channel's consumers are dispatched in the shared executor.

> Do publish and consume operations use different thread pools?

When using IO, the socket write from Channel#basicPublish happens in
the caller's thread, with NIO, it happens in the thread used for the
connection's IO.

Inbound messages are dispatched in the shared executors (for
asynchronous consumers registered with basicConsume, not basicGet).

> How do I pass in my own executor implementation or otherwise configure, each of the aformentioned executors?

When using the Java client directly, they can be passed in as
parameters of the setters or other methods.

> Would it be a good idea to merge/unify some or all of the executors?
>

You can use the same executor instance in different places, but you'll
have to make sure they're sized correctly depending on the connections
and applications needs to avoid too much competition. Otherwise
there's no real reason to merge the executors, because they all serve
different purposes.

[1] https://rabbitmq.github.io/rabbitmq-java-client/api/current/
[2] https://github.com/rabbitmq/rabbitmq-java-client/issues/91
[3] https://github.com/rabbitmq/rabbitmq-java-client/issues/194
[4] https://github.com/rabbitmq/rabbitmq-java-client/pull/370
[5] https://rabbitmq.github.io/rabbitmq-perf-test/stable/htmlsingle/#simulating-high-loads
[6] https://rabbitmq.github.io/rabbitmq-perf-test/stable/htmlsingle/#workloads-with-a-large-number-of-clients

Yonatan Alon

unread,
Apr 6, 2020, 9:05:01 AM4/6/20
to rabbitmq-users
Garry and Arnaud, thank you very much for your swift replies!

One last question: In scenarios of high throughput (thoudands of connections and more), how much load can each of the executors expect to handle?
(i.e. I'd expect the shutdownExecutor to handle a thread or two, and the sharedExecutor to handle many threads. Is my assumption correct?)


On Thursday, 2 April 2020 18:24:05 UTC+3, Yonatan Alon wrote:
Reply all
Reply to author
Forward
0 new messages