On Jan 8, 2013, at 2:34 PM, Allen Householder <
all...@cmu.edu> wrote:
> I have the following scenario: Multiple worker tasks require a long-running server (in a separate process) to already be up in order to complete. However I don't want the server process to start if there are no tasks. So I installed a task_prerun hook to start the server if it's not already going, and attempted to do the same on the worker shutdown to clean it up. The server process has a tendency to eventually get corrupted and needs to be restarted. I'd prefer to have celery handle this with its max tasks per child feature to stop and restart both the worker child and the corresponding server process (it needs to maintain one server process per child).
>
> celery.conf.CELERYD_MAX_TASKS_PER_CHILD = 10
> server = MyServer()
>
> @task_prerun
> def setup_server():
>     if not server.running:
>         server.start()
>
> @worker_shutdown
> def shutdown_server():
>     if server.running:
>         server.stop()
>
> @worker_process_init
> def install_pool_process_sighandlers(**kwargs):
>     platforms.signals["TERM"] = shutdown_server
>     platforms.signals["INT"] = shutdown_server
>
> Problem 1: This almost does the right thing when I ctrl-C the worker -- shutdown_server runs, but sometimes there are a few straggler tasks that seem to fire afterwards.
>
> Problem 2: What I expected (but does not appear to be happening) is that when CELERYD_MAX_TASKS_PER_CHILD is hit, the worker_shutdown signal would be sent before the process exits. Instead it just looks like the process exits and shutdown_server is never called.
In Celery the term 'worker' is used for the main worker process (also referred to as the MainProcess).
The additional processes used with the multiprocessing pool are also referred to as workers
by multiprocessing itself, but we try to consistently call them 'worker processes'.
So the worker_shutdown signal (like worker_init and worker_ready) will not be triggered by those
processes; it's only triggered by the MainProcess, and there is currently no equivalent signal for pool processes.
>
> So my questions:
> 1. Is there a way to tell shutdown_server to wait for outstanding tasks to complete before proceeding?
A TERM or INT signal will initiate shutdown in the MainProcess. This
consists of many steps, one of which is waiting for currently executing tasks to complete.
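To illustrate the idea (this is a hedged sketch in plain Python, not Celery's actual internals): a "warm" shutdown handler only sets a flag, and the work loop finishes the task it already has in hand before stopping.

```python
# Sketch of a warm-shutdown pattern: the signal handler never exits
# directly, it just requests shutdown; the loop drains in-flight work.
import signal

shutdown_requested = False

def request_shutdown(signum, frame):
    global shutdown_requested
    shutdown_requested = True  # don't exit here; let the loop finish

signal.signal(signal.SIGTERM, request_shutdown)

def run(tasks):
    done = []
    for task in tasks:
        done.append(task())       # complete the currently executing task
        if shutdown_requested:
            break                 # then stop accepting new work
    return done
```

Celery's shutdown sequence does considerably more than this, but the ordering principle is the same: stop consuming, then wait for active tasks.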
The worker, including its startup and shutdown sequences, is actually controlled by something called bootsteps.
Every component in the worker has its own bootstep, and together these form a dependency graph that is used to
determine startup and shutdown order. E.g. there's an Autoscaler bootstep, a Pool bootstep, and a Consumer bootstep (which in 3.1 is rewritten to use bootsteps too, so every part of the consumer is a step).
The API for this has been private, but will be public from 3.1. Sadly I haven't completed the documentation
yet, but you can see a pretty graph of the built-in bootsteps here:
http://docs.celeryproject.org/en/master/userguide/extending.html
> 2. Is there a way to hook the CELERYD_MAX_TASKS_PER_CHILD trigger to cause shutdown_server to be called prior to the worker child process exiting?
Well, you can use the TERM/INT signals as you do in your example, but overriding signal handlers
like that is a bad idea: it's better to use atexit or multiprocessing.util.Finalize, so that multiple
handlers can be installed without clobbering each other.
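As a hedged sketch of the atexit approach (MyServer here is a minimal stand-in for your server class, not anything from Celery):

```python
# Registering cleanup with atexit instead of replacing TERM/INT handlers.
# atexit keeps a list of callbacks, so several cleanup functions can
# coexist; each is run when the interpreter shuts down normally.
import atexit

class MyServer:  # stand-in for the poster's real server class
    def __init__(self):
        self.running = False
    def start(self):
        self.running = True
    def stop(self):
        self.running = False

server = MyServer()

def shutdown_server():
    if server.running:
        server.stop()

atexit.register(shutdown_server)
```

Note that atexit handlers only run on normal interpreter exit; a process killed with SIGKILL will not run them, which is one more reason to treat pool processes as volatile.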
> 3. Or am I thinking about this wrong / is there another way?
>
It's generally ok to kill pool processes, but it's
absolutely not recommended to abruptly kill the MainProcess. As such you should consider
pool processes to be volatile, and a better place for such a feature would be the MainProcess.
Using bootsteps for this would be easy (but you will have to use the development version, since
they are only available in 3.1).
Here's an example implementation using the internal timer to stop the server
when no tasks are running, checking at an interval of 30 seconds:
from celery.bootsteps import StartStopStep
from functools import partial

class ScaleServer(StartStopStep):
    interval = 30.0

    def __init__(self, consumer, scale_server_interval=None, **kwargs):
        if scale_server_interval is not None:
            self.interval = scale_server_interval
        self.tref = None

    def start(self, consumer):
        self.tref = consumer.timer.apply_interval(
            self.interval * 1000.0,
            partial(self.scale, consumer),
        )

    def scale(self, consumer):
        if not consumer.controller.state.active_requests and server.running:
            server.stop()

    def stop(self, consumer):
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
Here's an annotated version:
class ScaleServer(StartStopStep):
    interval = 30.0

    def __init__(self, consumer, scale_server_interval=None, **kwargs):
        """The step __init__ is called when the Consumer instance is created
        (or the Worker, for a worker bootstep), with the consumer instance as
        the first argument and all keyword arguments from the original
        Consumer.__init__ call.

        Here we take scale_server_interval and ignore the rest;
        a command-line argument can be added to set a value for this
        custom keyword.
        """
        if scale_server_interval is not None:
            self.interval = scale_server_interval
        self.tref = None

    def start(self, consumer):
        """This method is called when the worker starts up and also whenever
        the AMQP connection is reset (which triggers an internal restart).

        The timer is reset when the connection is lost, because something
        may have depended on the current connection, so we have to install
        the timer again for every call to self.start.
        """
        self.tref = consumer.timer.apply_interval(
            self.interval * 1000.0,
            partial(self.scale, consumer),
        )

    def scale(self, consumer):
        """The timer triggers this every 30 seconds (or the custom interval);
        it will not trigger twice if the deadline was somehow missed."""
        # See the celery.worker.state module; it's what collects statistics
        # and keeps track of what's going on.
        if not consumer.controller.state.active_requests and server.running:
            server.stop()

    def stop(self, consumer):
        """This method is called at worker shutdown, or when the AMQP
        connection is lost (see start). You can define a
        `shutdown(self, consumer)` method to add actions at actual worker
        shutdown."""
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
If you use the new Celery API and have an app instance then installing
this bootstep is easy:
app = Celery()
app.steps['consumer'].add(ScaleServer)
If you're using the old API or django-celery it's a bit more tricky,
since you don't have access to the app in the configuration/settings module:

from celery.signals import celeryd_init

@celeryd_init.connect
def install_bootsteps(instance, **kwargs):
    instance.app.steps['consumer'].add(ScaleServer)
You can add a --scale-server-interval command-line argument to the `celery worker` command too:

from celery.bin.base import Option

app.user_options['worker'].add(
    Option('--scale-server-interval', type='float',
           help='yadda yadda yadda'),
)
--
Ask Solem
twitter.com/asksol