Celery 4: Class-based tasks aren't getting registered

356 views
Skip to first unread message

Andrew

unread,
Mar 5, 2020, 7:16:12 PM3/5/20
to celery-users
I am upgrading celery from 3 to 4, and I can't get class-based tasks to be registered. I can convert some of them to callables, and they can get registered, but our codebase has far more complicated tasks that simply have to keep their class structure, barring a significant amount of refactoring. Our rabbitmq server can pick up the tasks, the worker acknowledges the task, but I get the error:

[2020-03-05 22:50:18,528: ERROR/MainProcess] Received unregistered task of type 'QueuedSurvey'.
            
(and further down)

KeyError: 'QueuedSurvey' 

Here is the example task that fails. I am following the same format I've seen throughout the celery docs and internet: 
         
class QueuedSurvey(celery.Task):
    def run(self, profile_ids, survey_id, notification_email):
        @transaction.atomic
        def run_inner():
            print("123")
        return run_inner()
 
QueuedSurvey = app.register_task(QueuedSurvey())


Here is my app:

from siphon import settings_celery
from celery import Celery

app = Celery(accept_content=['pickle'])
app.conf.task_protocol = 1
app.config_from_object(settings_celery, namespace='CELERY') 


And my settings:

CELERY_TASK_IGNORE_RESULT = True

CELERY_IMPORTS = ['cal.celery.tasks', 'cal.tasks_lambda']

CELERY_TASK_ROUTES = ('cal.celery.routers.Router', )

for q in CELERY_TASK_QUEUES:
    existing = CELERY_TASK_QUEUES[q]
    binding = {'binding_key': q}
    CELERY_TASK_QUEUES[q] = dict(list(existing.items()) + list(binding.items()))

CELERY_TASK_DEFAULT_QUEUE = 'cal.default'
CELERY_TASK_DEFAULT_EXCHANGE = 'default'
CELERY_TASK_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERY_SEND_TASK_ERROR_EMAILS = True  
CELERYD_LOG_LEVEL = "INFO"

CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'

CELERY_WORKER_PREFETCH_MULTIPLIER = 4

Again, I can convert the example task above to a callable with the @app.task decorator and all that, and it works. But staying in this format does not. Also worth noting that in Celery 3, that task also works, and it doesn't even need an app object.
             

Bernd Wechner

unread,
Apr 21, 2020, 9:37:07 AM4/21/20
to celery-users
I can't say what's up there, but I can suggest looking at app.tasks. That is simply a dict of all the registered tasks.

I am guessing the thing through KeyError: 'QueuedSurvey', is  looking in this dict for QueuedSurvey. So after registering it, check app.tasks to see if it is registered, and check this in both contexts (client and worker).

I'd also take a look inside register_task(). Doing that for you it is here: https://docs.celeryproject.org/en/stable/_modules/celery/app/base.html#Celery

    def register_task(self, task):
        """Utility for registering a task-based class.

        Note:
            This is here for compatibility with old Celery 1.0
            style task classes, you should not need to use this for
            new projects.
        """
        if not task.name:
            task_cls = type(task)
            task.name = self.gen_task_name(
                task_cls.__name__, task_cls.__module__)
        self.tasks[task.name] = task
        task._app = self
        task.bind(self)
        return task

And you can step into that and confirm it adds the task by name to self.tasks (self being your app). The key to note is that this runs independently from your source in the client and worker contexts, so make sure you're registering in both.
Reply all
Reply to author
Forward
0 new messages