Hello All,
While running some tests recently, I hit a situation where RMQ blocked publishing of new messages because the memory threshold (vm_memory_high_watermark) had been reached.
After that, I manually closed the connection via the management interface and purged the queue in question.
However, even an hour later RMQ was still not allowing messages to be published (PFA - stats references).
Meanwhile, the Python client also stopped responding (hung); it only errored out and stopped once the connection was closed. How can I avoid the client blocking?
Any inputs, comments, suggestions, or pointers to resolve this would be really helpful.
Or is restarting RMQ the only resolution?
Thank you.
RMQ Newbie
Details of OS / RMQ version / Sample python code and log entries -
OS - Windows 10
RabbitMQ version : 3.6.6
RabbitMQ Python client version : Pika 0.10.0
RabbitMQ config (modified for test purposes)
[
  {rabbit, [
    {heartbeat, 0},
    {loopback_users, []},
    {vm_memory_high_watermark, {absolute, "75MB"}},
    {tcp_listeners, [{"0.0.0.0", 5672}, {"::1", 5672}]}
  ]},
  {rabbitmq_management, [{listener, [{port, 15672}]}]}
].
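For reference, here is a minimal sketch of how the absolute watermark in this config relates to the numbers in the broker log further down. The helper name watermark_to_bytes and the unit table are my own illustration, not RabbitMQ code; only the "75MB" case is confirmed by the log ("allowed:75000000"), the other suffixes are an assumption based on my reading of the RabbitMQ docs.

```python
# Sketch only: convert an absolute vm_memory_high_watermark string to bytes.
# The unit table is an assumption; only "75MB" -> 75000000 is confirmed by the log.
_UNITS = {
    'kB': 1000, 'MB': 1000 ** 2, 'GB': 1000 ** 3,
    'kiB': 1024, 'MiB': 1024 ** 2, 'GiB': 1024 ** 3,
    'k': 1024, 'M': 1024 ** 2, 'G': 1024 ** 3,
}


def watermark_to_bytes(value):
    """Return the byte count for a string such as "75MB" (hypothetical helper)."""
    # Check longer suffixes first so "MiB" is matched before "M"
    for suffix in sorted(_UNITS, key=len, reverse=True):
        if value.endswith(suffix):
            return int(value[:-len(suffix)]) * _UNITS[suffix]
    return int(value)  # no suffix: plain byte count


limit = watermark_to_bytes("75MB")
used = 115576240  # "Memory used" value reported in the broker log

print(limit)         # 75000000
print(used > limit)  # True, so the memory alarm is set
```

With these numbers the broker is about 40 MB over the limit at the moment the alarm is raised.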
Sample python code used for testing -
import sys
import traceback
from datetime import datetime

import pika

pika_connection_params = {
    'host': '127.0.0.1',
    'port': 5672,
    'virtual_host': 'test',
    'socket_timeout': 5
}
# Pass the credentials as one of the parameters to pika
# in order to establish the MQ connection
pika_connection_params.update({'credentials': pika.PlainCredentials('guest', 'guest')})

message = 'AAAAAAAAAA' * 64000
print "Message Size (KB)", float(sys.getsizeof(message)) / 1024  # Message Size (KB) 625.020507812

message_properties = {
    'content_type': 'application/json',
    'delivery_mode': 2  # persistent message
}


def get_blocking_connection():
    """
    Get connection
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(**pika_connection_params)
    )
    return connection


def close_connection(connection):
    """
    Close given connection
    """
    connection.close()


def publish_message(connection, get_ack=True):
    """
    Sample publish message
    """
    start_time = datetime.now()
    channel = connection.channel()
    if get_ack:
        # Enable publisher confirms on this channel
        channel.confirm_delivery()
    try:
        _is_success = channel.basic_publish(
            exchange='test',
            routing_key='test',
            body=message,
            properties=pika.BasicProperties(**message_properties)
        )
    except Exception as error:
        print "Error {}".format(error.__repr__())
        print 'Error details - {}'.format(traceback.format_exc())
    finally:
        channel.close()


def multiple_publish():
    """
    Test for multiple publish
    """
    print "Multiple publish with acknowledgement enabled"
    print "*" * 10
    for each in range(100):
        print each,
        # One connection per message; note that get_ack=False here,
        # so publisher confirms are not actually enabled in this run
        conn = get_blocking_connection()
        publish_message(conn, False)
        close_connection(conn)


if __name__ == '__main__':
    multiple_publish()
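A side note on the size printed by the script: sys.getsizeof() reports the size of the Python string object, not the payload itself. A rough sketch of the payload arithmetic, assuming one byte per ASCII character on the wire (the variable names below are only for illustration):

```python
import sys

message = 'AAAAAAAAAA' * 64000

payload_bytes = len(message)           # body size, one byte per ASCII character
object_bytes = sys.getsizeof(message)  # body plus Python object overhead

total_bytes = payload_bytes * 100      # 100 publishes in multiple_publish()

print(payload_bytes / 1024.0)  # 625.0
print(total_bytes)             # 64000000
```

So the raw bodies of one full run add up to about 64 MB, which by itself is under the 75 MB watermark. The broker reports more memory in use than that, presumably because its accounting covers more than the message bodies.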
==============================================
RMQ log entries -
=INFO REPORT==== 1-May-2020::16:59:40 ===
accepting AMQP connection <0.8205.0> (127.0.0.1:56951 -> 127.0.0.1:5672)
=INFO REPORT==== 1-May-2020::16:59:40 ===
closing AMQP connection <0.8205.0> (127.0.0.1:56951 -> 127.0.0.1:5672)
=INFO REPORT==== 1-May-2020::16:59:40 ===
accepting AMQP connection <0.8216.0> (127.0.0.1:56952 -> 127.0.0.1:5672)
=INFO REPORT==== 1-May-2020::16:59:40 ===
vm_memory_high_watermark set. Memory used:115576240 allowed:75000000
=WARNING REPORT==== 1-May-2020::16:59:40 ===
memory resource limit alarm set on node 'RabbitMQ@SYS'.
**********************************************************
*** Publishers will be blocked until this alarm clears ***
**********************************************************
=INFO REPORT==== 1-May-2020::17:52:44 ===
Closing connection <0.8216.0> because "Closed via management plugin"
=ERROR REPORT==== 1-May-2020::17:52:44 ===
Error on AMQP connection <0.8216.0> (127.0.0.1:56952 -> 127.0.0.1:5672, vhost: 'test', user: 'guest', state: blocked), channel 0:
operation none caused a connection exception connection_forced: "Closed via management plugin"
=INFO REPORT==== 1-May-2020::17:52:44 ===
closing AMQP connection <0.8216.0> (127.0.0.1:56952 -> 127.0.0.1:5672)
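To put a number on how long the second connection stayed blocked, here is a small sketch that parses the report headers from the log above. The helper parse_log_timestamp is hypothetical and assumes the header layout shown in these entries (and an English locale for the month name).

```python
from datetime import datetime

LOG_TS_FORMAT = "%d-%b-%Y::%H:%M:%S"


def parse_log_timestamp(header):
    """Extract the timestamp from a header like '=INFO REPORT==== 1-May-2020::16:59:40 ==='."""
    # Text after the four '=' signs, with surrounding spaces and '=' stripped
    stamp = header.split("====")[1].strip(" =")
    return datetime.strptime(stamp, LOG_TS_FORMAT)


alarm_set = parse_log_timestamp("=WARNING REPORT==== 1-May-2020::16:59:40 ===")
forced_close = parse_log_timestamp("=INFO REPORT==== 1-May-2020::17:52:44 ===")

blocked_for = forced_close - alarm_set
print(blocked_for)  # 0:53:04
```

That is, the publisher sat in the blocked state for about 53 minutes, until the connection was closed from the management plugin.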