Tasks run multiple times


Robert Pröpper

May 22, 2014, 12:13:43 PM
to celery...@googlegroups.com
Hi,

I use Celery to manage long-running jobs (from minutes to hours or days). I am not using ETA tasks, but sometimes tasks are started multiple times. Once this starts to happen, it seems to always affect multiple tasks. I have not yet found a simple test case where this reliably occurs, so this question is mostly about general points I might be missing and how best to proceed in debugging the problem.

Some notes on my setup: both the Celery worker (with concurrency 14) and the Redis broker run on a single 16-core machine. I run the worker with the -Ofair option. CELERYD_MAX_TASKS_PER_CHILD is set to 1. The broker visibility_timeout is set to an arbitrarily high value, well beyond the timescale of the problem.
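
For reference, the setup corresponds roughly to the sketch below (the 'proj' name and the exact numbers are placeholders, not the real project):

# celeryconfig.py (Celery 3.1 setting names)
BROKER_URL = 'redis://localhost:6379/0'
CELERYD_MAX_TASKS_PER_CHILD = 1           # recycle each pool process after every task
BROKER_TRANSPORT_OPTIONS = {
    'visibility_timeout': 7 * 24 * 3600,  # seconds; far beyond the longest expected task runtime
}

$ celery worker -A proj --concurrency=14 -Ofair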

Any ideas?

Best,
Robert

Loic Duros

May 22, 2014, 12:16:08 PM
to celery...@googlegroups.com
Just curious, since I'm running Celery with Redis as well: what makes you say that a task is being run multiple times? How do you know that for a fact?



--
Loic J. Duros - http://lduros.net

Robert Pröpper

May 22, 2014, 12:24:54 PM
to celery...@googlegroups.com
The tasks carry out computations and write results to a database. I get duplicate rows when a particular task runs multiple times. I can also tell from the time it takes to complete a set of tasks. But I've double-checked, and I am quite sure that each task is only sent once (via celery.Celery.send_task).
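
(For context, each task is dispatched by name along these lines, where app is the celery.Celery instance; the task name and argument are made up for illustration:)

result = app.send_task('myproject.tasks.long_job', args=[job_id])
# one call produces exactly one message; result.id is the task id that shows up in the worker log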

Mihnea Giurgea

May 23, 2014, 7:09:47 AM
to celery...@googlegroups.com
I am also running into the same issue:

I am running Celery with the following configuration:
  • default prefork pool
  • Redis as message broker
  • broker visibility timeout set to hours
  • no retry mechanism
  • CELERY_ACKS_LATE = True
Sometimes Celery reads the same task twice, as can be seen from the following log lines:
$ grep 527def2e-443c-4bfa-bc74-d040c26bf5ca /var/log/celery-worker.log
[2014-05-22 17:23:04,495: INFO/MainProcess] Received task: pynsights.tasks.datapull.pull_historical_raw_data[527def2e-443c-4bfa-bc74-d040c26bf5ca]
...
[2014-05-22 17:31:59,390: INFO/MainProcess] Received task: pynsights.tasks.datapull.pull_historical_raw_data[527def2e-443c-4bfa-bc74-d040c26bf5ca]
...

The worker received the same Celery task (identical uuid) again after less than 8 minutes, even though the broker visibility timeout is set to several hours.

Any ideas what could cause this?

Mihnea Giurgea

May 29, 2014, 5:25:08 AM
to celery...@googlegroups.com
Ran into the same problem again on our production servers: the same Celery task (same uuid) was received twice by our Celery workers:

[2014-05-29 05:06:31,168: INFO/MainProcess] Received task: pynsights.tasks.aggregating.aggregate_one_day[a38bccfd-8db5-40d5-a9a2-2d9a191312f9]
[2014-05-29 05:06:31,355: INFO/MainProcess] Received task: pynsights.tasks.aggregating.aggregate_one_day[a38bccfd-8db5-40d5-a9a2-2d9a191312f9]
Any ideas what is causing this and how we could prevent it?

Ask Solem

May 29, 2014, 6:07:23 AM
to celery...@googlegroups.com

On May 29, 2014, at 10:25 AM, Mihnea Giurgea <ski...@gmail.com> wrote:


Are there multiple worker hosts? And if so are the clocks in sync?
-- 
Ask Solem
twitter.com/asksol | +44 07454281208

Mihnea Giurgea

May 29, 2014, 7:36:20 AM
to celery...@googlegroups.com
We have 2 celery worker processes, each with a different --hostname argument, running on the same machine. So I'm guessing the clocks of the two worker hosts must be in sync since they're on the same host.

Do we also need to sync the time between the workers and the broker (in our case, Redis)?

Ask Solem

May 29, 2014, 8:12:37 AM
to celery...@googlegroups.com
On May 29, 2014, at 12:36 PM, Mihnea Giurgea <ski...@gmail.com> wrote:

> We have 2 celery worker processes, each with a different --hostname argument, running on the same machine. So I'm guessing the clocks of the two worker hosts must be in sync since they're on the same host.


You also mention that you’re using acks late, so are you sure the connection is not lost in between?

> Do we also need to sync the time between the workers and the broker (in our case, Redis)?


That should have no effect on this

Mihnea Giurgea

May 29, 2014, 8:37:36 AM
to celery...@googlegroups.com
We're in the process of migrating the Redis setup to something more stable and I will have a more definitive answer in a few days. However, I don't see any reason why the connection would be lost.

I've measured the delay before a duplicate task is replayed, and it is anywhere from under a second to 1-2 hours.

Any ideas other than lost connections?



Mihnea Giurgea

Jun 4, 2014, 5:42:08 AM
to celery...@googlegroups.com, giurge...@gmail.com
Found the culprit: a misconfiguration of Celery routes and the default exchange, which caused each message to be routed to 2 different queues.

Robert Pröpper

Jun 4, 2014, 1:02:08 PM
to celery...@googlegroups.com
Unfortunately, this can't be the source of my problem: I'm only using a single queue. But I've also seen affected tasks being received multiple times.




Mihnea Giurgea

Jun 6, 2014, 10:41:13 AM
to celery...@googlegroups.com
Robert, did you manage to reproduce your problem, and if so, what is the delay after which a task is restarted (duplicate)?

We have stumbled into another duplicate task issue on our servers, but ours can be reproduced:

If we schedule a long-running task (2h), it will be redelivered after exactly one hour, despite our visibility_timeout being set to 3h:

In [13]: app.connection().transport_options
Out[13]: {'visibility_timeout': 11160}

In [14]: app.connection().channel().visibility_timeout
Out[14]: 11160

In addition to this, I've added some prints in the kombu Redis transport (https://github.com/celery/kombu/blob/master/kombu/transport/redis.py#L173), and it seems that the duplicate task is received even though it is not restored by redis.QoS.restore_visible, nor by redis.Channel._do_restore_message.
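
(In case anyone wants to reproduce this kind of instrumentation: the prints can be added without editing the installed package by wrapping the two methods at worker startup. A rough sketch of the idea, not the exact code used here:)

import logging
from kombu.transport import redis as redis_transport

logger = logging.getLogger('kombu.redis.debug')

def _trace(cls, name):
    original = getattr(cls, name)
    def wrapper(self, *args, **kwargs):
        logger.warning('%s.%s called: args=%r kwargs=%r', cls.__name__, name, args, kwargs)
        return original(self, *args, **kwargs)
    setattr(cls, name, wrapper)

_trace(redis_transport.QoS, 'restore_visible')
_trace(redis_transport.Channel, '_do_restore_message')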

Any advice on how to further debug this issue?

Thanks,
Mihnea

Ask Solem

Jun 6, 2014, 12:06:08 PM
to celery...@googlegroups.com

On Jun 6, 2014, at 3:41 PM, Mihnea Giurgea <ski...@gmail.com> wrote:


How do you know it’s not been restored by restore visible?
Note that any worker can restore it, so it can be restored by a different worker than
the one you’re looking at.

If the duplicate task is received by the worker (logging “Received task ….”) then the
task must have been put there by someone.

Do you use late ack?

Mihnea Giurgea

Jun 9, 2014, 6:39:15 AM
to celery...@googlegroups.com
Yes, we are using acks_late. 

Here's what runs on our (single) server:
  • 3 celery worker processes (all on the same instance)
    • each worker runs on a different queue (3 queues, 3 workers)
    • each worker runs with --concurrency=4 --maxtasksperchild=5 -Ofair --hostname=default.%h --without-gossip
  • 1 separate celery beat 
  • Python packages used:
    • kombu==3.0.16
    • billiard==3.3.0.17
    • celery==3.1.11
I did another experiment, taking into account your suggestion, and got this:

1. Got another task duplicate after 1h0min6secs, with the same task id:

[2014-06-09 08:59:00,826: INFO/MainProcess] Received task: pynsights.tasks.add.add[4f429dcd-6b33-4c0c-8564-526e62b0ab5c]
[2014-06-09 09:59:06,298: INFO/MainProcess] Received task: pynsights.tasks.add.add[4f429dcd-6b33-4c0c-8564-526e62b0ab5c]

2. Added some prints in kombu.transport.redis.QoS.restore_visible and kombu.transport.redis.Channel._do_restore_message. Tested the prints outside of the experiment, and they work as expected.

3. The log showed no record of _do_restore_message being called during the experiment (it is called as expected if I do a soft kill on all workers).

4. The restore_visible function is being called, but visible = client.zrevrangebyscore never returns anything (which is expected).
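
(As a side check, the unacked bookkeeping can also be inspected directly in Redis while the task runs; a sketch, assuming the default kombu key names 'unacked' and 'unacked_index' and the 'default' queue:)

$ redis-cli hlen unacked                           # messages reserved but not yet acked
$ redis-cli zrange unacked_index 0 -1 WITHSCORES   # delivery tags with their delivery timestamps
$ redis-cli llen default                           # messages still waiting on the queue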

Any ideas? 

Many thanks in advance :)

Mihnea Giurgea

Jun 9, 2014, 6:43:43 AM
to celery...@googlegroups.com
Forgot to attach the results of the "celery inspect active" command:

-> cel...@default.stage-worker-1: OK
    * {u'args': u'(-7200, 103)', u'time_start': 941687.235853611, u'name': u'pynsights.tasks.add.add', u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'pynsights.tasks.default', u'exchange': u'tasks'}, u'hostname': u'cel...@default.stage-worker-1', u'acknowledged': False, u'kwargs': u'{}', u'id': u'4f429dcd-6b33-4c0c-8564-526e62b0ab5c', u'worker_pid': 20566}
    * {u'args': u'(-7200, 103)', u'time_start': 938081.762304558, u'name': u'pynsights.tasks.add.add', u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'pynsights.tasks.default', u'exchange': u'tasks'}, u'hostname': u'cel...@default.stage-worker-1', u'acknowledged': False, u'kwargs': u'{}', u'id': u'4f429dcd-6b33-4c0c-8564-526e62b0ab5c', u'worker_pid': 20564}
-> cel...@aggregating.stage-worker-1: OK
    - empty -
-> cel...@predicting.stage-worker-1: OK
    - empty -

Notice the redelivered: None on both tasks. Is that normal?

Mihnea Giurgea

Jun 11, 2014, 6:22:32 AM
to celery...@googlegroups.com
Bump.

Any ideas on this?

Thank you!

Ask Solem

Jun 11, 2014, 6:44:22 AM
to celery...@googlegroups.com

On Jun 9, 2014, at 11:39 AM, Mihnea Giurgea <ski...@gmail.com> wrote:


Thanks for investigating.

If the redelivered flag is not set then I guess it would not have been restored, but I’m not 100% certain
the redis transport sets that properly.

So then I guess it may be more tricky to debug, but honestly if the worker prints the “Received task” log
then chances are the task was either 1) sent twice, or 2) routed to multiple queues.

Then there is also the possibility that the task was Reject'ed, but I guess that is unlikely if you don't raise Reject in your task and don't use retry.
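
(For context, the Reject path mentioned here looks roughly like the following; an illustration only, with placeholder names, not code from this thread:)

from celery.exceptions import Reject

@app.task(acks_late=True)
def process(record_id):
    try:
        do_work(record_id)          # 'app', 'do_work' and 'TransientError' are placeholders
    except TransientError as exc:
        # requeue=True puts the message back on the queue, so the worker
        # logs "Received task" again for the same task id
        raise Reject(exc, requeue=True)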

Are you using topic or fanout exchanges?  Do you have a custom CELERY_QUEUES?

Mihnea Giurgea

Jun 11, 2014, 7:11:27 AM
to celery...@googlegroups.com
The task was not sent twice, nor routed to multiple queues. I know this because I manually looked into the Redis storage and double-checked that there's only a single message on the *default* queue, and no other messages on other queues.

We do not raise Reject in any task, nor use any Celery task retries.

We are not using topic / fanout exchanges. We have custom Celery queues, but each worker has its own. Here's the configuration:

# Default exchange name & type are defined only for extra clarity.
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'

CELERY_DEFAULT_ROUTING_KEY = 'pynsights.tasks.default'
CELERY_DEFAULT_QUEUE = 'default'

CELERY_ROUTES = {
    'pynsights.tasks.aggregating.aggregate_one_day': {
        'queue': 'aggregating'
    },
    # As well as some other similar routes which DO NOT include the task from the
    # experiment, nor do they specify the default queue.
}
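
(For anyone wanting to repeat that manual check: with kombu's Redis transport each Celery queue is backed by a plain Redis list named after the queue, at least for default-priority messages, so something like the following shows what is actually waiting; a sketch, not the exact commands used here:)

$ redis-cli llen default             # number of pending messages on the 'default' queue
$ redis-cli lrange default 0 -1      # raw payloads; each task id should appear exactly once
$ redis-cli llen aggregating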

Ask Solem

Jun 11, 2014, 10:21:56 AM
to celery...@googlegroups.com

On Jun 11, 2014, at 12:11 PM, Mihnea Giurgea <ski...@gmail.com> wrote:


The message must have been put onto the queue by something. The "Task received" log line means the task came directly from the BRPOP command in Redis; there are no buffers or semaphores in this path (except for the socket itself).

Mihnea Giurgea

Jun 13, 2014, 6:36:23 AM
to celery...@googlegroups.com
Coming back with some more details after running a few more experiments:

I ran the experiment with only a single worker started (all others stopped). That worker was run with:
celery worker --queues=default --concurrency=2 --maxtasksperchild=5 -Ofair --without-gossip [...]

The duplicate task was received again. Here is the log with some additional prints from system functions (kombu.transport.redis):

[2014-06-13 07:46:43,211: WARNING/MainProcess] Channel._brpop_read: returning {'body': '...', 'headers': {}, 'content-type': 'application/x-python-serialize', 'properties': {'body_encoding': 'base64', 'delivery_info': {'priority': 0, 'routing_key': 'pynsights.tasks.default', 'exchange': 'tasks'}, 'delivery_mode': 2, 'correlation_id': '44f39152-9c03-44c4-857c-6d9c2dfb6d0b', 'reply_to': '492a77b2-d8ad-3892-bb12-6592cea23f9b', 'delivery_tag': 'fd9b5c30-406c-4c3b-87a1-5ba47f6f7883'}, 'content-encoding': 'binary'} to dest = default
[2014-06-13 07:46:43,212: INFO/MainProcess] Received task: pynsights.tasks.add.add[44f39152-9c03-44c4-857c-6d9c2dfb6d0b]
[2014-06-13 08:46:48,276: WARNING/MainProcess] Channel._brpop_read: returning {'body': '...', 'headers': {'redelivered': True}, 'content-type': 'application/x-python-serialize', 'properties': {'body_encoding': 'base64', 'delivery_info': {'priority': 0, 'routing_key': 'pynsights.tasks.default', 'exchange': 'tasks'}, 'delivery_mode': 2, 'correlation_id': '44f39152-9c03-44c4-857c-6d9c2dfb6d0b', 'reply_to': '492a77b2-d8ad-3892-bb12-6592cea23f9b', 'delivery_tag': 'fd9b5c30-406c-4c3b-87a1-5ba47f6f7883'}, 'content-encoding': 'binary'} to dest = default
[2014-06-13 08:46:48,277: INFO/MainProcess] Received task: pynsights.tasks.add.add[44f39152-9c03-44c4-857c-6d9c2dfb6d0b]

(The only difference picked up by Channel._brpop_read between the two deliveries is the 'redelivered': True header on the second one.)

The following methods were not called during the experiment (from kombu.transport.redis):
  • Channel._do_restore_message
  • Channel._get
  • Channel._put
  • Channel._put_fanout
  • virtual.Channel._restore
It seems that the duplicate message has the redelivered flag set to True, although the only code that sets that flag in kombu.transport.redis was never called. What does that mean?

Also note that the message was not enqueued by any other worker, since no other worker was running and the running one never called Channel._put.

What am I missing? 

Many thanks in advance for all the patience!

Mihnea



Ask Solem

Jun 13, 2014, 8:04:25 AM
to celery...@googlegroups.com

On Jun 13, 2014, at 11:36 AM, Mihnea Giurgea <ski...@gmail.com> wrote:


Very mysterious indeed; the only functions that perform LPUSH/RPUSH are _do_restore_message and _put.

My hunch would be that there are more workers here, one or more that you did not shut down properly
that are still configured to use the default visibility timeout of 1 hour.

It's the only explanation that makes sense to me, and it's very easy to overlook a stray worker running somewhere.
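
(Two quick ways to look for stray consumers, as a rough sketch; 'proj' is a placeholder app name:)

$ celery -A proj inspect ping                # every reachable worker should answer with pong
$ redis-cli client list | grep -c brpop      # rough count of clients currently blocked on BRPOP, i.e. queue consumers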

Mihnea Giurgea

Jun 13, 2014, 10:30:06 AM
to celery...@googlegroups.com
We've looked around and we don't seem to have any stray workers running anywhere.

However, we do have one celery flower instance running. Is it possible that the flower instance is restoring unacked messages?

Mihnea Giurgea

Jun 13, 2014, 12:02:45 PM
to celery...@googlegroups.com
Indeed, the cause of the issue was the celery flower instance.

Thanks for all the help and sorry for the clumsy mistake. It did not occur to me that celery flower would ever restore unacked messages.

Mihnea

Ask Solem

Jun 13, 2014, 12:23:30 PM
to celery...@googlegroups.com

On Jun 13, 2014, at 5:02 PM, Mihnea Giurgea <ski...@gmail.com> wrote:


Hah, even I did not consider the fact that flower would help recover the unacknowledged tasks,
but that makes perfect sense :)

Glad it’s been resolved

Ask Solem

Jun 13, 2014, 12:26:22 PM
to celery...@googlegroups.com

On Jun 13, 2014, at 5:02 PM, Mihnea Giurgea <ski...@gmail.com> wrote:


On second thought this should be considered a bug, since it means it will be very hard
to get custom visibility timeouts right: every new consumer using the same
Redis instance must always use the same visibility timeout.

It could be solved by associating a visibility timeout with every message,
instead of configuring it on the consumer as it works now.
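
(In the meantime, the practical workaround is to make sure every consumer of the same Redis broker, Flower included, loads the same configuration and therefore the same visibility timeout; a sketch with placeholder names, and assuming Flower is launched through the celery umbrella command so that it should pick up the same app settings:)

# celeryconfig.py, shared by the workers and Flower
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3 * 3600}   # longer than the longest task

$ celery -A proj worker -l info
$ celery -A proj flower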