How to cleanly shutdown the IO loop when using run_in_executor()?


Giampaolo Rodola'

Mar 26, 2014, 12:00:24 PM
to python...@googlegroups.com
Hello there,
according to the asyncio docs, the correct way to handle SIGINT/SIGTERM signals in order to "cleanly" shut down the IO loop is to register handlers via loop.add_signal_handler(), which is what the script below does.
This worked well for me as long as I didn't introduce executors.
Note: I deliberately decided to use ProcessPoolExecutor instead of ThreadPoolExecutor in order to be able to terminate the workers and exit sooner:

import asyncio
import functools
import time
import concurrent.futures
import signal

loop = asyncio.get_event_loop()
executor = concurrent.futures.ProcessPoolExecutor()

def long_running_fun():
    for x in range(5):
        print("loop %s" % x)
        time.sleep(1000)

@asyncio.coroutine
def infinite_loop():
    while True:
        try:
            fut = loop.run_in_executor(None, long_running_fun)
            yield from asyncio.wait_for(fut, None)
        finally:
            yield from asyncio.sleep(1)

def ask_exit(signame):
    print("got signal %s: exit" % signame)
    loop.stop()
    executor.shutdown()

def main():
    loop.set_default_executor(executor)
    asyncio.async(infinite_loop())
    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame),
                                functools.partial(ask_exit, signame))
    loop.run_forever()

if __name__ == '__main__':
    main()


The problem with this code is that every time I hit CTRL+C "time.sleep()" returns immediately and the "for" loop keeps looping until it's exhausted. 
This is the output:

$ python3.4 foo.py 
loop 0
^Cloop 1
got signal SIGINT: exit
^Cloop 2
^Cloop 3
^Cloop 4
^CException ignored in: <generator object infinite_loop at 0x7fb0c0760cf0>
RuntimeError: generator ignored GeneratorExit
$

I've also tried other solutions, such as terminating the processes returned by multiprocessing.active_children(), but with the same effect.
Apparently the only effective strategy is to use SIGKILL.
Basically I'm looking for a way to cleanly shut down the IO loop and all its pending workers. If there is a "blessed" strategy to do that, it would probably make sense to mention it in the docs, because it's not obvious.

--

Guido van Rossum

Mar 26, 2014, 1:25:43 PM
to Giampaolo Rodola', python-tulip
Sounds like you're breaking ground and combining things that haven't been tested together before. Some comments:

- Setting the default executor to a ProcessPoolExecutor feels like a bad idea -- it would mean that every time you connect to a remote host the address lookup is done in that executor (that's mainly why there is a default executor at all -- the API to set it mostly exists so you can configure the number of threads). Instead, I would just pass an explicit executor to run_in_executor().

- Looks like the signal handler is somehow inherited by the subprocess created for the process pool? Otherwise I'm not sure how to explain that the sleep(1000) returns immediately but doesn't raise an exception -- that's what happens with sleep() when a signal handler is called, but not when the handler raises an exception or the signal's default action is set (SIG_DFL) or it is ignored (SIG_IGN).

- I'm not sure exactly where the RuntimeError comes from. It's possible that this happens during final program GC. More print statements are in order.

- Do you know how far ask_exit() made it? I'd like to see more prints there too.
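
Re the sleep() point above, the behavior can be seen in isolation with a throwaway sketch like this (SIGALRM is used only because it's easy to trigger from within a single process; not part of your script):

import signal
import time

def handler(signum, frame):
    print("handler ran for signal %s" % signum)

signal.signal(signal.SIGALRM, handler)
signal.alarm(1)            # deliver SIGALRM in one second

start = time.monotonic()
time.sleep(10)             # on Python 3.4 this returns after ~1s without raising
                           # (later versions restart the sleep instead)
print("slept for %.1f seconds" % (time.monotonic() - start))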
--
--Guido van Rossum (python.org/~guido)

Giampaolo Rodola'

Mar 26, 2014, 2:30:25 PM
to Guido van Rossum, python-tulip
On Wed, Mar 26, 2014 at 6:25 PM, Guido van Rossum <gu...@python.org> wrote:
> Sounds like you're breaking ground and combining things that haven't been tested together before. Some comments:

> - Setting the default executor to a ProcessPoolExecutor feels like a bad idea -- it would mean that every time you connect to a remote host the address lookup is done in that executor (that's mainly why there is a default executor at all -- the API to set it mostly exists so you can configure the number of threads). Instead, I would just pass an explicit executor to run_in_executor().

OK.
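
So something along these lines, I guess (a sketch; blocking_io() and the worker count are just placeholders):

import asyncio
import concurrent.futures
import time

def blocking_io():
    # stand-in for blocking / CPU-bound work
    time.sleep(2)
    return 42

@asyncio.coroutine
def task(loop, executor):
    # the executor is passed explicitly instead of being installed as the default
    result = yield from loop.run_in_executor(executor, blocking_io)
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    loop.run_until_complete(task(loop, executor))
    executor.shutdown()
    loop.close()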

> - Do you know how far ask_exit() made it? I'd like to see more prints there too.

It completed.
 
> - Looks like the signal handler is somehow inherited by the subprocess created for the process pool? Otherwise I'm not sure how to explain that the sleep(1000) returns immediately but doesn't raise an exception -- that's what happens with sleep() when a signal handler is called, but not when the handler raises an exception or the signal's default action is set (SIG_DFL) or it is ignored (SIG_IGN).

Yeah, that's what I don't understand either. 
It seems the cause of this weird behavior must be something within loop.add_signal_handler().
I tried to get rid of it and rolled my own variation, and I managed to have all workers exit cleanly:

import asyncio
import concurrent.futures
import functools
import multiprocessing
import signal
import sys
import time

loop = asyncio.get_event_loop()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

def long_running_fun():
    for x in range(5):
        print("loop %s" % x)
        time.sleep(1000)

@asyncio.coroutine
def infinite_loop():
    while True:
        fut = loop.run_in_executor(executor, long_running_fun)
        yield from asyncio.wait_for(fut, None)

def register_sig_handler(signals=[signal.SIGINT, signal.SIGTERM],
                         handler=None):
    sigmap = dict((k, v) for v, k in signal.__dict__.items()
                  if v.startswith('SIG'))
    if handler is None:
        def default_handler(sig, frame):
            this = multiprocessing.current_process()
            print("%r interrupted by %s signal; exiting cleanly now" % (
                  this, sigmap.get(sig, sig)))
            sys.exit(sig)
        handler = default_handler
    for sig in signals:
        signal.signal(sig, handler)

def main():
    asyncio.async(infinite_loop())
    register_sig_handler()
    loop.run_forever()

main()


Apparently the difference is that the handler passed to loop.add_signal_handler() gets called only in the main process (and it mysteriously makes time.sleep() return early without raising an exception), while this one is called in all the running workers plus the main process:

$ python foo.py 
loop 0
^C<Process(Process-1, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-3, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-2, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-4, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-1, started)> interrupted by SIGTERM signal; exiting cleanly now
<_MainProcess(MainProcess, started)> interrupted by SIGINT signal; exiting cleanly now
$

Guido van Rossum

Mar 26, 2014, 2:35:00 PM
to Giampaolo Rodola', python-tulip
Another thing to investigate might be how the executor creates the processes, and if anything happens to signals there. You might be able to print the pid in the subprocess and send it a signal and see how it is handled. Also read up on signals on fork.
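
For example, a throwaway probe along these lines (report() is just an illustrative name) would show the worker's pid and the SIGINT disposition it inherited:

import concurrent.futures
import os
import signal

def report():
    # runs inside the pool worker: show who we are and what handler we got
    return os.getpid(), repr(signal.getsignal(signal.SIGINT))

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        pid, handler = executor.submit(report).result()
        print("main pid: %s" % os.getpid())
        print("worker pid: %s, inherited SIGINT handler: %s" % (pid, handler))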

Richard Oudkerk

Mar 26, 2014, 5:19:12 PM
to python...@googlegroups.com
On 26/03/2014 5:25pm, Guido van Rossum wrote:
> - Looks like the signal handler is somehow inherited by the subprocess
> created for the process pool?

It looks like ProcessPoolExecutor creates its workers lazily, i.e. after
you have changed the signal handlers for the main process. The workers
are created using multiprocessing, which uses fork on Unix (at least by
default), so they inherit the signal handlers.

You might try submitting a dummy task before modifying the signal
handlers (although that will not help much if the workers have to be
restarted).
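
Roughly (warm_up() is just a placeholder name):

import concurrent.futures

def warm_up():
    # trivial task: forces the executor to fork its workers now,
    # while the default signal handlers are still installed
    return None

if __name__ == '__main__':
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    executor.submit(warm_up).result()   # the workers are forked here
    # only now change the signal handlers in the main process,
    # e.g. via loop.add_signal_handler() or signal.signal()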

In Python 3.4 you might try using

multiprocessing.set_start_method('spawn')

or

multiprocessing.set_start_method('forkserver')

This hopefully prevents the workers from inheriting anything troublesome
from the main process.
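
In script form that would be something like this (worker() is just a probe to show what the child ends up with):

import concurrent.futures
import multiprocessing
import signal

def worker():
    # with 'spawn' or 'forkserver' this runs in a freshly started interpreter,
    # so it should not inherit handlers installed in the main process
    return repr(signal.getsignal(signal.SIGINT))

if __name__ == '__main__':
    multiprocessing.set_start_method('forkserver')   # or 'spawn'
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        print(executor.submit(worker).result())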

-- Richard

Giampaolo Rodola'

Mar 26, 2014, 7:50:11 PM
to Richard Oudkerk, python...@googlegroups.com
Thanks for chiming in.
That did help in raising an exception, but not in the worker code, where it appears I cannot catch KeyboardInterrupt (actually I can't catch it from anywhere).
I get different exceptions every time, all originating from multiprocessing code.
Will keep digging further and hopefully update this thread in case I find a solution.

Giampaolo Rodola'

Mar 28, 2014, 7:02:00 PM
to Guido van Rossum, python-tulip

On Wed, Mar 26, 2014 at 7:35 PM, Guido van Rossum <gu...@python.org> wrote:
> Another thing to investigate might be how the executor creates the processes, and if anything happens to signals there.

After some further digging, this seems to be related to the problem at hand.
According to this:
...it appears a feasible solution is to prevent the workers from ever receiving KeyboardInterrupt and have the main process shut down the pool via pool.terminate() and finally pool.join().
Unfortunately concurrent.futures.ProcessPoolExecutor does not expose the necessary bits to do that (the multiprocessing.Pool(initializer=...) argument and the terminate() method).
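
In multiprocessing.Pool terms the suggested approach would look roughly like this (a sketch; work() is just a stand-in for the real job):

import multiprocessing
import signal
import time

def init_worker():
    # run in every worker at startup: workers never see SIGINT,
    # only the main process handles it
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def work(n):
    time.sleep(n)
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4, initializer=init_worker)
    try:
        # use get() with a timeout so the main process stays interruptible
        pool.map_async(work, [1000] * 4).get(999999)
    except KeyboardInterrupt:
        print("got SIGINT in the main process, terminating workers")
        pool.terminate()
    else:
        pool.close()
    pool.join()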

I also suspect that in order to emulate the suggested solution the underlying Process instance should be daemonized, similarly to what multiprocessing.Pool does:

I wonder whether concurrent.futures.ProcessPoolExecutor would be better off exposing more APIs in order to facilitate tasks like these.

Guido van Rossum

Mar 28, 2014, 7:05:07 PM
to Giampaolo Rodola', python-tulip
You're going to have to move the discussion to python-dev or the python issue tracker then.