RabbitMQ / Pika issue with basic_ack() function


thibm...@gmail.com

Jan 28, 2019, 11:52:53 AM
to rabbitmq-users
Hello,

First of all, sorry if this group is not the right place to post Pika-related content, but I'm not sure whether the source of my problem is Pika, RabbitMQ or my system.

A bit of context for my problem (summary + code at the end):

I had a problem in production first and managed to reproduce it in preproduction.
  • I have 4 machines running the publisher and sending JSON files to RabbitMQ with 2 headers (machine, cluster)
  • RabbitMQ is running on a machine with 
    • 2 CPUs / 1 GB RAM in preproduction
    • 8 CPUs / 3 GB RAM in production
  • Consumers are running with threads and inserting data in a TSDB.
    • with 24GB RAM and 4 CPUs in preproduction -> 4 threads
    • with 32GB RAM and 8 CPUs in production -> 8 threads
  • Data to insert 
    • Preproduction : 75k messages, ~3 Gigs
    • Production : 1800 messages, ~2 Gigs
Another side note on the consumer: the body of the message is written to disk in a temp file for performance reasons. After the message is consumed, the temp file is deleted.

My problem is: at some point, after messages have been consumed, acknowledgements are no longer sent to RabbitMQ:
  • Consumption of the message works well, fetching headers works, and the callback function which inserts the content in the database is also OK.
  • If the consumption is successful, the code calls the Pika function "basic_ack(basic_deliver.delivery_tag)".
  • The function doesn't work and the thread crashes.
  • I tried catching the exception to prevent the thread from crashing and to let the script continue on to delete the temp file, but the exception caught is blank.
Re-consuming a message is not a problem in itself, because the data is inserted into a TSDB with a timestamp declared in the body of the message.
It also seems that once a message fails to be acknowledged, it can never be acknowledged again, so when the RabbitMQ queue was full of "un-acknowledgeable" messages, the consumers kept working without acknowledging a single message to RabbitMQ.
I still had around 4000 messages shown as "unacked" in the graphs, and the queue size kept going up because the publishers were still publishing and the consumers seemed to loop on messages that could not be acknowledged.

Is this a common problem with this kind of heavy message load? Do you have any experience or tips regarding it?

To summarize: under a heavy flow of messages (at least 500 messages / 500 MB of data), the basic_ack() call in my Python consumer script crashes. Catching the exception doesn't show anything, and the RabbitMQ logs are silent.
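
A minimal sketch of how the try/except around the ack could surface the exception class instead of swallowing it (it uses the same channel, basic_deliver and logger names as the consumer code below, and assumes logger is a standard logging.Logger):

    try:
        self.channel.basic_ack(basic_deliver.delivery_tag)
        logger.debug('message was acknowledged')
    except Exception as e:
        # repr(e) shows the exception class even when str(e) is empty,
        # and logger.exception() records the full traceback as well.
        logger.exception('basic_ack failed: %r', e)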

Thanks for your help

Thib

Here is my code for the Pika and consuming/acknowledging parts:

Pika:

    def init(self, config, ssl=False, exclusive=False):
        import pika
        key = 'my-key'
        host = config.get(key, 'host')
        port = int(config.get(key, 'port'))
        vhost = config.get(key, 'vhost')
        queue = config.get(key, 'queue')
        username = config.get(key, 'username')
        password = config.get(key, 'password')

        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(credentials=credentials, host=host, port=port, virtual_host=vhost, ssl=ssl)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()

        try:
            channel.queue_declare(queue=queue, passive=True)
        except pika.exceptions.ChannelClosed as e:
            code, value = e.args
            if code >= 400:
                logger.error("queue '%s' was not declared in vhost '%s' of AMQP server", queue, vhost)
                sys.exit(1)

        self.exclusive = exclusive
        self.connection = connection
        self.channel = channel
        self.queue = queue
        self.properties = pika.BasicProperties(delivery_mode=2)   # make message persistent

Consumer:

    def consume(self, command, metadata, tempfile, tempdir, nack):
        logger.info('start consuming...')
        for basic_deliver, properties, body in self.channel.consume(self.queue, exclusive=self.exclusive):
            logger.debug('message received: timestamp=%s, message_id=%s, correlation_id=%s',
                         properties.timestamp,
                         properties.message_id,
                         properties.correlation_id)

            if tempfile:
                if not os.path.isdir(tempdir):
                    logger.error('temporary directory %s does not exist', tempdir)
                    sys.exit(1)
                filepath = os.path.join(tempdir, str(randrange(0, 1 << 32)))
                with os.fdopen(os.open(filepath, os.O_WRONLY | os.O_CREAT, 0600), 'w') as f:
                    f.write(body)
                    body = filepath

            ok = self.consume_message(body, properties, command, metadata)
            if ok and not nack:
                try:
                    self.channel.basic_ack(basic_deliver.delivery_tag)
                    logger.debug('message was acknowledged')
                except:
                    logger.debug('message failed to ack')
                    # EXCEPTION MESSAGE IS NULL
            else:
                self.channel.basic_nack(basic_deliver.delivery_tag)
                logger.error('message was rejected')

            if tempfile:
                os.remove(body)

        logger.info('end consuming')

Luke Bakken

Jan 28, 2019, 8:56:42 PM
to rabbitmq-users
Hi Thib,

What version of RabbitMQ, Erlang and Pika are you using?

You'll have to share all of your code, or code that reproduces the same issue, for us to try to diagnose what is happening. Your consumer code does not show where the connection or channel objects are created, nor does it show how a thread could be "crashing". You say the returned exception is blank, but it would be informative to know what type of exception it is.

When this exception is thrown, what happens in your script? Does it exit? What happens to the connection you had open to RabbitMQ?

Thanks,
Luke

thibm...@gmail.com

Jan 29, 2019, 11:33:42 AM
to rabbitmq-users
Hi Luke, 

Thanks for your answer,

What version of RabbitMQ, Erlang and Pika are you using?

RabbitMQ version: 3.6.6-1 -> stable for Debian stretch 9.7
Erlang version: 19.2.1 -> stable for Debian stretch 9.7
Pika: 0.9.14-1 -> stable for a Debian jessie 8.11 dist

it would be informative to know what type of exception it is

I will be able to tell you more about this when I reproduce the problem again.
 
When this exception is thrown, what happens in your script? Does it exit? What happens to the connection you had open to RabbitMQ?

Before I started catching the exception, the script was crashing, and then the parent bash script monitoring the status of its children (the consumers) would launch a new consumer to consume new messages.
Catching the exception gives me the ability to log the problem and move on to the temp file removal.

I tried to reproduce the problem again today in preproduction, this time with ~3k messages and ~300 MB of data to insert, and I didn't have a single problem; every message was acknowledged (the previous test resulted in around 40% of the messages never being acknowledged).
IMO, our RabbitMQ setup needs more RAM allocated, and this could be the reason why the consuming went crazy.

I will run more tests based on how much data I insert and how much RAM I give to my RabbitMQ container. I'll make sure to update this thread with feedback, or ask for help again if this is not RAM related :)

Thanks again,

Thib.



Michael Klishin

Jan 29, 2019, 12:07:56 PM
to rabbitm...@googlegroups.com
Do you have evidence [1] to back up the RAM claim? If you don't, I would try to collect some. It is quite common to see applications that do not use client libraries in an optimal way (e.g. [2]), or that cannot handle the load (e.g. with automatic acknowledgements), end up being blamed on RabbitMQ.

A slowed-down consumer in automatic acknowledgement mode can be overwhelmed by rapid deliveries, suffer from concurrency hazards [3], and so on, and none of those things can be addressed by changing anything on the server end.

Please collect evidence, focus on metrics for your applications, use PerfTest to compare your findings [4] and
perhaps take a traffic dump [5] when you find a way to reproduce.




--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Thib D

Jan 31, 2019, 9:04:28 AM
to rabbitm...@googlegroups.com
Thanks for your help,

I have identified and fixed the problem by adding a basic_qos (prefetch) setting in Pika, telling the consumers to prefetch fewer messages.
The basic_ack() was not working simply because the channel had timed out with RabbitMQ.
A misconfiguration in the logging conf meant I did not see the timeouts/disconnections in the first place.
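
For reference, a minimal sketch of where such a prefetch limit can be set with Pika's basic_qos, just before consuming (the connection parameters, queue name, prefetch value of 10 and the process() call are illustrative, not the values actually used here):

    import pika

    # Connection parameters are placeholders; in the real code they come
    # from the config object passed to init() above.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Cap the number of unacknowledged messages the broker pushes to this
    # consumer, so slow processing cannot pile up thousands of in-flight
    # deliveries. The value 10 is illustrative.
    channel.basic_qos(prefetch_count=10)

    for basic_deliver, properties, body in channel.consume('my-queue'):
        process(body)  # hypothetical processing step
        channel.basic_ack(basic_deliver.delivery_tag)

With a prefetch limit and manual acknowledgements, the broker stops delivering new messages once that many are unacknowledged, so the consumer is never sitting on a huge backlog of in-flight deliveries.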

Thib

Luke Bakken

Jan 31, 2019, 10:39:36 AM
to rabbitmq-users
Hi Thib,

We're glad you've resolved your issue. Thank you very much for reporting back.

Luke



