The results in RPC result queue are shown as unacknowledged in RabbitMQ after Result.get() is called on them


Hulu

Jan 24, 2020, 3:44:43 PM
to celery-users
Hi,

I have written a very simple program to test Celery. It uses RabbitMQ as the broker and RPC as the result backend. In a loop, I call the longtime_divide task and check the status of each AsyncResult. When a result is ready, I call Result.get(). I found that after Result.get() is called, the results remain unacknowledged in the result queue. According to the Celery documentation, calling Result.get() should automatically acknowledge the result: https://celery.readthedocs.io/en/latest/reference/celery.result.html#celery.result.AsyncResult
Please find the source code and config for my test program below. Have I missed some settings, or is this a bug in the RPC backend? Thank you!

run_tasks.py
-------------------------------------------

import time
from random import randrange

from .tasks import longtime_divide

if __name__ == '__main__':
    resultList = []
    resultReadCount = 0
    for i in range(1, 100):
        i1 = randrange(0, 1000)

        # Queue one division task per iteration and remember its AsyncResult.
        result = longtime_divide.delay(i1, i)
        resultList.append(result)
        time.sleep(2)
        print('Iteration =>' + str(i))
        # Iterate over a copy so that removing an item does not skip the next one.
        for r in list(resultList):
            if r.ready():
                print('Task =>' + str(r))
                print('Task result: ' + str(r.get()))
                resultList.remove(r)
                resultReadCount += 1
                print('Read Result =>' + str(resultReadCount))

    # Keep polling until every outstanding result has been read.
    while len(resultList) > 0:
        print('Pending Count =>' + str(len(resultList)))
        for r in list(resultList):
            if r.ready():
                print('Task =>' + str(r))
                print('Task result: ' + str(r.get()))
                resultList.remove(r)
                resultReadCount += 1
                print('Read Result =>' + str(resultReadCount))
        time.sleep(2)
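
The polling pattern in run_tasks.py can be tried without a broker. The following is only a minimal sketch: FakeResult and collect_ready are hypothetical names made up for illustration, and FakeResult merely imitates the ready()/get() pair of an AsyncResult. It builds a new list of pending results on each pass instead of removing items from the list being iterated, since removing during iteration can skip elements.

```python
class FakeResult:
    """Hypothetical stand-in for an AsyncResult; becomes ready after N polls."""

    def __init__(self, value, polls_until_ready):
        self._value = value
        self._polls_left = polls_until_ready

    def ready(self):
        # Report "not ready" until the configured number of polls has elapsed.
        if self._polls_left > 0:
            self._polls_left -= 1
            return False
        return True

    def get(self):
        return self._value


def collect_ready(pending):
    """Return (values of the ready results, results that are still pending)."""
    values, still_pending = [], []
    for r in pending:
        if r.ready():
            values.append(r.get())
        else:
            still_pending.append(r)
    return values, still_pending


pending = [FakeResult(1.5, 0), FakeResult(2.5, 1), FakeResult(3.5, 0)]
first_values, pending = collect_ready(pending)   # first polling pass
second_values, pending = collect_ready(pending)  # second polling pass
print(first_values, second_values, len(pending))
```

With real tasks, the list passed to collect_ready would hold the AsyncResult objects returned by longtime_divide.delay().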




celeryconfig.py
-------------------------------------------
broker_url = 'pyamqp://tester:12345@vnn3222:5672/abc'
result_backend = 'rpc://'
broker_transport_options = {'confirm_publish': True}
task_acks_late = True
task_reject_on_worker_lost = True
task_acks_on_failure_or_timeout = False


celery.py
-------------------------------------------
from __future__ import absolute_import
from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject

test_queue_name = 'test'
test_exchange_name = 'test'
test_routing_key = 'test.key'


app = Celery('test_celery',
             include=['test_celery.tasks'])
app.config_from_object('test_celery.celeryconfig')

test_exchange = Exchange(test_exchange_name, type='direct')
test_queue = Queue(
    test_queue_name,
    test_exchange,
    routing_key=test_routing_key)

app.conf.task_queues = (test_queue, )

app.conf.task_default_queue = test_queue_name
app.conf.task_default_exchange = test_exchange_name
app.conf.task_default_routing_key = test_routing_key


tasks.py
-------------------------------------------------------------------
from __future__ import absolute_import
from test_celery.celery import app
from celery.exceptions import Reject
import time

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def longtime_divide(self, x, y):
    print('long time task begins')
    # sleep 10 seconds
    time.sleep(10)
    print('long time task finished')
    try:
        return x / y
    except ZeroDivisionError as exc:
        raise Reject(exc, requeue=False)


Ing. Josue Balandrano Coronel

Jan 25, 2020, 8:51:28 AM
to celery...@googlegroups.com
I'm curious: what is the output of your code?
Also, Celery doesn't mark the message as acknowledged when retrieving the result from RPC per se; what happens is part of the AMQP protocol. Whenever you read a message from an AMQP queue, it gets acknowledged and it disappears. Reading anything from the queue is a destructive operation. I think maybe you are looking in the wrong direction, or there's something else in your RabbitMQ config.

--
You received this message because you are subscribed to the Google Groups "celery-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to celery-users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/celery-users/92c0e84f-ceee-4c2c-b2b1-81e09537d748%40googlegroups.com.

Hulu

Jan 27, 2020, 11:13:43 AM
to celery-users
Hi,

Thanks for looking into it.  

Please find the output of my test code below. In Pika, Channel.basic_consume(queue, on_message_callback, auto_ack=False, exclusive=False, consumer_tag=None, arguments=None, callback=None) (https://pika.readthedocs.io/en/stable/modules/channel.html) takes an auto_ack parameter that indicates whether messages are acknowledged automatically. If it is set to False, Channel.basic_ack(delivery_tag=0, multiple=False) must be called after the message has been consumed successfully in order to remove it from the queue completely. From what I have observed, calling Result.get() behaves like Channel.basic_consume() with auto_ack=False, but I couldn't find the corresponding call to Channel.basic_ack() in Celery. I have set the following configuration for the task queues. Is it possible that Celery applied these settings to the queue created for RPC results?
--------------------------------------------------------------
task_acks_late=True
task_reject_on_worker_lost=True
task_acks_on_failure_or_timeout=False
------------------------------------------------------------------
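
To make the two acknowledgement modes mentioned above concrete, here is a minimal in-memory sketch. ToyQueue and its methods are hypothetical names invented for illustration; this is not Pika, Kombu, or Celery code and it does not talk to RabbitMQ. It only models how a message moves between the "Ready" and "Unacked" counters depending on whether it is delivered with automatic or manual acknowledgement.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue (hypothetical, illustration only)."""

    def __init__(self):
        self.ready = deque()  # messages waiting to be delivered
        self.unacked = {}     # delivery_tag -> message delivered but not yet acked
        self._next_tag = 1

    def publish(self, message):
        self.ready.append(message)

    def deliver(self, auto_ack):
        """Hand the next message to a consumer; returns (delivery_tag, message)."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not auto_ack:
            # With manual acks the queue keeps the message until ack() is called.
            self.unacked[tag] = message
        return tag, message

    def ack(self, delivery_tag):
        """Counterpart of basic_ack in this model: drop the message for good."""
        del self.unacked[delivery_tag]


queue = ToyQueue()
queue.publish("result-1")
queue.publish("result-2")

# Automatic acknowledgement: the message is gone as soon as it is delivered.
queue.deliver(auto_ack=True)
counts_auto = (len(queue.ready), len(queue.unacked))

# Manual acknowledgement: the message stays unacked until ack() is called.
tag, _ = queue.deliver(auto_ack=False)
counts_before_ack = (len(queue.ready), len(queue.unacked))
queue.ack(tag)
counts_after_ack = (len(queue.ready), len(queue.unacked))

print(counts_auto, counts_before_ack, counts_after_ack)
```

In this model, a result that is delivered with manual acknowledgement and never acked stays in the unacked count, which matches the symptom described in this thread.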


----------------------------
(python37) [test@test ~]$  python -m test_celery.run_tasks
Iteration =>1
Iteration =>2
Iteration =>3
Iteration =>4
Iteration =>5
Task =>80eac154-c80b-443f-9e45-ebf7f5775c3c
Task result: 157.0
Iteration =>6
Iteration =>7
Iteration =>8
Iteration =>9
Iteration =>10
Task =>59cddae1-904f-45a1-92f1-4104065a89bf
Task result: 493.5
Iteration =>11
Iteration =>12
Iteration =>13
Iteration =>14
Iteration =>15
Task =>7f2715c7-31aa-42ba-adb2-3f8e10b034de
Task result: 12.333333333333334
Iteration =>16
Iteration =>17
Iteration =>18
Iteration =>19
Iteration =>20
Task =>ba41ffc5-cc1e-4667-98fb-2568d9599535
Task result: 93.25
......

------------------------------------------------------------------------------



