Setting prefetch limit for artemis queue consumers

Martin Czakó

Mar 16, 2024, 12:56:05 PM
to WildFly
Hello,

I have an application running on WildFly 26.1.3, and I need to set up flow control to limit the maximum number of messages my application can consume at a time. The application runs in multiple replicas on a k8s cluster, and the message load is not evenly distributed between them: one replica prefetches almost all of the messages, which then stay in the Delivering state, while the other replicas sit idle. This is not good performance-wise.

From what I could find, this is called prefetch, but all my attempts to configure it have failed. Can anybody help me set this up? Or, if there is a better solution for my case, could you point me to it?

This is my WildFly configuration, which defines the connection factory and the references to the queues in the external Artemis broker I'm connecting to:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:13.1">
<server name="default">
<security elytron-domain="ApplicationDomain" />
<statistics
enabled="${wildfly.messaging-activemq.statistics-enabled:${wildfly.statistics-enabled:false}}" />
<security-setting name="#">
<role name="guest" send="true" consume="true" create-non-durable-queue="true"
delete-non-durable-queue="true" />
</security-setting>
<address-setting name="#" dead-letter-address="jms.queue.DLQ"
expiry-address="jms.queue.ExpiryQueue" max-size-bytes="10485760" page-size-bytes="2097152"
message-counter-history-day-limit="10" />
<http-connector name="http-connector" socket-binding="http" endpoint="http-acceptor" />
<http-connector name="http-connector-throughput" socket-binding="http"
endpoint="http-acceptor-throughput">
<param name="batch-delay" value="50" />
</http-connector>
<in-vm-connector name="in-vm" server-id="0">
<param name="buffer-pooling" value="false" />
</in-vm-connector>
<http-acceptor name="http-acceptor" http-listener="default" />
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50" />
<param name="direct-deliver" value="false" />
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0">
<param name="buffer-pooling" value="false" />
</in-vm-acceptor>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue" />
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ" />
<jms-queue name="t24EXECQueue"
entries="queue/t24EXECQueue java:jboss/exported/jms/queue/t24EXECQueue" />
<jms-topic name="tecEventsTopic"
entries="topic/tecEventsTopic java:jboss/exported/jms/topic/tecEventsTopic" />
<!-- jms-topic name="t24ManagementTopic" entries="topic/t24ManagementTopic
java:jboss/exported/jms/topic/t24ManagementTopic"/ -->
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory"
connectors="in-vm" />
<connection-factory name="RemoteConnectionFactory"
entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" />
<pooled-connection-factory name="activemq-ra"
entries="java:/JmsXA_IGNORE java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"
transaction="xa" />

<remote-connector name="remote-artemis-master" socket-binding="remote-artemis-master">
<param name="ssl-enabled" value="true" />
<param name="trust-store-password" value="${JMS_TRUSTSTORE_PASSWORD}" />
<param name="trust-store-path" value="${JMS_TRUSTSTORE_PATH}" />
<param name="key-store-password" value="${JMS_KEYSTORE_PASSWORD}" />
<param name="key-store-path" value="${JMS_KEYSTORE_PATH}" />
</remote-connector>
<remote-connector name="remote-artemis-slave" socket-binding="remote-artemis-slave">
<param name="ssl-enabled" value="true" />
<param name="trust-store-password" value="${JMS_TRUSTSTORE_PASSWORD}" />
<param name="trust-store-path" value="${JMS_TRUSTSTORE_PATH}" />
<param name="key-store-password" value="${JMS_KEYSTORE_PASSWORD}" />
<param name="key-store-path" value="${JMS_KEYSTORE_PATH}" />
</remote-connector>
<pooled-connection-factory
name="AMQ"
entries="java:/JmsXA java:jboss/JmsXA"
connectors="remote-artemis-master remote-artemis-slave"
ha="true"
min-pool-size="10"
max-pool-size="20"
min-large-message-size="1048576"
compress-large-messages="true"
reconnect-attempts="-1"
thread-pool-max-size="-1">
<inbound-config rebalance-connections="true" setup-attempts="-1" setup-interval="1000" />
</pooled-connection-factory>
</server>
</subsystem>
<subsystem xmlns="urn:jboss:domain:naming:2.0">
<bindings>
<external-context name="java:global/remoteContext" module="org.apache.activemq.artemis"
class="javax.naming.InitialContext" cache="true">
<environment>
<property name="java.naming.factory.initial" value="org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory" />
<property name="java.naming.provider.url" value="(tcp://nn74x045.sos.kb.cz:61716,tcp://nv74x049.sos.kb.cz:61716)" />
<property name="queue.myRequestQueue" value="myRequestQueue" />
<property name="queue.myReplyQueue" value="myReplyQueue" />
</environment>
</external-context>
<lookup name="java:/queue/myRequestQueue" lookup="java:global/remoteContext/myRequestQueue" />
<lookup name="java:/queue/myReplyQueue" lookup="java:global/remoteContext/myReplyQueue" />
</bindings>
<remote-naming />
</subsystem>

Bartosz Baranowski

Mar 18, 2024, 3:29:21 AM
to WildFly
From what I understand, prefetch should stop the flow of messages until the client sends back an ACK that 50% of them were processed:
https://activemq.apache.org/components/classic/documentation/what-is-the-prefetch-limit-for

Once the broker has dispatched a prefetch limit number of messages to a consumer it will not dispatch any more messages to that consumer until the consumer has acknowledged at least 50% of the prefetched messages, e.g., prefetch/2, that it received. When the broker has received said acknowledgements it will dispatch a further prefetch/2 number of messages to the consumer to ‘top-up’, as it were, its prefetch buffer. Note that it’s possible to specify a prefetch limit on a per consumer basis (see below).



1. Can you verify that you have actually changed this setting? AFAIR the default value is ridiculous (or it was).
2. Can you check the state of the other replicas? If they don't receive any messages, it might mean either #1 or that they are isolated somehow (essentially they are up, but some part of the configuration makes them inaccessible, or something similar).
3. Did you try something like: https://stackoverflow.com/questions/64065566/activemq-artemis-queue-does-not-distribute-messages-evenly-to-the-stomp-consumer

Reference: https://activemq.apache.org/components/artemis/documentation/latest/flow-control.html
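
Note that the prefetch page quoted above describes ActiveMQ Classic. The Artemis flow-control reference describes the equivalent client-side buffer as consumerWindowSize, measured in bytes, where 0 means no buffering and -1 means an unbounded buffer. A minimal sketch of how that might be applied to the "AMQ" pooled-connection-factory from the original post, assuming the messaging-activemq schema in use accepts a consumer-window-size attribute on that element (please check this against your WildFly version). Everything except that one attribute is copied from the posted configuration:

```xml
<!-- Sketch only: the "AMQ" factory from the original post with one added attribute.
     consumer-window-size is assumed to be supported here; the value is in bytes,
     and 0 turns off client-side buffering so each consumer only takes a message
     when it is ready to process one. -->
<pooled-connection-factory
    name="AMQ"
    entries="java:/JmsXA java:jboss/JmsXA"
    connectors="remote-artemis-master remote-artemis-slave"
    ha="true"
    consumer-window-size="0"
    min-pool-size="10"
    max-pool-size="20"
    min-large-message-size="1048576"
    compress-large-messages="true"
    reconnect-attempts="-1"
    thread-pool-max-size="-1">
    <inbound-config rebalance-connections="true" setup-attempts="-1" setup-interval="1000" />
</pooled-connection-factory>
```

A window of 0 trades some throughput for a more even spread across replicas; a small positive value may be a reasonable middle ground if round trips to the broker become the bottleneck.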