Wait until a stream is closed?

1,046 views
Skip to first unread message

Victor Stinner

unread,
Oct 12, 2014, 7:29:30 AM10/12/14
to python-tulip
Hi,

I added 3 examples to show to to register an open sockets in asyncio
documentation:

add_register() with reader callback:
https://docs.python.org/dev/library/asyncio-eventloop.html#asyncio-watch-read-event

Protocol:
https://docs.python.org/dev/library/asyncio-protocol.html#asyncio-register-socket

Stream:
https://docs.python.org/dev/library/asyncio-stream.html#register-an-open-socket-to-wait-for-data-using-streams

I would like to ensure that all sockets are closed at the end of the
example. For streams, I call writer.close(), but I don't see how to
wait until the socket is closed. It doesn't look to be implemented
directly. It looks like currently, it's only possible to call read()
in a loop until read() returns an empty string:

@asyncio.coroutine
def wait_eof(reader):
while True:
# Don't call read() becaues it may allocate a large buffer
data = yield from reader.read(4096)
if data:
break

Is there another way to wait until the socket is closed? My wait_eof()
looks complex, I don't want to put it in a simple example.

Or maybe I should not wait until a socket is closed? Is it safe to
call writer.close() and exit immediatly? The scheduled callback which
will close the socket may not be called immediatly.

Victor

Victor Stinner

unread,
Oct 12, 2014, 3:05:25 PM10/12/14
to python-tulip
Hi again,

I also added examples of UDP echo client and echo server:
https://docs.python.org/dev/library/asyncio-protocol.html#udp-echo-client-protocol
https://docs.python.org/dev/library/asyncio-protocol.html#udp-echo-server-protocol

In the UDP echo server example, you have a more concrete example of
"waiting until the socket is closed". The create_datagram_endpoint()
returns a pair of (transport, protocol), whereas create_server()
returns a Server class. The Server class has a close() method but also
a wait_closed() method. A transport has a close() method but no
wait_closed() method.

In the UDP echo server example, tranport.close() is called while the
event loop is not running. The transport enters the status "closing",
but since the event loop is not run again, the socket is never closed
and a ResourceWarning is emitted.

Victor

Guido van Rossum

unread,
Oct 12, 2014, 7:55:27 PM10/12/14
to Victor Stinner, python-tulip
So you're only concerned about the streams example, right? I modified this so that wait_for_data() returns the 'rsock' object, which IIUC is what you are concerned about (since it is handed to the transport and implicitly closed by the writer.close() call). If I print this value right after the run_until_complete() call, rsock is closed. I walked through this with pdb, and I think that the following sequence of events happens:

- writer.close() calls self._transport.close(), which is _SelectorTransport.close()
- that removes the socket from the selector and schedules a call to _call_connection_lost
- wait_for_data() returns, causing the Task's result to be set, causing the Task's callbacks to be scheduled
- the loop goes for the next event, which is _call_connection_lost
- _call_connection_lost() calls StreamReader.connection_lost(), which does nothing interesting
- _call_connection_lost() then regains control and closes the socket (this is what we've been waiting for!!!)
- after this the task's callback runs, which calls loop.stop(), which causes the loop to stop, after any other callbacks that are still in the loop's _ready queue

IOW I think this is working, but you're right in wondering how it works (it wasn't easy to figure out). In a sense the key things are the way stop() is implemented (by adding a callback to the _ready queue that raises _StopError) and the way call_soon() promises to execute callbacks in the order they were registered. However the PEP doesn't promise quite this -- the call_soon() promise is solid, but there are weasel-words for stop() that allow it to exit from run_forever() before the _ready queue is empty.

I don't think you need to change the example, except you ought to decorate wait_for_data() with @asyncio.coroutine. It's also true that TCP doesn't guarantee that the recipient receives all three bytes in a single packet, so the single read() call may not see all bytes, but I think even if wait_for_data() closes the writer before everything has been read it will still take the same flow (close() says to throw away unread bytes).
--
--Guido van Rossum (python.org/~guido)

Guido van Rossum

unread,
Oct 12, 2014, 8:12:15 PM10/12/14
to Victor Stinner, python-tulip
The proper solution here would be to use a Future that is made completed when the protocol is told the connection is closed. I would add this line to connection_made() in the server:

        self.completed = asyncio.Future()

