Hi,
I have created a very simple test program to test the functionality of Celery. It connects to RabbitMQ as the broker and uses RPC for the backend. In a loop, I call longtime_divide task and check for the status of the AsyncResult. When it is ready, I call Result.get(). I found that after Result.get() is called, the results remain as unacknowledged in the result queue. Based on the documentation from Celery, calling the Result.get() should automatically acknowledge the result.
https://celery.readthedocs.io/en/latest/reference/celery.result.html#celery.result.AsyncResultPlease find the source code and config for my test program below. Have I missed some settings or is it a bug with the RPC backend? Thank you!
run_tasks.py
-------------------------------------------
from .tasks import longtime_add
from .tasks import longtime_divide
import time
from random import randrange
if __name__ == '__main__':
resultList = []
resultReadCount=0
for i in range(1, 100):
i1=randrange(0, 1000)
result = longtime_divide.delay(i1,i)
resultList.append(result)
time.sleep(2)
print('Iteration =>'+str(i))
for r in resultList:
if r.ready() == True:
print('Task =>' + str(r))
print('Task result: '+ str(r.get()))
resultList.remove(r)
resultReadCount+=1
print('Read Result =>' + str(resultReadCount))
while len(resultList) > 0:
print('Pending Count =>'+str(len(resultList)))
for r in resultList:
if r.ready() == True:
print('Task =>' + str(r))
print('Task result: '+ str(r.get()))
resultList.remove(r)
resultReadCount+=1
print('Read Result =>' + str(resultReadCount))
time.sleep(2)
celeryconfig.py
-------------------------------------------
broker_url = 'pyamqp://tester:12345@vnn3222:5672/abc'
result_backend = 'rpc://'
broker_transport_options = {'confirm_publish': True}
task_acks_late=True
task_reject_on_worker_lost=True
task_acks_on_failure_or_timeout=False
celery.py
-------------------------------------------
from __future__ import absolute_import
from celery import Celery
from kombu import Exchange, Queue
from celery.exceptions import Reject
test_queue_name = 'test'
test_exchange_name = 'test'
test_routing_key = 'test.key'
app = Celery('test_celery',
include=['test_celery.tasks'])
app.config_from_object('test_celery.celeryconfig')
test_exchange = Exchange(test_exchange_name, type='direct')
test_queue = Queue(
test_queue_name,
test_exchange,
routing_key=test_routing_key)
app.conf.task_queues = (test_queue, )
app.conf.task_default_queue = test_queue_name
app.conf.task_default_exchange = test_exchange_name
app.conf.task_default_routing_key = test_routing_key
tasks.py
-------------------------------------------------------------------
from __future__ import absolute_import
from test_celery.celery import app
from celery.exceptions import Reject
import time
@app.task(bind=True, default_retry_delay=300, max_retries=5)
def longtime_divide(self, x, y):
print('long time task begins')
# sleep 10 seconds
time.sleep(10)
print('long time task finished')
try:
return x / y
except ZeroDivisionError as exc:
raise Reject(exc, requeue=False)