Re: [celery-users] Ensuring a teardown process on worker shutdown with signals?

2,536 views
Skip to first unread message

Ask Solem

unread,
Jan 9, 2013, 10:05:26 AM1/9/13
to celery...@googlegroups.com

On Jan 8, 2013, at 2:34 PM, Allen Householder <all...@cmu.edu> wrote:

> I have the following scenario: Multiple worker tasks require a long-running server (in a separate process) to already be up in order to complete. However I don't want the server process to start if there are no tasks. So I installed a task_prerun hook to start the server if it's not already going, and attempted to do the same on the worker shutdown to clean it up. The server process has a tendency to eventually get corrupted and needs to be restarted. I'd prefer to have celery handle this with its max tasks per child feature to stop and restart both the worker child and the corresponding server process (it needs to maintain one server process per child).
>
> celery.conf.CELERYD_MAX_TASKS_PER_CHILD = 10
> server = MyServer()
>
> @task_prerun
> def setup_server():
> if not server.running:
> server.start()
>
> @worker_shutdown
> def shutdown_server():
> if server.running:
> server.stop()
>
> @worker_process_init
> def install_pool_process_sighandlers(**kwargs):
> platforms.signals["TERM"] = shutdown_server
> platforms.signals["INT"] = shutdown_server
>
> Problem 1: This almost does the right thing when I ctrl-C the worker -- shutdown_server runs, but sometimes there are a few straggler tasks that seem to fire afterwards.
>
> Problem 2: What I expected (but does not appear to be happening) is that when CELERYD_MAX_TASKS_PER_CHILD is hit, the worker_shutdown signal would be sent before the process exits. Instead it just looks like the process exits and shutdown_server is never called.

In Celery the term 'worker' is used for the main worker process (also referred to as MainProcess).
The additional processes used with the multiprocessing pool are also referred to as workers
in multiprocessing but we try to consistently call them 'worker processes'.

So the worker_shutdown signal will not be triggered by those processes (nor worker_init/worker_ready),
it's only triggered by the MainProcess, and there is currently no equivalent signal for pool processes.

>
> So my questions:
> 1. Is there a way to tell shutdown_server to wait for outstanding tasks to complete before proceeding?

A TERM or INT signal will initiate shutdown in the MainProcess, this
this consists of many steps, one of them waiting for currently executing tasks to complete.

The worker, including startup and shutdown sequences are actually controlled by something called bootsteps.
Every component in the worker has its own bootstep, and this forms a dependency graph that is used to
determine startup and shutdown order. E.g. there's a Autoscaler bootstep, a Pool bootstep, a Consumer bootstep (which in 3.1 is rewritten to use bootsteps too, so every part of the consumer is a step).

The APi for this has been private, but will be public from 3.1. Sadly I haven't completed the documentation
yet, but you can see a pretty graph of the built-in bootsteps here:

http://docs.celeryproject.org/en/master/userguide/extending.html

> 2. Is there a way to hook the CELERYD_MAX_TASKS_PER_CHILD trigger to cause shutdown_server to be called prior to the worker child process exiting?

Well, you can use the TERM/INT signal as you do in your example, but overriding signal handlers
like that is a bad idea, it's better to use atexit or multiprocessing.util.Finalize so that multiple handlers
can be installed.

> 3. Or am I thinking about this wrong / is there another way?
>

It's generally ok to kill pool processes, but it's
absolutely not recommended to abruptly kill the MainProcess. As such you should consider
pool processes to be volatile, and a better place for such a feature would be in the MainProcess.

Using bootsteps for this would be easy (but you will have to use the development version for that, since
they are only available in 3.1).


Here's an example implementation using the internal timer to stop the server
when no tasks are running (at an interval of every 30 seconds):


from celery.bootsteps import StartStopStep
from functools import partial

class ScaleServer(StartStopStep):
interval = 30.0

def __init__(self, consumer, scale_server_interval=None, **kwargs):
if scale_server_interval is None:
self.interval = scale_server_interval

def start(self, consumer):
self.tref = consumer.timer.apply_interval(
self.interval * 1000.0,
partial(self.scale, consumer),
)

def scale(self, consumer):
if not consumer.controller.state.active_requests and server.running:
server.stop()

def stop(self, consumer):
if self.tref is not None:
self.tref.cancel()
self.tref = None

here's an annotated version:

class ScaleServer(StartStopStep):
interval = 30.0

def __init__(self, consumer, scale_server_interval=None, **kwargs):
"""The step init is called when the Consumer instance is created (or Worker
for a worker bootstep), and it's called with the consumer instance as the first
argument and all keyword arguments from the original Consumer.__init__ call.

Here we take scale_server_interval and ignore the rest,
a command-line argument can be added to set a value for this custom keyword.

"""
if scale_server_interval is not None:
self.interval = scale_server_interval
self.tref = None