Then add a connection_lost() method like this:

    def connection_lost(self, exc):
        if exc is None:
            self.completed.set_result(None)
        else:
            self.completed.set_exception(exc)

In your main code, after catching KeyboardInterrupt, you can then use

transport.close()
loop.run_until_complete(protocol.completed)
loop.close()

NOTE: If an error happens to the transport the loop.close() call won't actually be reached; you can solve this in various ways, e.g. by not calling set_exception(), or using try/finally.

Victor Stinner

unread,
Oct 12, 2014, 10:37:50 PM10/12/14
to Guido van Rossum, python-tulip
Hi,

2014-10-12 21:55 GMT+02:00 Guido van Rossum <gu...@python.org>:
> So you're only concerned about the streams example, right?

Oh no sorry, my question was general. It's just that I discovered the
issue when working on these examples for the documentation.

> - writer.close() calls self._transport.close(), which is
> _SelectorTransport.close()
> - that removes the socket from the selector and schedules a call to
> _call_connection_lost

It only calls immedialty _call_connection_lost() if the write buffer is empty.

Right now, it's not clear for me what "close" means, and it's not well
documented. When I close a file descriptor, I except that future read
and write operations will fail. Usually, all operations are blocking,
so I don't have to worry of on-going operations.

In asyncio, it looks like closing a transport stops immediatly reading
but writing is still possible. So it's possible to write into a closed
transport. Is that correct? It looks like transport.write_eof() is the
way to block *future* write operations.

My question is: how can I ensure a connection is completly closed?
Buffer flushed, transport closed, etc.? Or said differently, what is
the safest way to close a connection controlled by a pair of (reader,
writer) streams?

Is it safer to call write_eof() before close()?

Transport.close() documentation says "buffered data will be flushed
asynchronously". Does it mean that close() must be followed by "yield
from writer.drain()"?
https://docs.python.org/dev/library/asyncio-protocol.html#asyncio.BaseTransport.close

(I'm still asking for the general case, for example when I don't
control all operations done on the reader nor the writer.)

Victor

Guido van Rossum

unread,
Oct 12, 2014, 11:45:41 PM10/12/14
to Victor Stinner, python-tulip
On Sun, Oct 12, 2014 at 3:37 PM, Victor Stinner <victor....@gmail.com> wrote:
2014-10-12 21:55 GMT+02:00 Guido van Rossum <gu...@python.org>:
> So you're only concerned about the streams example, right?

Oh no sorry, my question was general. It's just that I discovered the
issue when working on these examples for the documentation.

OK. Sorry for misunderstanding.
 
> - writer.close() calls self._transport.close(), which is
> _SelectorTransport.close()
> - that removes the socket from the selector and schedules a call to
> _call_connection_lost

It only calls immedialty _call_connection_lost() if the write buffer is empty.

Which is the case in the streams example. :-)
 
Right now, it's not clear for me what "close" means, and it's not well
documented. When I close a file descriptor, I except that future read
and write operations will fail. Usually, all operations are blocking,
so I don't have to worry of on-going operations.

Actually, closing a FD doesn't mean that future reads and writes will fail -- it means that the caller promises not to use that FD any more. The FD itself may be reused for other purposes. (It's different with IO objects, they typically maintain state to enforce this promise, e.g. by setting the FD to -1 or setting and testing an explicit closed flag.)

Also, when closing a socket, what actually happens depends on whether there are other processes that still have the socket open, or whether there are other FDs referencing the same underlying kernel data structures (e.g. using dup() or dup2()). The close() call is not actually synchronous AFAIK, but the kernel typically attempts to send any data it still has buffered, as long as the remote side doesn't refuse it. (That's AFAIK for TCP sockets; I'm not sure what happens with UDP sockets if there are outgoing packets still buffered in the kernel.)
 
In asyncio, it looks like closing a transport stops immediately reading

but writing is still possible. So it's possible to write into a closed
transport. Is that correct? It looks like transport.write_eof() is the
way to block *future* write operations.

That's not how it's supposed to be. You should never write to a transport after closing it.

