Re: [gevent] gevent + celery + subprocess + graylog = hang


Denis Bilenko

Oct 30, 2012, 10:01:30 AM
to gev...@googlegroups.com
On Mon, Oct 29, 2012 at 8:01 AM, Mark Hingston <mrkhi...@gmail.com> wrote:
> Hey guys,
>
> It seems that the combination of libraries that I'm using in my project has
> led me to experience a hang in gevent-based code. I'm using celery worker
> (http://celeryproject.org/) to handle video encoding tasks. The worker is
> using gevent as the concurrency backend (it uses a gevent threadpool) and
> it's connecting to Amazon SQS for it's underlying queue. I've also got
> graylog enabled. However with this setup I often get a lockup when I call
> something like:
>
> p = subprocess.Popen(['ffmpeg', ...], cwd='/my/dir/', stdin=subprocess.PIPE,
> stdout=subprocess.PIPE, stderr=subprocess.PIPE)
> p.wait()

Could it be that something else in your code messes with SIGCHLD or
waitpid()? That could cause a hang here.
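
For example, a quick stdlib-only way to check the SIGCHLD part:

```python
import signal

# Inspect the current SIGCHLD disposition. SIG_DFL means nothing has
# installed a handler; SIG_IGN or a callable means some library has
# touched it (gevent itself installs a child watcher via libev, which
# won't show up here, but third-party C code might).
handler = signal.getsignal(signal.SIGCHLD)
print(handler)
```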

Mark Hingston

Nov 1, 2012, 5:29:05 AM
to gev...@googlegroups.com
Sorry - my original description was poor. The lockup actually occurs at the point where I call Popen. The call to Popen just never returns. It seems to stay waiting on the read operation in the errpipe_read.read() call:

errpipe_read = FileObject(errpipe_read, 'rb')
data = errpipe_read.read()


It seems as though it never receives a read event, even though the exec completes correctly and the process that I'm calling runs and then even terminates.

Also, FWIW, I don't think anything is messing with SIGCHLD or waitpid, at least not from what I could tell.

I had a bit of luck applying the fix for issue 154: "Hang when calling `fork()` after `socket.connect()`". I added the relevant code to kill() and then join() the thread pool around the point where subprocess does its fork, and as far as I could tell this prevented the code from locking up. However, I don't think killing the threads is really a good solution for me here. Surprisingly, celery still seems to work and can process many subsequent tasks that call subprocess. I've not really looked into how, but I've assumed it must be recreating threads for the pool as it needs them, or something like that.
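
Concretely, the change amounted to something like this around the fork (a sketch against gevent 1.0's hub API; `get_hub().threadpool` is the pool the threaded DNS resolver uses):

```python
from gevent.hub import get_hub

# Sketch of the issue-154 style workaround: quiesce the hub's thread
# pool so no native thread is alive across the fork in subprocess.Popen.
pool = get_hub().threadpool
pool.kill()
pool.join()
# ... subprocess.Popen(...) happens here; the pool respawns worker
# threads lazily as new tasks are submitted afterwards.
```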

Mark Hingston

Nov 1, 2012, 5:36:37 AM
to gev...@googlegroups.com


On Thursday, November 1, 2012 8:29:05 PM UTC+11, Mark Hingston wrote:

I had a bit of luck with applying the fix for issue 154: "Hang when calling `fork()` after `socket.connect()`" - I added the relevant code to kill() then join() on the thread pool around where subprocess does a fork and as far as I could tell this prevented the code from locking up, however I don't think that killing the threads is really a good solution for me here. Surprisingly celery still seems to work and be able to process many subsequent tasks that call subprocess. I've not really looked into how but I've assumed it must be recreating threads for the pool as it needs them or something like that.

Scratch that - I have now been able to get it to lock up even with this change. Maybe it just changed timing to make the lockup less frequent. Not sure.

Denis Bilenko

Nov 1, 2012, 6:30:02 AM
to gev...@googlegroups.com
On Thu, Nov 1, 2012 at 10:29 AM, Mark Hingston <mrkhi...@gmail.com> wrote:
> On Wednesday, October 31, 2012 1:01:53 AM UTC+11, Denis Bilenko wrote:
>>
>> On Mon, Oct 29, 2012 at 8:01 AM, Mark Hingston <mrkhi...@gmail.com> wrote:
>>
>> > p = subprocess.Popen(['ffmpeg', ...], cwd='/my/dir/',
>> > stdin=subprocess.PIPE,
>> > stdout=subprocess.PIPE, stderr=subprocess.PIPE)
>> > p.wait()
>>
>> Could it be that something else in your code messes with SIGCHLD or
>> waitpid()? That could causes a hang here.
>
>
> Sorry - my original description was poor. The lockup actually occurs at the
> point when I call Popen. The call to Popen just never returns. It seems to
> stay waiting on the socket read operation on the errpipe_read.read() call:
>
> errpipe_read = FileObject(errpipe_read, 'rb')
> data = errpipe_read.read()

weird, does wrapping this block with "with gevent.Timeout(5):" help?
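
I mean something like this (a sketch; /bin/echo stands in for your ffmpeg command):

```python
import gevent
from gevent import subprocess  # gevent's cooperative subprocess module

# Bound the whole Popen + communicate with a gevent.Timeout so a hang
# surfaces as an exception instead of blocking the greenlet forever.
try:
    with gevent.Timeout(5):
        p = subprocess.Popen(['/bin/echo', 'ok'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = p.communicate()
except gevent.Timeout:
    print('Popen/communicate hung for more than 5 seconds')
```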

Can you make a complete script that reproduces the lockup?

Mark Hingston

Nov 2, 2012, 1:30:09 AM
to gev...@googlegroups.com
Well, using a gevent timeout here definitely throws an exception when the timer expires, yes. That's interesting, but I don't think it will be very helpful to me: I'd like to check whether the process started correctly, or at the very least its output and return code. If Popen throws an error I won't have the process object to wait() / poll() on.

Can you make a complete script that reproduces the lockup?

Tried all day but couldn't bring this down to a single script that locks up. Will try further when I return from holidays in a week.

It's worth pointing out that I'm definitely seeing the 'Mixing fork() and threads detected; memory leaked.' print whenever I call subprocess.Popen - sorry, I didn't think to mention this before. Some of my other tasks import libraries that make socket connections to web services well before I attempt to call subprocess. When I remove the instantiation of objects from these libraries I don't seem to get that warning. That's why I was looking into the socket-related lockup I mentioned earlier.

Denis Bilenko

Nov 2, 2012, 6:22:58 AM
to gev...@googlegroups.com
On Fri, Nov 2, 2012 at 6:30 AM, Mark Hingston <mrkhi...@gmail.com> wrote:
> It's worth pointing out that I'm definitely seeing the 'Mixing fork() and
> threads detected; memory leaked.' print whenever I call subprocess.Popen -
> sorry I didn't think to mention this before.

Actually, this warning is only relevant if you fork() a Python process
and continue using it. In the case of Popen, which does execv(),
nothing is actually leaked that I know of.

I'll remove the warning. Still would like to have a way to reproduce
your problem.

vitaly

Nov 9, 2012, 7:26:25 PM
to gev...@googlegroups.com
On Monday, October 29, 2012 12:01:45 AM UTC-7, Mark Hingston wrote:
I seem to be able to work around this issue by adding the following code change to the POSIX _execute_child function in gevent/subprocess.py:

--- a/gevent/subprocess.py
+++ b/gevent/subprocess.py
@@ -597,6 +597,14 @@ class Popen(object):
                            errread, errwrite):
             """Execute program (POSIX version)"""
 
+            saved_stdin = sys.stdin
+            saved_stdout = sys.stdout
+            saved_stderr = sys.stderr
+
+            sys.stdin = sys.__stdin__
+            sys.stdout = sys.__stdout__
+            sys.stderr = sys.__stderr__ 

So I'm wondering whether the patch above is a legit solution to the issue or not. ...

Hi Mark, although this workaround might work in your app, I would venture to say that it's **not** a legit solution to include in gevent.subprocess.Popen. A different user of gevent may have purposefully redirected one or more of sys.stdin/stdout/stderr somewhere else and would like gevent.subprocess.Popen to honor those choices (just like the builtin subprocess.Popen would) instead of overriding them with the defaults sys.__stdout__, etc. There may also be other posix threads in the user's app writing to sys.stdout/sys.stderr that would be impacted if gevent.subprocess.Popen temporarily patched sys.stdin/stdout/stderr.

Best,
Vitaly

Mark Hingston

Nov 20, 2012, 9:54:41 PM
to gev...@googlegroups.com
Hey, thanks for the comments guys.


On Saturday, November 10, 2012 11:26:25 AM UTC+11, vitaly wrote:
Hi Mark, although this workaround might work in your app, I would venture to say that it's **not** a legit solution

I agree Vitaly - good points.



On Friday, November 2, 2012 9:23:22 PM UTC+11, Denis Bilenko wrote:
Still would like to have a way to reproduce your problem. 


I got some time to come back to this today - I'm now pretty confident that this is what's happening in the case of the Popen lockup:

 - process makes socket connections to SQS / graylog (seems that perhaps it's important here that there are multiple threads created through the use of these libraries)
 - fork / execv in subprocess.Popen
 - child writes to stdout/stderr, (e.g.:
    "WARNING: Mixing fork() and threads detected; memory leaked."
    or
    "Exception AttributeError: AttributeError("'_DummyThread' object has no attribute '_Thread__block'",) in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored")
 - celery redirects these outputs to the logger
 - graypy attempts to use a socket connection to send out a log, child process hangs
 - parent waits forever on child process
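
Steps 3-5 above boil down to the classic fork-plus-threads trap, which can be shown with the stdlib alone (a hypothetical minimal sketch, no gevent or celery involved; the lock stands in for whatever the logging/socket machinery holds):

```python
import os
import threading

lock = threading.Lock()   # stands in for e.g. a logging handler's lock
held = threading.Event()

def holder():
    lock.acquire()        # a thread mid-way through emitting a log record
    held.set()
    threading.Event().wait()  # keep holding the lock in the parent

threading.Thread(target=holder, daemon=True).start()
held.wait()

pid = os.fork()
if pid == 0:
    # Child: the holder thread was not copied across the fork, but the
    # lock's "held" state was, so an untimed acquire() here would block
    # forever; we use a timeout just to demonstrate.
    acquired = lock.acquire(timeout=0.5)
    os._exit(0 if acquired else 42)

_, status = os.waitpid(pid, 0)
print(os.WEXITSTATUS(status))  # 42: the child could never take the lock
```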

At this point, ps shows:

pid: 23310; parent-pid: 20888; cmd: [celeryd@mark-VirtualBox:MainProcess] -active- (worker --pool=gevent --loglevel=DEBUG)
pid: 23352; parent-pid: 23310; cmd: [celeryd@mark-VirtualBox:MainProcess] -active- (worker --pool=gevent --loglevel=DEBUG)

And if I kill process 23352, then celery frees up, finishes processing the task in question and goes on to process another task.

Further, if I suppress the two error messages mentioned above from being written to stdout/err in the child then no lockups occur.

So by this point I'm almost certain that the underlying issue I'm seeing is issue 154. However, I don't think the fix suggested in the comments for that issue is really appropriate for me. I know I commented earlier that I had still seen a lockup with that fix applied, but it's such a hacky fix (killing all the threads that celery is actually using) that I didn't really want to investigate further what was causing a lockup in that case. So I'll be working around this issue by turning off celery's stdout/stderr redirection.



FWIW, I've attached a tarball that can be used to reproduce the problem. It requires you to run a celery worker with:

celery worker --pool=gevent --loglevel=DEBUG

You will also need an Amazon SQS login, with AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY set as environment variables.

You also need to run a graylog server on the localhost.

Running this on Ubuntu 12.04.1, I saw the Popen lockup pretty frequently (when it happened it was always within the first 5-10 task invocations, and often on the first).


Thanks for the help!
Mark

Mark Hingston

Nov 20, 2012, 10:01:10 PM
to gev...@googlegroups.com
Google Groups was barfing when I tried to add an attachment with a .tar.gz extension; trying with .zip.
mytest.zip

Denis Bilenko

Nov 23, 2012, 3:22:57 AM
to gev...@googlegroups.com
On Wed, Nov 21, 2012 at 3:54 AM, Mark Hingston <mrkhi...@gmail.com> wrote:

> On Friday, November 2, 2012 9:23:22 PM UTC+11, Denis Bilenko wrote:
> I got some time to come back to this today - I'm now pretty confident that
> this is what's happening in the case of the Popen lockup:
>
> - process makes socket connections to SQS / graylog (seems that perhaps
> it's important here that there are multiple threads created through the use
> of these libraries)
> - fork / execv in subprocess.Popen
> - child writes to stdout/stderr, (e.g.:
> "WARNING: Mixing fork() and threads detected; memory leaked."
> or
> "Exception AttributeError: AttributeError("'_DummyThread' object has no
> attribute '_Thread__block'",) in <module 'threading' from
> '/usr/lib/python2.7/threading.pyc'> ignored")
> - celery redirects these outputs to the logger
> - graypy attempts to use a socket connection to send out a log, child
> process hangs
> - parent waits forever on child process

So, execvp/execvpe in subprocess.Popen do not succeed, right? (If they
did, errpipe_read would be closed because it has CLOEXEC flag.)
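
To spell out the mechanism (a stdlib-only sketch of the error-pipe trick subprocess uses; /bin/true is a stand-in for any program):

```python
import fcntl
import os

# The write end of the pipe carries FD_CLOEXEC, so a *successful* exec
# closes it automatically and the parent's read returns EOF (b'')
# instead of error data.
r, w = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFD, fcntl.FD_CLOEXEC)

pid = os.fork()
if pid == 0:                             # child
    os.close(r)
    try:
        os.execv('/bin/true', ['true'])  # on success, w closes via CLOEXEC
    except OSError:
        os.write(w, b'exec failed')      # what subprocess reports on failure
        os._exit(127)

os.close(w)                              # parent
data = os.read(r, 1024)                  # EOF: the exec succeeded
os.close(r)
os.waitpid(pid, 0)
print(data)
```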

Could you avoid using thread pool and try again? (set
GEVENT_RESOLVER=ares to make sure dns resolver does not use the thread
pool).

> FWIW, I've attached a tarball that can be used to reproduce the problem. It
> requires you to run a celery worker with:
>
> celery worker --pool=gevent --loglevel=DEBUG
>
> You will also need an Amazon SQS login, with AWS_ACCESS_KEY_ID and
> AWS_SECRET_ACCESS_KEY set as environment variables.
> You also need to run a graylog server on the localhost.

Can you make a self-contained tarball that does not depend on external
services and includes all the dependencies and has a single script
that launches everything and reproduces the problem? Otherwise it's
not very useful.

BTW, what is your gevent version? 1.0rc1?

Mark Hingston

Nov 26, 2012, 3:13:36 AM
to gev...@googlegroups.com
On Friday, November 23, 2012 7:23:19 PM UTC+11, Denis Bilenko wrote:
So, execvp/execvpe in subprocess.Popen do not succeed, right? (If they 
did, errpipe_read would be closed because it has CLOEXEC flag.)

Well, from what I can tell the process gets stuck before it even tries to call exec: my prints directly to sys.__stderr__, placed just before the exec, never appeared, and the stuck child process is always labelled as a 'celery' process. If the exec had occurred, I would have expected the stuck command to be 'touch' (or ffmpeg in my actual real-world code).

 
Could you avoid using thread pool and try again? (set
GEVENT_RESOLVER=ares to make sure dns resolver does not use the thread
pool).

Thanks for the tip about this - that's an important factor here. We actually were already changing the resolver with:

gevent.hub.Hub.resolver_class = 'gevent.resolver_ares.Resolver'

but celery patches gevent and uses the resolver before we get a chance to change it with that line, so we end up using the threaded resolver. We'll change to use the environment variable to ensure that the right resolver is used from now on. FWIW, using this env var makes the lockup go away - without needing any of the other changes I mentioned earlier in my posts.

Previously I hadn't considered that our code should not have been using the threadpool in the first place. There's no other code actually using a threadpool for us.
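
For the record, the pattern we've settled on is to pick the resolver in the environment before anything imports gevent (sketch; the commented shell line is how we launch the worker):

```python
import os

# GEVENT_RESOLVER is read when gevent is first imported, so it must be
# set before celery (which imports gevent for the --pool=gevent worker)
# starts up. Exporting it in the shell that launches the worker also works:
#   GEVENT_RESOLVER=ares celery worker --pool=gevent --loglevel=DEBUG
os.environ['GEVENT_RESOLVER'] = 'ares'
```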

 
Can you make a self-contained tarball that does not depend on external
services and includes all the dependencies and has a single script
that launches everything and reproduces the problem?

Ok - done and attached to this post. The attached script spawns a child process that should lock up when it attempts to log in the preexec function. I wasn't using a preexec function in my celery code, so this isn't the way the lockup was occurring for me (as I described above, mine happened because the 'WARNING: Mixing fork ...' print was redirected to logging, which then made a socket call), but in the end it's the same lockup: on Ubuntu the parent process hangs waiting for the errpipe_read.read() call to return, and it never returns.

Sorry it took so much time to get there - I actually didn't realise that subprocess.Popen has a preexec_fn argument.


BTW, what is your gevent version? 1.0rc1?


Yep, gevent 1.0rc1



Also sorry if this is off topic but in working on trying to get a simple repro of this issue I tried some code that does something like:

from gevent import socket  # assuming gevent's socket; stdlib socket behaves differently here
from gevent.threadpool import ThreadPool

def run():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("google.com", 80))
    s.close()

pool = ThreadPool(2)
pool.spawn(run)
pool.join()

But this code locks up immediately on Ubuntu and on Mac OS X. I couldn't find much documentation on ThreadPool, so I'm wondering if maybe I'm just using it in a way it was not intended for?
test.py