Another side note about the consumer: the body of each message is written to disk in a temp file for performance reasons. After the message is consumed, the temp file is deleted.
My problem is that at some point, after consuming messages, acknowledgements stop being sent to RabbitMQ:
- Consuming the message works fine: fetching the headers works, and the callback function that inserts the content into the databases is also OK.
- If consumption is successful, the code calls Pika's basic_ack(basic_deliver.delivery_tag).
- That call fails and the thread crashes.
- I tried catching the exception so the thread would survive and the script could go on to delete the temp file, but the caught exception is blank (its message is empty).
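For what it's worth, a "blank" exception is not necessarily unidentifiable: an exception raised with no arguments has an empty str(), but its repr() and type still say what was raised. A quick plain-Python illustration (ChannelClosed here is just a stand-in class, not the pika one):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class ChannelClosed(Exception):
    """Stand-in for an exception raised with no arguments."""
    pass

try:
    raise ChannelClosed()
except Exception as e:
    # str(e) is empty when the exception carries no arguments,
    # but repr(e) still identifies what was raised.
    print('str : %s' % str(e))
    print('repr: %r' % e)
```

So logging the exception with %r instead of %s should at least reveal its type.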
Reconsuming a message is not a problem in itself, because the data is inserted into a TSDB with a timestamp taken from the message body.
It also seems that once a message fails to be acknowledged, it can never be acknowledged again; so when the RabbitMQ queue filled up with "un-acknowledgeable" messages, the consumers kept working without acknowledging a single message to RabbitMQ.
I still had around 4,000 messages shown as "unacked" in the graphs, and the queue size kept growing, both because the publishers were still publishing and because the consumers seemed to loop over messages that could not be acknowledged.
Is this a common problem under heavy message load? Do you have any experience or tips with this kind of issue?
To summarize: under a heavy flow of messages (at least 500 messages / 500 MB of data), the basic_ack() call in my Python consumer script crashes. Catching the exception shows nothing, and the RabbitMQ logs are silent.
Thanks for your help
Thib
Here is my code for the Pika setup and the consuming/acknowledging part:
Pika :
def __init__(self, config, ssl=False, exclusive=False):
    import pika
    key = 'my-key'
    host = config.get(key, 'host')
    port = int(config.get(key, 'port'))
    vhost = config.get(key, 'vhost')
    queue = config.get(key, 'queue')
    username = config.get(key, 'username')
    password = config.get(key, 'password')
    credentials = pika.PlainCredentials(username, password)
    parameters = pika.ConnectionParameters(credentials=credentials, host=host, port=port, virtual_host=vhost, ssl=ssl)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    try:
        channel.queue_declare(queue=queue, passive=True)
    except pika.exceptions.ChannelClosed as e:
        code, value = e.args
        if code >= 400:
            logger.error("queue '%s' was not declared in vhost '%s' of AMQP server", queue, vhost)
            sys.exit(1)
    self.exclusive = exclusive
    self.connection = connection
    self.channel = channel
    self.queue = queue
    self.properties = pika.BasicProperties(delivery_mode=2)  # make messages persistent
Consumer :
def consume(self, command, metadata, tempfile, tempdir, nack):
    logger.info('start consuming...')
    for basic_deliver, properties, body in self.channel.consume(self.queue, exclusive=self.exclusive):
        logger.debug('message received: timestamp=%s, message_id=%s, correlation_id=%s',
                     properties.timestamp,
                     properties.message_id,
                     properties.correlation_id)
        if tempfile:
            if not os.path.isdir(tempdir):
                logger.error('temporary directory %s does not exist', tempdir)
                sys.exit(1)
            filepath = os.path.join(tempdir, str(randrange(0, 1 << 32)))
            with os.fdopen(os.open(filepath, os.O_WRONLY | os.O_CREAT, 0600), 'w') as f:
                f.write(body)
            body = filepath
        ok = self.consume_message(body, properties, command, metadata)
        if ok and not nack:
            try:
                self.channel.basic_ack(basic_deliver.delivery_tag)
                logger.debug('message was acknowledged')
            except Exception as e:
                logger.debug('message failed to ack: %r', e)  # str(e) is empty here
        else:
            self.channel.basic_nack(basic_deliver.delivery_tag)
            logger.error('message was rejected')
        if tempfile:
            os.remove(body)
    logger.info('end consuming')
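For reference, here is the direction I am thinking of for hardening the ack step. This is only a sketch (safe_ack is my own hypothetical helper; channel.is_open is pika's channel state flag): it refuses to ack on a closed channel, since after a reconnect the old delivery tags are not valid on the new channel anyway, and it logs the exception's repr because the message alone comes back blank.

```python
import logging

logger = logging.getLogger(__name__)

def safe_ack(channel, delivery_tag):
    """Acknowledge a delivery, guarding against a closed channel.

    Returns True if the ack was sent, False otherwise.
    """
    if not channel.is_open:
        # Delivery tags are scoped to a channel; acking on a replacement
        # channel after a reconnect cannot work.
        logger.error('cannot ack delivery_tag=%s: channel is closed', delivery_tag)
        return False
    try:
        channel.basic_ack(delivery_tag)
        return True
    except Exception as e:
        # repr(e) shows the exception type even when str(e) is empty.
        logger.error('ack failed for delivery_tag=%s: %r', delivery_tag, e)
        return False
```

In the consume loop, the bare basic_ack call would then become "if not safe_ack(self.channel, basic_deliver.delivery_tag): ..." with whatever recovery policy fits (for me, skipping is acceptable since the TSDB insert is idempotent on the embedded timestamp).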