A TCP connection can be thought of as a *pair* of pipes, one in each direction (incoming and outgoing). The Transport manages the reading end of the incoming pipe, and the writing end of the outgoing pipe. Calling write_eof() closes the writing end of the outgoing pipe (but leaving data that's still waiting to be transferred to the other end of that pipe in the buffers), while still allowing more data to flow through the incoming pipe; data_received() will be called for incoming data. Calling close() makes a similar promise about the outgoing pipe but forcefully closes the incoming pipe, promising data_received() will not be called any more. (It is possible that the process at the other end of the pipes keeps writing; in that case it will eventually get an error. But if all incoming data has been received on our end, the other process will receive all outgoing data that's still buffered or in transit.)

There's an additional wrinkle in that certain transports are unable to support write_eof() and eof_received() -- in particular this is the case for TLS. When using such a transport, you have to use application-level signalling to indicate the of the data -- for example, HTTP can use the Content-Length header for this purpose, but it can also use "Transfer-encoding: chunked" for the same purpose.
 
My question is: how can I ensure a connection is completely closed?

Buffer flushed, transport closed, etc.? Or said differently, what is
the safest way to close a connection controlled by a pair of (reader,
writer) streams?

In the latter case, call writer.close() after you have read all you want to read.

It looks like a problem with the examples is that they want to close the event loop, and there are situations where closing the event loop prevents some data from being sent (or received). But a similar thing can happen with sockets: I believe that if a process sends a large amount of data and then closes the socket, the close() call may return while the data is still in a kernel buffer. If at that point the host crashes, that data will never be seen by the recipient across the network. And this is why you must use Content-Length, or Transfer-Encoding: chunked, or some other application-level protocol that lets the receiving end determine from the data alone that it has received the final byte -- never rely on connection_lost() except to release resources.
 
Is it safer to call write_eof() before close()?

Yes. In fact you can't call write_eof() after close().
 
Transport.close() documentation says "buffered data will be flushed
asynchronously". Does it mean that close() must be followed by "yield
from writer.drain()"?
https://docs.python.org/dev/library/asyncio-protocol.html#asyncio.BaseTransport.close

No, that's not what it means. Beware, you're mixing levels here -- drain() only applies to stream writers, but the docs you are quoting are for Transports, which are a much lower-level abstraction.

The drain() call doesn't actually flush anything. It merely may block the calling coroutine until the Transport's internal write buffer has drained sufficiently (using two thresholds, see set_write_buffer_limits()), while letting other tasks and callbacks continue. But if the write buffer is not filled over the higher threshold, drain() won't block even though there are unwritten bytes. If you close the stream you merely promise that you won't be using the stream any more.

So what happens to the bytes you wrote? The stream doesn't have its own write buffer -- all the buffering is done by the Transport. Closing the stream calls close() on the Transport, but the transport doesn't then just throw away its buffers -- it still has the socket registered with the selector for writing, and it will attempt to write the buffered bytes to the socket whenever the write handler is called. Once the last byte has been written (as indicated by the send() return value), the transport (if it has been closed) will call the protocol's connection_lost() callback, with a None argument indicating all is well.

However, the kernel may still have the bytes buffered, and my explanation of kernel-level stuff above still applies. AFAIK even Twisted doesn't attempt to wait until the kernel has received an ACK from the remote host that the bytes have been received there -- you need an application-level protocol if you need such assurance. (For example, if you are using HTTP PUT or POST to mutate a remote resource, receiving the "200 OK" status assures you that the remote side has received your bytes.(*))

If you want a little more assurance that your bytes have been sent off to the network, without implementing an application-level protocol, you could call transport.set_write_buffer_limits(0, 0) and then call writer.drain(). That will block until the Transport's write buffer is empty. But the other caveats apply.
 
(I'm still asking for the general case, for example when I don't
control all operations done on the reader nor the writer.)

Well, at some point if you don't have control you can't make strong promises. The best you can do may be to explain how things work, which is what I have tried in this message.

__________
(*) Even then you may not be out of the woods. Disk controllers have their own buffers, which are hard to control even for kernel code. And calling fsync() for every POST or PUT request may reduce your server's performance to a crawl. And so on... So in the end all you are doing is controlling probabilities. Such is life.
Reply all
Reply to author
Forward
0 new messages