def start(self, consumer):
"""This method is called when the worker starts up and also whenever
the AMQP connection is reset (which triggers an internal restart).

The timer is reset when the connection is lost, because something
may have depended on the current connection, so we have
to install the timer again for every call to self.start.
"""
self.tref = consumer.timer.apply_interval(
self.interval * 1000.0,
partial(self.scale, consumer),
)

def scale(self, consumer):
"""Timer triggers this every 30 seconds (or custom interval),
it will not trigger twice if the deadline was somehow missed."""

# See celery.worker.state module, it's what collects statistics and
# keeps track of what's going on.
if not consumer.controller.state.active_requests and server.running:
server.stop()

def stop(self, consumer):
"""This method is called at worker shutdown, or when the amqp connection
is lost (see start). You can define a 'shutdown(self, consumer)` method
to add actions at actual worker shutdown."""
if self.tref:
self.tref.cancel()
self.tref = None


If you use the new Celery API and have an app instance then installing
this bootstep is easy:

app = Celery()
app.steps['consumer'].add(ScaleServer)


If you're using the old API or django-celery it's a bit more tricky
since you don't have access to the app when in the configuration/settings module:

from celery.signals import celeryd_init

@celeryd_init.connect
def install_bootsteps(instance, **kwargs):
instance.app.steps['consumer'].add(ScaleServer)


You can add a --scale-server-interval command-line argument to the `celery worker` command too:

from celery.bin.base import Option

app.user_options['worker'].add(
Option('--scale-server-interval', type='float',
help='yadda yadda yadda'),
)



--
Ask Solem
twitter.com/asksol

Zachary Roadhouse

unread,
Oct 16, 2013, 11:15:23 AM10/16/13
to celery...@googlegroups.com
This post is really helpful to me.  Like the OP, my tasks in worker processes create sub-processes.  I'm attempting to add proper cleanup of these processes in response to sending a revoke task with terminate=True.  

What is the sequence of actions that celery performs when it receives a revoke w/terminate?  Does it issue a SIGTERM to the worker process?  Does the worker process have its own SIGTERM handler installed via signal.signal?  Is it safe to utilize atexit to install an exit handler (states will only work if the signal is handled via python)?

Ask Solem

unread,
Oct 16, 2013, 11:39:27 AM10/16/13
to celery...@googlegroups.com

On Oct 16, 2013, at 4:15 PM, Zachary Roadhouse <zroad...@rmn.com> wrote:

> This post is really helpful to me. Like the OP, my tasks in worker processes create sub-processes. I'm attempting to add proper cleanup of these processes in response to sending a revoke task with terminate=True.
>
> What is the sequence of actions that celery performs when it receives a revoke w/terminate? Does it issue a SIGTERM to the worker process? Does the worker process have its own SIGTERM handler installed via signal.signal? Is it safe to utilize atexit to install an exit handler (states will only work if the signal is handled via python)?
>


When a worker instance receives revoke+terminate it will revoke the id of the task
and if the id happens to be executing on the worker (is in active_requests) it will send
the signal requested to the process that is executing the task. SIGTERM is the default
signal, but the user may request any signal to be raised.

Then the multiprocessing pool workers do indeed have a TERM handler installed. The handler
simply raises SystemExit so that the Python cleanup phase happens. In the next release
of billiard the handler will also unregister itself, so that it will skip the cleanup phase
the next time a signal is sent.
--
Ask Solem
twitter.com/asksol

Zachary Roadhouse

unread,
Oct 16, 2013, 12:57:56 PM10/16/13
to celery...@googlegroups.com
Thank you! This is extremely helpful because it means I can simply catch SystemExit and KeyboardInterrupt to ensure that SIGINT and SIGTERM are handled. Here is the function I'm invoking from my celery task:

def execute_command(command):
process = subprocess.Popen(command, stdout=subprocess.PIPE)
try:
output, unused_err = process.communicate()
retcode = process.poll()
except (KeyboardInterrupt, SystemExit):
process.terminate()
process.wait()
raise
if retcode:
raise subprocess.CalledProcessError(retcode, command, output=output)
return output
> --
> You received this message because you are subscribed to a topic in the Google Groups "celery-users" group.
> To unsubscribe from this topic, visit https://groups.google.com/d/topic/celery-users/3fs0ocREYqw/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to celery-users...@googlegroups.com.
> To post to this group, send email to celery...@googlegroups.com.
> Visit this group at http://groups.google.com/group/celery-users.
> For more options, visit https://groups.google.com/groups/opt_out.


--

This e-mail, including attachments, contains confidential and/or
proprietary information, and may be used only by the person or entity to
which it is addressed. The reader is hereby notified that any
dissemination, distribution or copying of this e-mail is prohibited. If you
have received this e-mail in error, please notify the sender by replying to
this message and delete this e-mail immediately.
Reply all
Reply to author
Forward
0 new messages