Kill server task if client disconnects? (python)


vbp...@gmail.com

Dec 17, 2018, 2:24:25 PM
to grpc.io
I'm sure it's been answered before but I've searched for quite a while and not found anything, so apologies:

We're using python... we've got server tasks that can last quite a while (minutes) and chew up lots of CPU.  Right now we're using REST, and when/if the client disconnects before return, the task keeps running on the server side.  This is unfortunate; it's costly (since the server may be using for-pay services remotely, leaving the task running could cost the client) and vulnerable (a malicious client could just start and immediately disconnect hundreds of tasks and lock the server up for quite a while).

I was hoping that a move to GRPC, in addition to solving other problems, would provide a clean way to deal with this.  But it's not immediately obvious how to do so.  I could see maybe manually starting a thread/Future for the worker process and iterating sleeping until either the context is invalid or the thread/future returns, but I feel like that's manually hacking something that probably exists and I'm not understanding.  Maybe some sort of server interceptor?

How would it be best to handle this?  I'd like to handle both very long unary calls and streaming calls in the same manner.

Cheers,
Vic

robert engels

Dec 17, 2018, 2:27:39 PM
to vbp...@gmail.com, grpc.io
You can do this if you use the streaming protocol - that is the only way I know to have any facilities to determine when a “client disconnects”.

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/9e84949d-139c-43df-a09e-5d8cc79022be%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

vbp...@gmail.com

Dec 17, 2018, 2:32:22 PM
to grpc.io
Good idea, but the problem I have with this (if I understand you right) is that some of the server tasks are just these big monolithic calls that sit there doing CPU-intensive work (sometimes in a third-party library; it's not trivial to change them to stream back progress reports or anything).  

So it feels like some way of running them in a separate thread and having an overseer method able to kill them if the client disconnects is the way to go.  We're already using a ThreadPoolExecutor to run worker threads so I feel like there's something that can be done on that side... just seems like this ought to be a Really Common Problem, so I'm surprised it's either not directly addressed or at least commonly answered.

robert engels

Dec 17, 2018, 2:35:41 PM
to vbp...@gmail.com, grpc.io
You don’t have to. Just use a future as described: if the stream is cancelled by the client, you can cancel the future; if the future completes, you send the result back in the stream (if any). You don’t have to keep sending messages as long as keep-alive is on.

vbp...@gmail.com

Dec 18, 2018, 1:17:09 PM
to grpc.io
Hmm; I'm having some luck looking at the context, which quite happily changes from is_active() to not is_active() the instant I kill the waiting client.  So I thought I'd proceed with something like

while not my_future.done():
  if not context.is_active():
    my_future.cancel()
    break
  time.sleep(0.1)

Terminating the worker thread/process is actually vexing me though!  I tried having a ThreadPoolExecutor give me a future for the worker task, but it turns out you can't really cancel a running future from a thread (you can only cancel it if it hasn't started running; once it's started, it still goes to completion).  So I've tried having a separate ProcessPoolExecutor (maybe processes can be killed?), but that's not going so well either, as attempts to use it to generate futures result in some odd "Failed accept4: Invalid Argument" errors which I can't quite work through.
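The cancellation limitation described above can be demonstrated with the standard library alone (a sketch; the variable names are illustrative):

```python
# Future.cancel() only succeeds before the callable starts running;
# once it has started, the task runs to completion regardless.
from concurrent.futures import ThreadPoolExecutor
import time

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(time.sleep, 0.2)   # starts immediately
queued = executor.submit(time.sleep, 0.2)    # waits behind the first task

time.sleep(0.05)          # give the first task time to start
print(running.cancel())   # False: already running, cannot be cancelled
print(queued.cancel())    # True: still pending, cancelled successfully
```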

Most confusing.  I wonder if I'll need to subclass grpc.server or if my servicer can manually run a secondary process or some such.  

Still, surprising to me this isn't a solved problem built into GRPC.  I feel like I'm missing something really obvious.

Eric Gribkoff

Dec 18, 2018, 1:32:15 PM
to vbp...@gmail.com, grpc.io
On Tue, Dec 18, 2018 at 10:17 AM <vbp...@gmail.com> wrote:
Hmm; I'm having some luck looking at the context, which quite happily changes from is_active() to not is_active() the instant I kill the waiting client.  So I thought I'd proceed with something like

while not my_future.done():
  if not context.is_active():
    my_future.cancel()
    break
  time.sleep(0.1)


Consider using add_callback on the RpcContext instead, so you don't have to poll.
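For illustration, the callback pattern looks roughly like this. FakeContext is a stand-in for the real grpc.ServicerContext (which exposes the same add_callback method on the context argument passed to a servicer handler), so the sketch runs without a server:

```python
import threading

class FakeContext:
    """Stand-in for grpc.ServicerContext, just enough for this sketch."""
    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback):
        # The real context invokes these when the RPC terminates,
        # including when the client disconnects or cancels.
        self._callbacks.append(callback)
        return True

    def terminate(self):
        # Simulates the RPC ending.
        for callback in self._callbacks:
            callback()

cancel = threading.Event()
context = FakeContext()
context.add_callback(cancel.set)  # no polling loop needed

context.terminate()               # e.g. the client disconnected
print(cancel.is_set())            # True: the worker can now observe the flag
```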
 
Terminating the worker thread/process is actually vexing me though!  I tried having a ThreadPoolExecutor give me a future for the worker task, but it turns out you can't really cancel a running future from a thread (you can only cancel it if it hasn't started running; once it's started, it still goes to completion).  So I've tried having a separate ProcessPoolExecutor (maybe processes can be killed?), but that's not going so well either, as attempts to use it to generate futures result in some odd "Failed accept4: Invalid Argument" errors which I can't quite work through.


ProcessPoolExecutor will fork subprocesses, and gRPC servers (and many other multi-threaded libraries) are not compatible with this. There is some discussion around this in https://github.com/grpc/grpc/issues/16001. You could pre-fork (fork before creating the gRPC server), but I don't think this will help with your goal of cancelling long-running jobs. It's difficult to cleanly kill subprocesses, as they may be in the middle of an operation that you would really like to clean up gracefully.
 
Most confusing.  I wonder if I'll need to subclass grpc.server or if my servicer can manually run a secondary process or some such.  

Still, surprising to me this isn't a solved problem built into GRPC.  I feel like I'm missing something really obvious.


I wouldn't consider cancelling long-running jobs spawned by your server as part of the functionality that gRPC is intended for - this is a task that can come up regardless of which server protocol you are using, and will often arise even in non-server applications. A standard approach for this in a multi-threaded environment is setting a cancel boolean variable (e.g., in your gRPC servicer implementation) that your task (the long-running subroutine) periodically checks in order to exit early. This should be compatible with ThreadPoolExecutor.
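A minimal, stdlib-only sketch of that cooperative-cancellation pattern (the names here are illustrative):

```python
# The servicer sets a flag; the long-running task checks it periodically
# and exits early instead of running to completion.
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def long_task(cancel_event, steps=100):
    done = 0
    for _ in range(steps):
        if cancel_event.is_set():  # periodic check lets us exit early
            break
        time.sleep(0.01)           # stand-in for one chunk of real work
        done += 1
    return done

executor = ThreadPoolExecutor(max_workers=1)
cancel = threading.Event()
future = executor.submit(long_task, cancel)

time.sleep(0.05)   # let a few steps run
cancel.set()       # e.g. triggered from a context callback on disconnect
print(future.result() < 100)  # True: the task exited before all 100 steps
```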

Thanks,

Eric
 

vbp...@gmail.com

Dec 18, 2018, 1:45:24 PM
to grpc.io
Thanks, Eric.  That makes some degree of sense, although there are a few cases we still won't be able to deal with, I suspect (and we may have trouble later anyway... in some cases our server program has to shell out to run a separate program, and if that runs into the fork trouble and can't be supported by GRPC we may be stuck with a very clunky REST implementation).

Hmm, quite a pickle.  I can see I'll be playing with a bunch of toy problems for a bit before even considering doing a migration to GRPC.  Most disagreeable, but we'll see what we get.

Can grpc client stubs be used from within grpc servicers?  (imagining fracturing this whole thing into microservices even if that doesn't solve this particular problem).

Eric Gribkoff

Dec 18, 2018, 2:07:00 PM
to vbp...@gmail.com, grpc.io
On Tue, Dec 18, 2018 at 10:45 AM <vbp...@gmail.com> wrote:
Thanks, Eric.  That makes some degree of sense, although there are a few cases we still won't be able to deal with, I suspect (and we may have trouble later anyway... in some cases our server program has to shell out to run a separate program, and if that runs into the fork trouble and can't be supported by GRPC we may be stuck with a very clunky REST implementation).


Sorry, I should have been more precise in my earlier response: you are fine to use fork+exec (e.g., subprocess.Popen) to run a separate program in a new shell. (Caveat: we had a bug that may cause problems even with fork+exec when using Python3. The fix is now merged and will be in the next release; our nightly builds will also include the fix ~tomorrow if you are hitting this issue). The issues on the server-side with fork arise when using libraries that fork and, rather than exec'ing a new program, continue to run the original program in the child process, e.g., Python's multiprocessing module.
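For illustration, a fork+exec child started with subprocess.Popen can simply be killed when the RPC goes away (a sketch; the inline sleep stands in for a real worker program):

```python
import subprocess
import sys

# Launch a separate program via fork+exec (safe alongside a gRPC server).
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Later, e.g. when context.is_active() goes False or an add_callback fires:
proc.kill()
proc.wait()
print(proc.returncode != 0)  # True: the child was terminated, not a clean exit
```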

 
Hmm, quite a pickle.  I can see I'll be playing with a bunch of toy problems for a bit before even considering doing a migration to GRPC.  Most disagreeable, but we'll see what we get.

Can grpc client stubs be used from within grpc servicers?  (imagining fracturing this whole thing into microservices even if that doesn't solve this particular problem).

Absolutely, and that's an intended/common usage.

Thanks,

Eric
 

vbp...@gmail.com

Dec 18, 2018, 3:51:23 PM
to grpc.io
Ah; thanks--we're having to use subprocess.Popen in a few cases anyway.  I'll try that and see what we can do.  Thanks for the note on "grpc within grpc"; that may simplify some things too.

liburdi...@gmail.com

Feb 28, 2019, 7:32:07 PM
to grpc.io
Out of curiosity, can anyone show an example of how add_callback can be used to interrupt the server-side process? I have the same problem as the OP for my application: the server side can run for a very long time, and if the client times out I need the server to cancel immediately. I've tried a variety of techniques, but I cannot get the callback function to stop the server-side call.

li...@google.com

Feb 28, 2019, 7:48:13 PM
to grpc.io
I wrote an example of the "add_callback" API last December after reading this thread: https://github.com/grpc/grpc/pull/17551. But I haven't really pushed to merge that pull request.
You can add your own logic in the server-side callback, such as cancelling the running job, logging metrics, and other stuff.
Please take a look at the example, and let me know if it fails to solve your problem.

Josh Liburdi

Feb 28, 2019, 8:14:18 PM
to li...@google.com, grpc.io
That is a good example of using the callback! Where I get stuck is the first example you mentioned, cancelling the running job. A simulation of my problem would be to have the server perform a very long task (e.g. 5 minute sleep call); in those cases, I would need the callback to interrupt/cancel that sleep call. Usually I would handle this with signals and setting an explicit timer in the server process, but (from what I’ve seen and read) signals cannot be used in threads.


li...@google.com

Feb 28, 2019, 8:30:49 PM
to grpc.io
Your question goes beyond the gRPC framework; the inability to interrupt a running thread has been a long-standing headache in Python (and in other languages with multi-threading).

Alternatively, you could:

1) Instead of having the server thread sleep for the full 5 minutes, break the sleep into short intervals (say, 1 second) and check a termination flag after each one. The termination flag can be flipped by another thread.
2) If the job can be run with "subprocess", it will be easier to control its life cycle.
3) Wrap your job in one of Python's "Future" implementations.
4̶)̶ ̶I̶n̶v̶o̶k̶e̶ ̶C̶P̶y̶t̶h̶o̶n̶ ̶C̶ ̶A̶P̶I̶ ̶P̶y̶T̶h̶r̶e̶a̶d̶S̶t̶a̶t̶e̶_̶S̶e̶t̶A̶s̶y̶n̶c̶E̶x̶c̶
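Option 1 above can be sketched as an interruptible sleep using the standard library alone (names are illustrative):

```python
import threading
import time

stop = threading.Event()

def interruptible_sleep(total_seconds, interval=1.0):
    """Sleep in short slices, returning False early if `stop` is set."""
    waited = 0.0
    while waited < total_seconds:
        # Event.wait() returns True as soon as the flag is flipped.
        if stop.wait(min(interval, total_seconds - waited)):
            return False  # interrupted
        waited += interval
    return True  # slept the full duration

# Flip the flag from another thread shortly after starting.
threading.Timer(0.05, stop.set).start()
start = time.monotonic()
print(interruptible_sleep(5.0, interval=0.01))  # False: woke up early
print(time.monotonic() - start < 5.0)           # True
```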

Josh Liburdi

Feb 28, 2019, 8:49:48 PM
to li...@google.com, grpc.io
Agreed that my problem goes beyond the scope of gRPC, I was mostly curious if you had any creative ideas for handling this (and thanks for the ones you shared). The only thing, I think, gRPC could do to help in these cases is allow RPCs to be handled by processes and not threads. 

Lidi Zheng

Feb 28, 2019, 8:55:37 PM
to Josh Liburdi, grpc.io
That's kind of unfortunate; server-side fork support doesn't seem to be happening soon. Issue #16001 tracks the progress of server-side fork support.
Please leave a comment on that issue if this feature is critical for you. Hopefully its prioritization can be increased.

Vic Putz

Mar 1, 2019, 9:55:56 AM
to Lidi Zheng, Josh Liburdi, grpc.io
Josh:

(I'm the OP) I actually wound up doing exactly that as proof of
concept; it worked, although we're holding off for the moment. We're
running under python, so what I did is make small command-line workers
that communicated with the GRPC server via stdin/stdout, and then
shoved everything the function call needed into a Python dict, pickled
it, and sent it across stdin as a protobuffer just containing bytes.
When the worker finished, it wrote the result back the same way, and
if the GRPC context went invalid while the worker subprocess was
going, the server could just kill the subprocess.

If you're thinking "wow, that's an ugly hack"... well, you're right
(among other things, your subprocess worker can't write to stdout,
which means making sure nothing else gets printed to stdout, logs go
to stderr and a bunch of other fiddly bits). But it DOES work.

I think otherwise what you have to resort to is shipping the GRPC
context into your worker loop and checking it periodically. I can't
think of a nicer way (and I didn't want to thread GRPC state through
all the rest of our computational code).
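A runnable miniature of that stdin/stdout protocol (the inline worker script here is a stand-in for a real command-line worker):

```python
# Pickle the call's arguments, send them over the child's stdin,
# and read the pickled result back from its stdout.
import pickle
import subprocess
import sys

WORKER = """
import pickle, sys
args = pickle.load(sys.stdin.buffer)           # read the pickled request
result = {"answer": args["x"] * 2}             # stand-in for real work
sys.stdout.buffer.write(pickle.dumps(result))  # write the pickled response
"""

proc = subprocess.Popen([sys.executable, "-c", WORKER],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = proc.communicate(pickle.dumps({"x": 21}))
print(pickle.loads(out)["answer"])  # 42
```

If the gRPC context goes invalid while the worker runs, the server can call proc.kill() instead of waiting on communicate().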

Josh Liburdi

Mar 1, 2019, 10:06:45 AM
to vbp...@gmail.com, Lidi Zheng, grpc.io
Thanks for writing back; that is a gnarly solution. My proof-of-concept application does something similar today (it checks the context periodically to validate it is still active). The problem I have is that sometimes a greedy bit of code hangs the entire application for too long between context checks (hence the need for signals and timeouts). My production application uses ZeroMQ and protobuf, so I may stick with that for the time being until gRPC adds support for processes. If you're not happy with your implementation of gRPC, you might consider ZeroMQ; it has a longer onboarding process but is more flexible.

Josh Liburdi

Mar 1, 2019, 12:25:45 PM
to Vic Putz, Lidi Zheng, grpc.io
In case it gives anyone else additional ideas, I may have found a design pattern that runs a single instance of the Python gRPC server with any number of child processes, and allows you to signal/timeout those processes. Here's a high-level example:



import multiprocessing
from concurrent import futures

import grpc
import interruptingcow


class RandomServicer(random_pb2_grpc.RandomServicer):
    def __init__(self, pool):
        self.pool = pool

    def HandleData(self, request, context):
        response = random_pb2.Response()
        result = self.pool.apply_async(long_task, [args...])
        # Overwrite or append to the response. result.get() blocks forever,
        # so be sure to use a signal _inside_ the function to return _something_.
        response = result.get()
        return response


def long_task(args...):
    try:
        # Signal/timeout however you like; interruptingcow works out of the box.
        with interruptingcow.timeout(5, RuntimeError):
            <do your long task here!>
            return <success>  # return processed value
    except RuntimeError:
        return <timeout>  # return some default, timeout value


def main():
    pool = multiprocessing.Pool(processes=2)  # set process count
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=2),  # worker count should match process count (?)
                         maximum_concurrent_rpcs=None)
    servicer = RandomServicer(pool)
    <start your server here and handle clean up>
