#36219: Django Celery: Kombu connection refused error. OperationalError: [Errno 111] Connection refused
------------------------------+-------------------------------------------
Reporter: Anshul              | Type: Bug
Status: new                   | Component: Error reporting
Version: 4.2                  | Severity: Normal
Keywords: celery kombu        | Triage Stage: Unreviewed
Has patch: 0                  | Needs documentation: 0
Needs tests: 0                | Patch needs improvement: 0
Easy pickings: 0              | UI/UX: 0
------------------------------+-------------------------------------------
It seems a bit odd, and maybe I'm missing something, but whenever I send
tasks to the Celery queue, it suddenly raises this error:
{{{
AttributeError: 'ChannelPromise' object has no attribute 'value'
}}}
The issue is reproducible when I call the API endpoint that triggers the
async task twice. The error appears from the second request onward.
However, if I cancel a request midway and send another one (e.g., using a
tool like Postman), the issue goes away.
Celery Settings:
{{{
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'sqs://'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-south-1',  # temp name
    'visibility-timeout': 3600,
    'polling-interval': 10,
}
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
}}}
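For comparison, the Celery documentation for the Amazon SQS transport spells these transport option keys with underscores (`visibility_timeout`, `polling_interval`). The fragment below is a minimal sketch of the same settings with those spellings. The region is the placeholder value from the settings above, and whether the key spelling has anything to do with the reported error is an assumption, not something established in this ticket.

```python
# Sketch of the broker settings using the underscore spellings documented
# for the SQS transport. Values are copied from the settings above; the
# region is still a placeholder, not a real AWS region.
CELERY_BROKER_URL = "sqs://"
CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "us-south-1",      # placeholder from the report
    "visibility_timeout": 3600,  # seconds
    "polling_interval": 10,      # seconds between polls
}
```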
**Packages:**
{{{
celery = ">=5.0.5"
django = "==4.2.16"
kombu = "==5.4.2"
django-celery-beat = ">=2.0.0"
django-celery-results = ">=2.2.0"
}}}
djangoproject/djangoproject/__init__.py
{{{
from .celery import app as celery_app
__all__ = ['celery_app']
}}}
djangoproject/djangoproject/celery.py
{{{
import os
from celery import Celery
from project.settings import settings

# Set the default Django settings module for the 'celery' program.
# Settings are kept inside a separate folder for multiple envs:
# [project/settings/settings_prod.py, project/settings/settings_stag.py, project/settings/settings.py]
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings')

app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    pass
**Complete error trace:**
{{{
kombu.exceptions.OperationalError: [Errno 111] Connection refused on next try in production. Complete error trace:
Traceback (most recent call last):
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback
    return callback(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view
    return view_func(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view
    return self.dispatch(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
    response = self.handle_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
    raise exc
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
    response = handler(request, *args, **kwargs)
  File "/var/app/current/appname/views/views_user.py", line 230, in patch
    my_task.delay()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message
    ret = producer.publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner
    return sentry_patched_function(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish
    return original_publish(self, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish
    return _publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel
    self._ensure_connection(**conn_opts)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
  File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 111] Connection refused
}}}
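`[Errno 111] Connection refused` is what a plain TCP connect reports on Linux when nothing is listening on the target host and port. The sketch below reproduces that check with the standard library only, so it can be run from the web server to see whether a given local port refuses connections. The helper name `probe` is hypothetical, and port 5672 (the default AMQP port) is only a guess at where the refused connection might be going; the traceback does not say.

```python
import errno
import socket


def probe(host, port, timeout=2.0):
    """Attempt a TCP connect and report the outcome as a short string.

    Returns "open" if the connect succeeds, "timeout" if it times out,
    and otherwise the symbolic errno name (for example "ECONNREFUSED").
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except socket.timeout:
        return "timeout"
    except OSError as exc:
        return errno.errorcode.get(exc.errno, str(exc))


if __name__ == "__main__":
    # 5672 is an assumed target (default AMQP port), not taken from the report.
    print("127.0.0.1:5672 ->", probe("127.0.0.1", 5672))
```

A result of `ECONNREFUSED` for a local port would match the errno in the traceback, though it would not by itself show that this is the address the publisher is using.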
**celery-worker output**
{{{
User information: uid=0 euid=0 gid=0 egid=0
warnings.warn(SecurityWarning(ROOT_DISCOURAGED.format(
-------------- cel...@ip-128-232-22-11.ap-west-3.compute.internal v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.1.128-136.201.amzn2023.x86_64-x86_64-with-glibc2.34 2025-02-28 19:19:41
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: djangoproject:0x4f535e7c8v70
- ** ---------- .> transport: sqs://localhost//
- ** ---------- .> results:
- *** --- * --- .> concurrency: 2 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. djangoproject.celery.debug_task
. djangoproject.myapp.tasks.activity
[2025-02-28 19:19:41,982: WARNING/MainProcess] /var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
  warnings.warn(
[2025-02-28 19:19:41,995: INFO/MainProcess] Found credentials in environment variables.
[2025-02-28 19:19:42,109: INFO/MainProcess] Connected to sqs://localhost//
}}}
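The worker banner above shows the transport the worker resolved (`sqs://localhost//`). A comparable check can be made inside the web process, for example from `python manage.py shell`, to see which broker the publishing side resolves. This is only a sketch: `describe_broker` and `current_broker_uri` are hypothetical helper names, and the idea that the web process might resolve a different broker than the worker is an assumption to test, not a conclusion.

```python
from urllib.parse import urlsplit


def describe_broker(uri):
    """Split a broker URI into (scheme, host, port) for quick comparison."""
    parts = urlsplit(uri)
    return parts.scheme, parts.hostname, parts.port


def current_broker_uri():
    """Return the broker URI of the Celery app active in this process.

    Celery is imported lazily so describe_broker() stays usable without it.
    kombu masks any password in the string returned by as_uri().
    """
    from celery import current_app

    return current_app.connection().as_uri()
```

In a Django shell, `describe_broker(current_broker_uri())` reporting a scheme other than `sqs` (for instance `amqp`, Celery's default when no broker is configured) would suggest the web process is not picking up the same Celery configuration as the worker.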
Steps tried so far:
1. Starting the worker on a separate server (it currently runs on the
same server, under supervisor).
2. Simplifying **settings.py** to a single file (removing the prod, stag,
etc. split).
3. Passing the broker URL and some other parameters directly to
app = Celery() in **celery.py**.
4. Upgrading Python from 3.8 to 3.9.
5. Replacing **task.delay** with **task.apply_async**.
I suspect a version incompatibility, but I'm not sure about that.
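Since the package list gives only lower bounds for some packages, one way to look into the version question is to print what is actually installed in the environment that serves requests and compare it with the worker's environment. A minimal standard-library sketch follows; `installed_versions` is a hypothetical helper, and the package names are the ones listed above.

```python
from importlib import metadata


def installed_versions(names):
    """Map each distribution name to its installed version, or None."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    packages = [
        "celery",
        "kombu",
        "django",
        "django-celery-beat",
        "django-celery-results",
    ]
    for name, version in installed_versions(packages).items():
        print(f"{name}: {version or 'not installed'}")
```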
--
Ticket URL: <https://code.djangoproject.com/ticket/36219>