$ grep 527def2e-443c-4bfa-bc74-d040c26bf5ca /var/log/celery-worker.log
[2014-05-22 17:23:04,495: INFO/MainProcess] Received task: pynsights.tasks.datapull.pull_historical_raw_data[527def2e-443c-4bfa-bc74-d040c26bf5ca]
...
[2014-05-22 17:31:59,390: INFO/MainProcess] Received task: pynsights.tasks.datapull.pull_historical_raw_data[527def2e-443c-4bfa-bc74-d040c26bf5ca]
...
Ran into the same problem again on our production servers: the same Celery task (same uuid) was received twice by our Celery workers:

[2014-05-29 05:06:31,168: INFO/MainProcess] Received task: pynsights.tasks.aggregating.aggregate_one_day[a38bccfd-8db5-40d5-a9a2-2d9a191312f9]
[2014-05-29 05:06:31,355: INFO/MainProcess] Received task: pynsights.tasks.aggregating.aggregate_one_day[a38bccfd-8db5-40d5-a9a2-2d9a191312f9]
Any ideas what is causing this and how we could prevent it?
We have 2 celery worker processes, each with a different --hostname argument, running on the same machine. So I'm guessing the workers' clocks must already be in sync, since they run on the same host.
Do we also need to sync the time between the workers and the broker (in our case, Redis)?
Robert, did you manage to reproduce your problem, and if so, what is the delay after which a task is restarted (duplicate)?

We have stumbled into another duplicate task issue on our servers, but ours can be reproduced: if we schedule a long-running task (2h), it will be redelivered after exactly one hour, despite our visibility_timeout being set to 3h:

In [13]: app.connection().transport_options
Out[13]: {'visibility_timeout': 11160}

In [14]: app.connection().channel().visibility_timeout
Out[14]: 11160

In addition to this, I've added some prints in the kombu Redis transport (https://github.com/celery/kombu/blob/master/kombu/transport/redis.py#L173) and it seems that the duplicate task is received even though it is not restored by redis.QoS.restore_visible, nor by redis.Channel._do_restore_message.

Any advice on how to further debug this issue?
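For context, the timeout is configured through the broker transport options. A minimal sketch of that setup (the app name and broker URL here are illustrative, not copied from our code; only the setting name and value match the output above):

from celery import Celery

app = Celery('pynsights', broker='redis://localhost:6379/0')

# With the Redis transport, a delivered-but-unacked message becomes visible
# again (and is redelivered) once visibility_timeout seconds pass without an ack.
app.conf.BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 11160}  # 11160 s, roughly 3 hours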
Yes, we are using acks_late. Here's what runs on our (single) server:
- 3 celery worker processes (all on the same instance)
- each worker runs on a different queue (3 queues, 3 workers)
- each worker runs with --concurrency=4 --maxtasksperchild=5 -Ofair --hostname=default.%h --without-gossip
- 1 separate celery beat
- Python packages used:
- kombu==3.0.16
- billiard==3.3.0.17
- celery==3.1.11
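Since acks_late came up: here's a minimal sketch of how one of our tasks is declared (the task body is illustrative; only the acks_late flag is the point, and the same thing can be enabled globally with CELERY_ACKS_LATE = True):

from celery import Celery

app = Celery('pynsights', broker='redis://localhost:6379/0')

# With acks_late the message is acknowledged only after the task returns, so a
# long-running task keeps its message unacked for the whole run, which is
# exactly when the Redis visibility_timeout matters.
@app.task(acks_late=True)
def add(x, y):
    return x + y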
I did another experiment, taking into account your suggestion, and got this:

1. Got another task duplicate after 1h 0min 6secs, with the same task id:
[2014-06-09 08:59:00,826: INFO/MainProcess] Received task: pynsights.tasks.add.add[4f429dcd-6b33-4c0c-8564-526e62b0ab5c]
[2014-06-09 09:59:06,298: INFO/MainProcess] Received task: pynsights.tasks.add.add[4f429dcd-6b33-4c0c-8564-526e62b0ab5c]
2. Added some prints in kombu.transport.redis.QoS.restore_visible and kombu.transport.redis.Channel._do_restore_message. Tested the prints outside of the experiment, and they work as expected.
3. The log showed no record of _do_restore_message being called during the experiment (they are called as expected if I run a soft kill on all workers).
4. The restore_visible function is being called, but visible = client.zrevrangebyscore never returns anything (which is expected).

Any ideas?
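In case it helps with reproducing: the prints from step 2 were added roughly like this, by wrapping the kombu methods instead of editing the installed package (the wrapper itself is just a sketch; only the method names come from kombu.transport.redis):

import functools
import logging

from kombu.transport import redis as kombu_redis

logger = logging.getLogger(__name__)

def traced(method):
    # Log every call to the wrapped method together with its arguments.
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        logger.warning('%s called: args=%r kwargs=%r', method.__name__, args, kwargs)
        return method(*args, **kwargs)
    return wrapper

# The two code paths that are supposed to re-deliver unacked messages.
kombu_redis.QoS.restore_visible = traced(kombu_redis.QoS.restore_visible)
kombu_redis.Channel._do_restore_message = traced(kombu_redis.Channel._do_restore_message)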
The task was not sent twice, nor routed to multiple queues. I know this because I manually looked into the Redis storage and double-checked that there's only a single message on the *default* queue, and no other messages on other queues.
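For reference, the manual check was along these lines (a redis-py sketch; 'default' is our queue name, and 'unacked' / 'unacked_index' are, as far as I know, kombu's default keys for unacknowledged messages, so adjust if yours differ):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# Pending (not yet delivered) messages: kombu keeps each queue as a Redis list.
print(r.llen('default'))
print(r.lrange('default', 0, -1))

# Delivered-but-unacked messages live in a hash plus a sorted set scored by
# delivery time; these are what restore_visible would put back on the queue.
print(r.hgetall('unacked'))
print(r.zrange('unacked_index', 0, -1, withscores=True))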
We do not raise Reject in any task, nor use any Celery task retries.
We are not using topic / fanout exchanges. We have custom Celery queues, but each worker has its own. Here's the configuration:

# Default exchange name & type are defined only for extra clarity.
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'pynsights.tasks.default'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_ROUTES = {
    'pynsights.tasks.aggregating.aggregate_one_day': {'queue': 'aggregating'},
    # As well as some other similar routes which DO NOT include the task from the
    # experiment, nor do they specify the default queue.
}
celery worker --queues=default --concurrency=2 --maxtasksperchild=5 -Ofair --without-gossip [...]
The following methods were not called during the experiment (from kombu.transport.redis):
- Channel._do_restore_message
- Channel._get
- Channel._put
- Channel._put_fanout
- virtual.Channel._restore

It seems that the duplicate message has the redelivered flag set to True, although the only code that sets that flag from kombu.transport.redis was never called. What does that mean?
Also note that the message was not enqueued by any other worker, since no other worker was running and the running one never called Channel._put.
Indeed, the cause of the issue was the celery flower instance.
Thanks for all the help and sorry for the clumsy mistake. It did not occur to me that celery flower would ever restore unacked messages.