Usefulness of pause_writing(), resume_writing(), discard_output()


Guido van Rossum

Sep 29, 2013, 12:48:50 PM
to jeff quast, python-tulip
I've been thinking about flow control for write() (http://code.google.com/p/tulip/issues/detail?id=51).

While investigating this I found pause_writing(), resume_writing(), and discard_output(). These were contributed by Jeff Quast following his experience implementing a telnet client using Tulip (https://groups.google.com/forum/#!searchin/python-tulip/telnet/python-tulip/TpZWwXUh2zg/jIq5h0aMUNwJ).

I am having second thoughts about these APIs because they don't seem to be all that useful for most protocols. Even for Telnet they seem to be mostly advisory -- discard_output(), for example, cannot do anything about data that has already been sent to the socket, and pause_writing() can mostly be implemented by the protocol by simply not calling write().

I'm considering cutting these from the spec because I want the spec to be as lean as possible (but no leaner!) and I still need to add a new API for flow control in the other direction -- the transport needs to be able to tell the protocol to stop calling write() because its buffer is getting full and not draining fast enough.

I'm actually considering naming the new API Protocol.pause_writing() to distinguish it from Transport.pause() -- but that would be just as confusing as long as Transport.pause_writing() exists.

Ironically the two are actually used for (roughly) the same purpose -- it's just that Telnet, unlike most other protocols I know, has flow control built into the protocol itself, so that when the server is getting overwhelmed, it sends an XOFF control character, which must be interpreted by the client, which (in Jeff's implementation) calls transport.pause_writing().

But for most other TCP-based protocols, flow control is implicit at the TCP layer -- the way this information bubbles up to the writer is simply by the socket not accepting new non-blocking send() calls, which results in data being buffered in the transport. When the transport's buffer fills up over a certain high-water mark, the transport will call protocol.pause_writing() (once implemented) and the protocol is expected to stop passing data to transport.write(). (There are more layers here to consider -- e.g. if the app uses a StreamReader, that reader may have to implement pausing  somehow.)
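A minimal sketch of the mechanism described above -- a transport that buffers what the socket won't take, and asks the protocol to stop writing once a high-water mark is crossed. The class and attribute names (high_water, low_water, and the pause/resume callbacks) are illustrative, not the final Tulip API:

```python
class FlowControlledTransport:
    """Hypothetical transport implementing buffer-based flow control."""

    def __init__(self, protocol, high_water=65536, low_water=16384):
        self._protocol = protocol
        self._buffer = bytearray()
        self._high_water = high_water
        self._low_water = low_water
        self._protocol_paused = False

    def write(self, data):
        # A real transport would first attempt a non-blocking send() and
        # only buffer the remainder.
        self._buffer.extend(data)
        if not self._protocol_paused and len(self._buffer) > self._high_water:
            self._protocol_paused = True
            self._protocol.pause_writing()   # "please stop calling write()"

    def _send_completed(self, nbytes):
        # Called when the socket accepted nbytes from the buffer.
        del self._buffer[:nbytes]
        if self._protocol_paused and len(self._buffer) <= self._low_water:
            self._protocol_paused = False
            self._protocol.resume_writing()  # "ok to call write() again"
```

A protocol that ignores pause_writing() still works; it just risks unbounded buffering in the transport.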

I'm sending this message mostly to get a feeling of how important pause_writing() is -- I may be missing a use case, or Telnet may be an important protocol to be able to support in this fashion. Flow control is often the last thing developers consider, nobody really wants to think about it, but sometimes it means the difference between a great and a merely adequate implementation of a protocol, so I want to make sure that it is available when needed. (For more background information, search for Jim Gettys' rant about big buffers killing performance.)

--
--Guido van Rossum (python.org/~guido)

jeff quast

Sep 29, 2013, 10:47:40 PM
to python-tulip
On Sep 29, 2013, at 11:48 AM, Guido van Rossum <gu...@python.org> wrote:

> I've been thinking about flow control for write() (http://code.google.com/p/tulip/issues/detail?id=51).
>
> While investigating this I found pause_writing(), resume_writing(), and discard_output(). These were contributed by Jeff Quast following his experience implementing a telnet client using Tulip

(... *server* is complete, but client is not.

Bumped into some difficulty using tulip on sys.stdin, sadly. I'm hoping to see it through before too long.. )

> I am having second thoughts about these APIs because they don't seem to be all that useful for most protocols. Even for Telnet they seem to be mostly advisory -- discard_output(), for example, cannot do anything about data that has already been sent to the socket,

discard_output() is not useful to me in its current form: it is necessary to call discard_output() at a very specific place in the stream, and not within the middle of an Interpret As Command (IAC) sequence. This is not possible with the current "write it and forget it" calls to write().

> I'm sending this message mostly to get a feeling of how important pause_writing() is -- I may be missing a use case, or Telnet may be an important protocol to be able to support in this fashion.

I'm enjoying pause_writing() and resume_writing(). I considered this at implementation time and thought that new protocol authors might need it.. however, these authors may also implement features around it without even a good reason, just because it was found in the API and they thought they should have at it.

For that reason, and the brevity of the documentation, I vote for killing it all then, unless other folks have found a use for it.

Saúl Ibarra Corretgé

Sep 30, 2013, 3:29:24 AM
to python...@googlegroups.com
Maybe we could kill them and provide some means for protocol writers to
implement them at the protocol level. Here is a (rough) idea that might
work: the transport could expose the number of queued bytes, so a
protocol could check how big the queue is before calling write(), and
write() could take a callback (or maybe call a function in the protocol)
when writing has been completed. That way the protocol could control
when to (or not to) call write.

--
Saúl Ibarra Corretgé
http://bettercallsaghul.com

Antoine Pitrou

Sep 30, 2013, 4:28:53 AM
to python...@googlegroups.com
Le Sun, 29 Sep 2013 09:48:50 -0700,
Guido van Rossum <gu...@python.org> a
écrit :
>
> But for most other TCP-based protocols, flow control is implicit at
> the TCP layer -- the way this information bubbles up to the writer is
> simply by the socket not accepting new non-blocking send() calls,
> which results in data being buffered in the transport. When the
> transport's buffer fills up over a certain high-water mark, the
> transport will call protocol.pause_writing() (once implemented) and
> the protocol is expected to stop passing data to transport.write().

If it's just a hint given to the protocol, perhaps it should be a
notification API rather than something giving an order? The API could
also be made flexible, e.g.:

protocol.flow_control_event(kind, amount)

where:
- kind = FlowControl.OUTPUT_BUFFER_FILLING or
FlowControl.OUTPUT_BUFFER_DRAINING
- amount = number of bytes currently waiting in buffer
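A quick sketch of how a protocol might consume this proposed notification API. The FlowControl enum and the thresholds are assumptions for illustration; nothing here is an agreed-upon interface:

```python
import enum

class FlowControl(enum.Enum):
    OUTPUT_BUFFER_FILLING = 1
    OUTPUT_BUFFER_DRAINING = 2

class EventAwareProtocol:
    """Treats the flow-control event as advisory, with self-chosen thresholds."""

    PAUSE_ABOVE = 65536   # stop writing when the buffer exceeds this
    RESUME_BELOW = 16384  # start writing again below this

    def __init__(self):
        self.writing_paused = False

    def flow_control_event(self, kind, amount):
        # 'amount' is the number of bytes currently waiting in the buffer.
        if kind is FlowControl.OUTPUT_BUFFER_FILLING and amount > self.PAUSE_ABOVE:
            self.writing_paused = True
        elif kind is FlowControl.OUTPUT_BUFFER_DRAINING and amount < self.RESUME_BELOW:
            self.writing_paused = False
```

Note the protocol can ignore the 'kind' flag entirely and decide purely on 'amount', as discussed below in the thread.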

> (There are more layers here to consider -- e.g. if the
> app uses a StreamReader, that reader may have to implement pausing
> somehow.)

How is the reader related to output flow control?
Incoming bytes have to be buffered no matter how. Whether they are
buffered in the lower or upper layers of the software stack, does it
make a difference?

> I'm sending this message mostly to get a feeling of how important
> pause_writing() is -- I may be missing a use case, or Telnet may be an
> important protocol to be able to support in this fashion. Flow
> control is often the last thing developers consider, nobody really
> wants to think about it, but sometimes it means the difference
> between a great and a merely adequate implementation of a protocol,
> so I want to make sure that it is available when needed. (For more
> background information, search for Jim Gettys' rant about big buffers
> killing performance.)

I haven't ever needed it myself, but I've never implemented a protocol
streaming large amounts of data.

Regards

Antoine.


Geert Jansen

Sep 30, 2013, 5:02:36 AM
to Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 9:29 AM, Saúl Ibarra Corretgé <sag...@gmail.com> wrote:

Maybe we could kill them and provide some means for protocol writers to implement them at the protocol level. Here is a (rough) idea that might work: the transport could expose the number of queued bytes, so a protocol could check how big the queue is before calling write(), and write() could take a callback (or maybe call a function in the protocol) when writing has been completed. That way the protocol could control when to (or not to) call write.

+1 When implementing (read or write) flow control in a protocol, an important input is the number of bytes outstanding in the transport. (In addition, there are protocol-specific details, e.g. chunk sizes, that can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)

Regarding pause_writing() / resume_writing(): that actually reminded me of TCP_CORK, which is flow control in a sense, but I'm not sure if the API was created to cover this.

Regards,
Geert

Guido van Rossum

Sep 30, 2013, 12:17:50 PM
to Antoine Pitrou, python-tulip
On Mon, Sep 30, 2013 at 1:28 AM, Antoine Pitrou <soli...@pitrou.net> wrote:
Le Sun, 29 Sep 2013 09:48:50 -0700, Guido van Rossum <gu...@python.org> a écrit:
> But for most other TCP-based protocols, flow control is implicit at
> the TCP layer -- the way this information bubbles up to the writer is
> simply by the socket not accepting new non-blocking send() calls,
> which results in data being buffered in the transport. When the
> transport's buffer fills up over a certain high-water mark, the
> transport will call protocol.pause_writing() (once implemented) and
> the protocol is expected to stop passing data to transport.write().

If it's just a hint given to the protocol, perhaps it should be a
notification API rather than something giving an order? The API could
also be made flexible, e.g.:

  protocol.flow_control_event(kind, amount)

where:
- kind = FlowControl.OUTPUT_BUFFER_FILLING or
  FlowControl.OUTPUT_BUFFER_DRAINING
- amount = number of bytes currently waiting in buffer

This is a nice extension of the API Glyph first proposed; I've mentioned it in http://code.google.com/p/tulip/issues/detail?id=51.
(TL;DR: whenever some send() call (or equivalent) completes and the buffer becomes shorter, the transport calls a method on the protocol tentatively named Protocol.write_buffer_drained(bytes_remaining).) It's also a concrete version of what Saúl just proposed.

You address my big objection to Glyph's proposal: he requires the protocol to keep track of how many bytes it has written in order to calculate the buffer size. He countered that the protocol is in complete control of counting bytes already (which is technically true) and that he'd rather not make redundant calls. But I still feel that in many cases it's difficult for the app to keep track of bytes written through various abstraction layers -- even though I agree that it's regrettable that a redundant call might require a cycle through the event loop if it is to be treated like other Protocol methods.

I'm not super keen on having a single API with an enum indicating the direction, although I could imagine that it's easy to just ignore the 'kind' flag and just decide to pause or resume based on the amount.

My main concern here is that what most apps *really* want is a way to block until the buffer is drained sufficiently (though not all the way to zero). I suppose this could be done by having the flow control callback keep track of a Future that gets created when the buffer is full and completes when it has drained to a certain level. So my counter-proposal to that is, why not have the transport manage that Future?

Here's a quick sketch of what a Transport could implement:

- An instance variable _drain which is either None or a Future, initialized to None.

- An instance variable _min_bufsize.

- A method drain() which accesses the _drain variable, as follows:

def drain(self, max_limit=1000000, min_limit=10000):  # Arbitrary limits
    assert self._drain is None
    if len(self._buffer) >= max_limit:
        self._drain = Future()
        self._min_bufsize = min_limit
        return self._drain
    else:
        return None  # Or a dummy Future that's already done?

- When some bytes are removed from the buffer (e.g. an async send() completes):

    if len(self._buffer) <= self._min_bufsize and self._drain is not None:
        d = self._drain
        self._drain = None
        if not d.cancelled():
            d.set_result(None)

Note that when drain() returns a Future, its caller should wait for that Future to complete before calling it again.

A flow-control-conscious app could work like this:

    t.write(blah_blah)
    yield from t.drain() or []

The "or []" is a trick that turns the yield-from into a no-op when t.drain() returns None; this is less elegant but more efficient than returning a dummy Future. (Though much more efficient, since yield-from already takes a shortcut when the Future is completed on entry, and does not require a callback or an event loop cycle.)
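Filling in the sketch, a self-contained version of the drain() mechanism (the transport internals here are simplified stand-ins, and this uses modern asyncio names rather than the Tulip names of the time):

```python
import asyncio

class DrainingTransport:
    """Simplified stand-in for the drain() sketch above (hypothetical API)."""

    def __init__(self):
        self._buffer = bytearray()
        self._drain = None
        self._min_bufsize = 0

    def write(self, data):
        self._buffer.extend(data)

    def drain(self, max_limit=1000000, min_limit=10000):  # arbitrary limits
        assert self._drain is None
        if len(self._buffer) >= max_limit:
            self._drain = asyncio.get_running_loop().create_future()
            self._min_bufsize = min_limit
            return self._drain
        return None  # buffer small enough: nothing to wait for

    def send_completed(self, nbytes):
        # Some bytes were removed from the buffer by an async send().
        del self._buffer[:nbytes]
        if self._drain is not None and len(self._buffer) <= self._min_bufsize:
            d, self._drain = self._drain, None
            if not d.cancelled():
                d.set_result(None)

async def writer(t, chunk):
    # The "yield from t.drain() or []" pattern, in await form.
    t.write(chunk)
    fut = t.drain()
    if fut is not None:
        await fut
```

The point of the two limits: the future completes when the buffer has drained below min_limit, not when it is empty, so the pipe stays full.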
 
> (There are more layers here to consider -- e.g. if the
> app uses a StreamReader, that reader may have to implement pausing
> somehow.)

How is the reader related to output flow control?

If an app sits in a loop reading data from a reader, transforming it, and writing it to the transport, then when the transport buffer becomes full, if the transport can convince the reader to wait the next time the app reads from it, flow control comes for free (the app doesn't have to write a line of code to get it).
 
Incoming bytes have to be buffered no matter how. Whether they are
buffered in the lower or upper layers of the software stack, does it
make a difference?

Well, that's a different issue. When the reader's buffer fills up it should unregister its FD (by calling pause() on the transport) so that the remote end may be informed to pause sending, until the reader's buffer drains sufficiently.
 
> I'm sending this message mostly to get a feeling of how important
> pause_writing() is -- I may be missing a use case, or Telnet may be an
> important protocol to be able to support in this fashion. Flow
> control is often the last thing developers consider, nobody really
> wants to think about it, but sometimes it means the difference
> between a great and a merely adequate implementation of a protocol,
> so I want to make sure that it is available when needed. (For more
> background information, search for Jim Gettys' rant about big buffers
> killing performance.)

I haven't ever needed it myself, but I've never implemented a protocol
streaming large amounts of data.

Well, what about an HTTP server that serves up large content (e.g. videos)?

Guido van Rossum

Sep 30, 2013, 12:28:17 PM
to Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 2:02 AM, Geert Jansen <gee...@gmail.com> wrote:

On Mon, Sep 30, 2013 at 9:29 AM, Saúl Ibarra Corretgé <sag...@gmail.com> wrote:

Maybe we could kill them and provide some means for protocol writers to implement them at the protocol level. Here is a (rough) idea that might work: the transport could expose the number of queued bytes, so a protocol could check how big the queue is before calling write(), and write() could take a callback (or maybe call a function in the protocol) when writing has been completed. That way the protocol could control when to (or not to) call write.

+1 When implementing (read or write) flow control in a protocol, an important input is the number of bytes outstanding in the transport

The drain() API that I proposed in my last message takes this into account.
 
(In addition, there are protocol-specific details, e.g. chunk sizes, that can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)

Can't quite follow what you're saying here, and the gruvi docs are too big for me to find the docs for its flow control API. Can you provide more details?
 
Regarding pause_writing() / resume_writing(): That actually reminded me of TCP_CORK, which is flow control in a sense, but I'm not sure if the API was created to cover this.

It wasn't -- it was created for Telnet. But TCP_CORK looks like some fun optimization that someone could hack into Tulip's socket transport for systems that support it.
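For reference, TCP_CORK is a Linux-only socket option, settable with the standard setsockopt() call; while corked, the kernel coalesces partial segments instead of sending them immediately. A small hedged sketch (the helper name is made up; the option simply won't exist on non-Linux platforms):

```python
import socket

def set_cork(sock, corked):
    """Cork or uncork a TCP socket (Linux only).

    While corked, the kernel holds back partial frames, coalescing small
    writes into full segments (flushed on uncork or after ~200 ms).
    """
    if not hasattr(socket, "TCP_CORK"):
        raise OSError("TCP_CORK is not available on this platform")
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1 if corked else 0)
```

A transport could cork before a burst of small write() calls and uncork afterwards, which is the optimization alluded to above.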

Gustavo Carneiro

Sep 30, 2013, 12:32:06 PM
to Guido van Rossum, Antoine Pitrou, python-tulip
In that case, why not:

    yield from t.write(blah_blah) or []

Which is exactly what I proposed a couple of months ago, minus the "or []" trick.  The only reason I didn't insist on this more was because of your claims that futures are expensive, and I didn't think of this trick.  Since making tulip slower is not my intention, I gave up on that idea.

In fact, Transport.write() could already return [] instead of None in case there is buffer space to write more, so that the code can be reduced to:

    yield from t.write(blah_blah)

This is similar to socket or file write, and therefore it will be easier to learn. When designing an API, it is always useful to make it similar to an existing familiar API -- the same reason we use familiar terminology, or the same design patterns. Making an API easy to learn matters a lot IMHO.

Regards,

--
Gustavo J. A. M. Carneiro
Gambit Research LLC
"The universe is always one step beyond logic." -- Frank Herbert

Saúl Ibarra Corretgé

Sep 30, 2013, 1:00:20 PM
to python...@googlegroups.com

>
> In that case, why not:
>
> yield from t.write(blah_blah) or []
>
> Which is exactly what I proposed a couple of months ago, minus the "or
> []" trick. The only reason I didn't insist on this more was because of
> your claims that futures are expensive, and I didn't think of this
> trick. Since making tulip slower is not my intention, I gave up on that
> idea.
>

Hum, you mean effectively draining the buffer after each write? That's
probably not what everyone wants. Data can be buffered just fine, until
a limit is reached, and then the buffer can be drained.


--
Saúl Ibarra Corretgé
http://bettercallsaghul.com

Guido van Rossum

Sep 30, 2013, 1:00:20 PM
to Gustavo Carneiro, Antoine Pitrou, python-tulip
Actually I see my proposal as the direct descendant of yours. Some reasons to make it a separate API:

- Many apps will never write enough to have to bother with flow control.

- Even for apps that do, they may not want to wait after *every* write().

- In fact it's probably a bad idea for most apps to wait after every write()! It might set the expectation that write() waits until the buffer is zero, or even until the data has been acknowledged by the remote side. After reading up on TCP_CORK and Nagle's algorithm I really don't want app writers to think that way.

- I don't want to force every app to wait for the return of write(). Making sure that if the Future is ignored things go right is a little tricky (maybe I'd need to use a weakref, which is fraught with worries). An explicit, separate drain() makes things unambiguous. And if you really want a write_and_drain() call, it's two lines of code away. :-)

- I want an API to set the buffer high- and low-water marks but I don't want these just to be options to write(), because then every abstraction built on top of write() would have to pass these arguments through. (And I kind of like having these parameters specified to the drain() call rather than being transport attributes set with a separate call.)



In fact, Transport.write() could already return [] instead of None in case there is buffer space to write more, so that the code can be reduced to:

    yield from t.write(blah_blah)

This is similar to socket or file write, and therefore it will be easier to learn.  When designing an API, it is always useful to have it similar to an existing familiar API.  The same reason we use familiar terminology, or the same design patterns.  Making an API easy to learn matters a lot IMHO.

I know. But I think my arguments for a separate API are pretty convincing, otherwise I would have gone with your idea before.

Saúl Ibarra Corretgé

Sep 30, 2013, 1:05:04 PM
to python...@googlegroups.com

[snip]
How about making write() return a boolean? If it returns False, the
caller is advised to call drain() because the maximum buffer size was
reached (which can be set in __init__).

if not t.write('foo'):
    # ouch, buffer is full, let's wait
    yield from t.drain()

Inside drain() we'd assert that len(self._buffer) >= limit.
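A minimal sketch of the boolean-returning write() proposed here (the class name and default threshold are invented for illustration):

```python
class BoolWriteTransport:
    """Hypothetical variant: write() returns False once the buffer passes
    the maximum size configured in __init__."""

    def __init__(self, max_buffer=65536):
        self._buffer = bytearray()
        self._max_buffer = max_buffer

    def write(self, data):
        # A real transport would try a non-blocking send() first.
        self._buffer.extend(data)
        return len(self._buffer) < self._max_buffer  # True == room remains
```
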

[snip]

Guido van Rossum

Sep 30, 2013, 1:05:24 PM
to Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 10:00 AM, Saúl Ibarra Corretgé <sag...@gmail.com> wrote:


In that case, why not:

     yield from t.write(blah_blah) or []

Which is exactly what I proposed a couple of months ago, minus the "or
[]" trick.  The only reason I didn't insist on this more was because of
your claims that futures are expensive, and I didn't think of this
trick.  Since making tulip slower is not my intention, I gave up on that
idea.


Hum, you mean effectively draining the buffer after each write? That's probably not what everyone wants. Data can be buffered just fine, until a limit is reached and then the buffer can be drained.

To Gustavo's defense, the Future returned by write() could work the same way as my proposed drain() -- it would not block until the buffer is completely empty, only until it is below a given limit; and it would not block at all if there was still room in the buffer below the high-water mark.

Geert Jansen

Sep 30, 2013, 1:07:39 PM
to Guido van Rossum, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 6:28 PM, Guido van Rossum <gu...@python.org> wrote:
 
(In addition, there are protocol-specific details, e.g. chunk sizes, that can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)

Can't quite follow what you're saying here, and the gruvi docs are too big to find the docs for its flow control API. Can you provide more details?

What I meant is that the ultimate decision whether to pause sending should be with the protocol, not the transport. My comment wasn't really related to pause_writing() / resume_writing(), sorry. (It was related to a failed attempt of mine to do flow control in the transport by pausing the coroutine running the protocol. It was a bad approach....)

Gruvi's transports have exactly the same flow API as described here (it's essentially the libuv flow control, because Gruvi's transports are pyuv Handles):

 - Transports have a pause() / resume() method that a protocol can use to implement flow control in the read direction. (They are called start_read() / stop_read() but they do the same thing).

 - A transport allows a protocol to deduce how many outstanding bytes there are, so that it can implement flow control in the write direction. This is actually implemented as a callback that fires when a write() operation on the transport completes. So in order to know the outstanding bytes, a protocol needs to register this callback and do the bookkeeping. A Protocol.write_buffer_drained() would basically be the same thing.

I am a bit out of touch on recent Tulip developments, but I would prefer the Protocol.write_buffer_drained() callback over the Transport.drain() mechanism. I think the drain() method would be the first method on a Transport returning a Future? Also it would be a bit asymmetrical because in a protocol you *do* keep track of the size of your read buffer, for read flow control. It is maybe more consistent if you can implement your write flow control in the same way too.

Regards,
Geert

Guido van Rossum

Sep 30, 2013, 1:10:17 PM
to Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 10:05 AM, Saúl Ibarra Corretgé <sag...@gmail.com> wrote:
How about making write() return a boolean? If it returns False then the caller is advised to call drain because he reached the max buffer size (which can be set in __init__).

if not t.write('foo'):
    # ouch, buffer is full, lets wait
    yield from t.drain()

Less bad, but I still can't say I like it. First of all, the sense of the boolean is a bit arbitrary -- does it mean full or not-full? It's easy to make a mistake. It could also be confused with the "traditional" return value for write(), which is the number of bytes actually written. I think code that considers waiting should just be written

    t.write(b'foo')
    yield from t.drain()

That's shorter than your version :-)
 
Inside drain() we'd assert that len(self._buffer) >= limit.

 That sounds like a bad idea -- it would require any abstractions that wrap write() to pass this boolean back up. If the buffer is not full, drain() can return None or a dummy Future.

Gustavo Carneiro

Sep 30, 2013, 1:21:15 PM
to Guido van Rossum, Saúl Ibarra Corretgé, python-tulip
Right, it seems I wasn't completely clear.  In the write() system call, with blocking file descriptors, if you write 100 bytes but there is 1K of buffer space, then the write call returns immediately and there is no wait.  If you write 1K bytes but there is only 100 bytes of buffer space, the first 100 bytes are written immediately and the process is suspended until there is buffer space to accept the remaining 924 bytes.

The same could happen with Transport.write(): if you try to write and there is space in the buffer, it would return [] so that yield from does not wait.  If there is no space in the buffer, Transport.write() would return a future that would complete when the buffer becomes free enough to accept the remaining data provided by Transport.write.

Note the subtle difference, that the future does not complete when the buffer is completely drained, it completes when the buffer is partially drained, enough to accept the data provided in the write.

The translation from write() unix system call concepts into tulip Transport.write is straightforward.  At least to me.  Again, the main advantage is not really one less call to make, it is mainly the familiarity of known APIs.  To me, that is golden.
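A sketch of this proposal -- write() returns [] when there is room (so "yield from" is a no-op), or a future that completes when the buffer has *partially* drained. Names and thresholds are illustrative, and the internals are simplified:

```python
import asyncio

class AwaitableWriteTransport:
    """Hypothetical transport where write() itself is the flow-control point."""

    def __init__(self, high_water=65536):
        self._buffer = bytearray()
        self._high_water = high_water
        self._waiter = None

    def write(self, data):
        self._buffer.extend(data)
        if len(self._buffer) < self._high_water:
            return []  # "yield from []" completes immediately
        self._waiter = asyncio.get_running_loop().create_future()
        return self._waiter

    def send_completed(self, nbytes):
        del self._buffer[:nbytes]
        # Complete the waiter as soon as there is room again -- not when
        # the buffer is fully drained.
        if self._waiter is not None and len(self._buffer) < self._high_water:
            w, self._waiter = self._waiter, None
            w.set_result(None)
```
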

And I do not think it is correct to allow apps to be blissfully unaware of flow control issues. If you write data and don't care about the result, one of two things has to happen: a) you have an infinite amount of buffering, or b) some of the data will be lost due to loss of buffer space and your app is so badly written that it won't even notice.

Regards.

Guido van Rossum

Sep 30, 2013, 1:27:10 PM
to Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 10:07 AM, Geert Jansen <gee...@gmail.com> wrote:

On Mon, Sep 30, 2013 at 6:28 PM, Guido van Rossum <gu...@python.org> wrote:
 
(In addition, there are protocol-specific details, e.g. chunk sizes, that can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)

Can't quite follow what you're saying here, and the gruvi docs are too big for me to find the docs for its flow control API. Can you provide more details?

What I meant is that the ultimate decision whether to pause sending should be with the protocol, not the transport. My comment wasn't really related to pause_writing() / resume_writing(), sorry. (It was related to a failed attempt of mine to do flow control in the transport by pausing the coroutine running the protocol. It was a bad approach....)

OK.
 
Gruvi's transports have exactly the same flow API as described here (it's essentially the libuv flow control, because Gruvi's transports are pyuv Handles):

 - Transports have a pause() / resume() method that a protocol can use to implement flow control in the read direction. (They are called start_read() / stop_read() but they do the same thing).

 - A transport allows a protocol to deduce how many outstanding bytes there are, so that it can implement flow control in the write direction. This is actually implemented as a callback that fires when a write() operation on the transport completes. So in order to know the outstanding bytes, a protocol needs to register this callback and do the bookkeeping. A Protocol.write_buffer_drained() would basically be the same thing.

Interesting. I'm curious whether you reached this API independently or had somehow read about Glyph's isomorphic proposal. I'm also interested in how hard, in your experience, proper bookkeeping on the protocol side is.

I am a bit out of touch on recent Tulip developments, but I would prefer the Protocol.write_buffer_drained() callback over the Transport.drain() mechanism. I think the drain() method would be the first method on a Transport returning a Future?

This is true -- I hadn't thought of this as a big barrier, but maybe Antoine (who slightly prefers callbacks) would agree with you. (To me, Future is one add_done_callback() call away from a callback.)
 
Also it would be a bit asymmetrical because in a protocol you *do* keep track of the size of your read buffer, for read flow control. It is maybe more consistent if you can implement your write flow control in the same way too.

I'm not sure that an appeal to symmetry gets us very far here -- reading and writing are already very different.

The appeal to "purity" (i.e. no Futures in the Transport API) might be a stronger argument for a different API. Let's think of how a Protocol could implement something similar to drain(). The "resume" event would be easily implemented by the write_buffer_drained() callback, so that part is easy. The "pause" decision would need a way to ask the Transport for its current buffer size -- I can't get myself to do double bookkeeping in the Protocol just to keep track of a number that the Transport already knows -- especially if the same number *is* reported by the write_buffer_drained() callback. This I could then live with:

- Transport.get_buffer_size() reports the current buffer size.

- Protocol.write_buffer_drained(new_buffer_size) is called by the Transport whenever some bytes have been fully transferred from the write buffer to the kernel (or whatever is the next destination of the data).

Still, a concern with this approach is that the write_buffer_drained() call would require an event loop cycle (we do this for all Protocol calls from the transport, and we have good reasons), and this would be a total waste if the Protocol is uninterested. Is that too big a price to pay for callback purity? Also, it would require different Protocols to implement the same (somewhat subtle) logic. All in all I'm torn.
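A sketch of this callback-based alternative: the transport exposes its buffer size, and the protocol pauses itself by polling it and resumes from the write_buffer_drained() callback. Both class names and the thresholds are assumptions for illustration:

```python
class BufferSizeTransport:
    """Minimal stub exposing get_buffer_size(), as proposed above."""

    def __init__(self):
        self._buffer = bytearray()
        self.protocol = None  # set after the protocol is constructed

    def get_buffer_size(self):
        return len(self._buffer)

    def write(self, data):
        self._buffer.extend(data)

    def send_completed(self, nbytes):
        # Bytes were transferred from the write buffer to the kernel.
        del self._buffer[:nbytes]
        if self.protocol is not None:
            self.protocol.write_buffer_drained(len(self._buffer))

class CallbackProtocol:
    """Does its own pause/resume bookkeeping from the callback."""

    PAUSE_ABOVE = 65536
    RESUME_BELOW = 16384

    def __init__(self, transport):
        self.transport = transport
        self.paused = False

    def send(self, data):
        self.transport.write(data)
        # No double bookkeeping: ask the transport for its buffer size.
        if self.transport.get_buffer_size() > self.PAUSE_ABOVE:
            self.paused = True

    def write_buffer_drained(self, new_buffer_size):
        if self.paused and new_buffer_size < self.RESUME_BELOW:
            self.paused = False
```
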

Antoine Pitrou

Sep 30, 2013, 2:28:48 PM
to python...@googlegroups.com
On Mon, 30 Sep 2013 09:17:50 -0700
Guido van Rossum <gu...@python.org> wrote:
> On Mon, Sep 30, 2013 at 1:28 AM, Antoine Pitrou <solipsis-xNDA5W...@public.gmane.org> wrote:
>
> > Le Sun, 29 Sep 2013 09:48:50 -0700, Guido van Rossum <guido-+ZN9ApsX...@public.gmane.org> a
> > écrit:
> > > But for most other TCP-based protocols, flow control is implicit at
> > > the TCP layer -- the way this information bubbles up to the writer is
> > > simply by the socket not accepting new non-blocking send() calls,
> > > which results in data being buffered in the transport. When the
> > > transport's buffer fills up over a certain high-water mark, the
> > > transport will call protocol.pause_writing() (once implemented) and
> > > the protocol is expected to stop passing data to transport.write().
> >
> > If it's just a hint given to the protocol, perhaps it should be a
> > notification API rather than something giving an order? The API could
> > also be made flexible, e.g.:
> >
> > protocol.flow_control_event(kind, amount)
> >
> > where:
> > - kind = FlowControl.OUTPUT_BUFFER_FILLING or
> > FlowControl.OUTPUT_BUFFER_DRAINING
> > - amount = number of bytes currently waiting in buffer
> >
>
[...]
>
> I'm not super keen on having a single API with an enum indicating the
> direction, although I could imagine that it's easy to just ignore the
> 'kind' flag and just decide to pause or resume based on the amount.

Actually, I was thinking of it as a way to add other flow control
notifications in the future without adding more methods.
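A minimal sketch of the notification API being proposed here (the FlowControl enum, method name, and watermark thresholds are all illustrative; nothing like this existed in Tulip at the time):

```python
import enum

class FlowControl(enum.Enum):
    OUTPUT_BUFFER_FILLING = 1
    OUTPUT_BUFFER_DRAINING = 2

class MyProtocol:
    """Reacts to the proposed flow_control_event() notification."""
    def __init__(self, high=64 * 1024, low=16 * 1024):
        self.high = high
        self.low = low
        self.paused = False

    def flow_control_event(self, kind, amount):
        # As noted above, 'kind' can be ignored entirely; the buffered
        # amount alone is enough to decide whether to pause or resume.
        if not self.paused and amount > self.high:
            self.paused = True
        elif self.paused and amount <= self.low:
            self.paused = False

p = MyProtocol()
p.flow_control_event(FlowControl.OUTPUT_BUFFER_FILLING, 100 * 1024)
filling_paused = p.paused           # above high-water mark: paused
p.flow_control_event(FlowControl.OUTPUT_BUFFER_DRAINING, 8 * 1024)
draining_paused = p.paused          # below low-water mark: resumed
```

The hysteresis between the high and low marks is what keeps the protocol from flapping between paused and resumed on every small buffer change.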

> Note that when drain() returns a Future, its caller should wait for that
> Future to complete before calling it again.

Wouldn't it be more user-friendly to return the same Future if called a
second time?

> A flow-control-conscious app could work like this:
>
> t.write(blah_blah)
> yield from t.drain() or []
>
> The "or []" is a trick that turns the yield-from into a no-op when
> t.drain() returns None; this is less elegant but more efficient than
> returning a dummy Future. (Though not much more efficient, since yield-from
> already takes a shortcut when the Future is completed on entry, and does
> not require a callback or an event loop cycle.)
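The "or []" trick quoted above can be demonstrated with a plain generator and no event loop at all (this is a toy: an iterator stands in for an unfinished Future, and the names are invented for illustration):

```python
def drain(buffer_size):
    # Toy stand-in for Transport.drain(): returns None when nothing is
    # buffered, otherwise something to wait on (a plain iterator here
    # stands in for an unfinished Future).
    if buffer_size == 0:
        return None
    return iter(["<waiting>"])

def app(buffer_size):
    # The "or []" makes the yield-from a no-op when drain() returns None.
    yield from drain(buffer_size) or []
    return "wrote more data"

# Empty buffer: the coroutine runs to completion without suspending.
try:
    next(app(0))
    empty_result = None          # reaching here would mean it suspended
except StopIteration as e:
    empty_result = e.value

# Non-empty buffer: the first next() suspends on the stand-in Future.
full_first_step = next(app(100))
```

In the empty-buffer case `yield from []` yields nothing, so the generator finishes in a single step; that is exactly the no-op behavior the trick is after.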

Would it break too many things to remove the indirection through
loop.call_soon() in set_result() and set_exception()?
(I would also expect to see this discussed in the PEP, by the way :-))

> > > I'm sending this message mostly to get a feeling of how important
> > > pause_writing() is -- I may be missing a use case, or Telnet may be an
> > > important protocol to be able to support in this fashion. Flow
> > > control is often the last thing developers consider, nobody really
> > > wants to think about it, but sometimes it means the difference
> > > between a great and a merely adequate implementation of a protocol,
> > > so I want to make sure that it is available when needed. (For more
> > > background information, search for Jim Gettys' rant about big buffers
> > > killing performance.)
> >
> > I haven't ever needed it myself, but I've never implemented a protocol
> > streaming large amounts of data.
> >
>
> Well, what about an HTTP server that serves up large content (e.g. videos)?

Now that I think of it, I once wrote a RTMP server but we never
implemented any flow control. Ahem.

Regards

Antoine.


Glyph

Sep 30, 2013, 2:35:03 PM9/30/13
to Guido van Rossum, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Sep 30, 2013, at 10:27 AM, Guido van Rossum <gu...@python.org> wrote:

Still, a concern for this approach would be that the write_buffer_drained() call would require an event loop cycle to call it (we do this for all Protocol calls from the transport, and we have good reasons) and this would be a total waste if the Protocol is uninterested.

If the protocol is uninterested, that future event-loop tick is going to be ignored anyway, so I'm not seeing what resource is going to be wasted.  (Also, as I've learned, all _correct_ protocols are interested in this event, so you're only paying a penalty if )

Is that too big a price to pay for callback purity?

Am I correct in understanding that the cost for "purity" here is that the callback specified by the protocol is run in a different loop iteration but the callback specified by the future is run in the same one?  It seems like it's callbacks either way, and the allocation of the "cost" seems fairly arbitrary.

Also, a subsequent event-loop iteration can, in principle, be made as cheap as one call to call_soon, one call to next() and then a method call to dispatch the event.  If I'm understanding properly, the "expensive" thing would be to make a bunch of multiplexing syscalls, and it's up to the loop's discretion whether and how often it wants to call those.

Also it would require different Protocols to implement the same (somewhat subtle) logic. All in all I'm torn.

In that case, I think the solution is to simply implement the common flow-control logic in a separate class and make it easily accessible for any Protocol that wants to instantiate it.

Lots and lots of different logic has this problem; timeouts, error-handling, and authentication are all things that many Protocols may want to implement in the same way, and many of these things are very obviously outside the scope of the Transport's responsibility.  I'll grant that this case is in more of a grey area, but I think that it can be used to either set a good precedent for other types of composition or to defer the problem further out.
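The composition being suggested might look like this sketch (class and method names are invented for illustration; this is not an API from Tulip or Twisted):

```python
class WriteFlowControl:
    # Reusable pause/resume bookkeeping that a Protocol can own by
    # composition instead of inheriting it from a base class.
    def __init__(self, high=64 * 1024, low=16 * 1024):
        self._high = high
        self._low = low
        self.paused = False
        self._resume_callbacks = []

    def buffer_size_changed(self, size):
        # Called with the new write-buffer size, however it is learned.
        if not self.paused and size > self._high:
            self.paused = True
        elif self.paused and size <= self._low:
            self.paused = False
            callbacks, self._resume_callbacks = self._resume_callbacks, []
            for cb in callbacks:
                cb()

    def when_writable(self, callback):
        # Run callback now if writing is allowed, otherwise on resume.
        if self.paused:
            self._resume_callbacks.append(callback)
        else:
            callback()

fc = WriteFlowControl(high=100, low=10)
events = []
fc.buffer_size_changed(500)                        # crosses high-water mark
fc.when_writable(lambda: events.append("resumed"))
fc.buffer_size_changed(5)                          # drains below low-water mark
```

Any Protocol could instantiate one of these and feed it buffer-size updates, keeping the subtle watermark logic out of the Protocol class itself.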

-glyph

Geert Jansen

Sep 30, 2013, 5:14:42 PM9/30/13
to Guido van Rossum, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 7:27 PM, Guido van Rossum <gu...@python.org> wrote:


Interesting. I'm curious if you reached this API independently or somehow had read about Glyph's isomorphic proposal. I'm also interested in how hard in your experience proper bookkeeping on the protocol side is.

I didn't read Glyph's proposal. However a large part of the API wasn't done by me but comes from libuv, which I use verbatim for the transport layer. It provides a transport class (called Handle) with start_read(), stop_read(), and write() with a completion callback. I just took the pieces and implemented a few common protocols on top of it.

Regarding proper bookkeeping: it isn't hard. This is what I do currently on the write side:



I am a bit out of touch on recent Tulip developments, but I would prefer the Protocol.write_buffer_drained() callback over the Transport.drain() mechanism. I think the drain() method would be the first method on a Transport returning a Future?

This is true -- I hadn't thought of this as a big barrier, but maybe Antoine (who slightly prefers callbacks) would agree with you. (To me, Future is one add_done_callback() call away from a callback.)

Having a "pure" callback style for the Transport was important for me. Gruvi doesn't have explicit futures. Instead, wherever in Tulip a method would return a Future, the Gruvi method would switch out. Users of the API need to know which methods can switch and which can't. I have annotations for that, but in addition I made the decision that transports would never switch out.


- Transport.get_buffer_size() reports the current buffer size.

- Protocol.write_buffer_drained(new_buffer_size) is called by the Transport whenever some bytes have been fully transferred from the write buffer to the kernel (or whatever is the next destination of the data).

Transport.get_buffer_size() is not strictly needed, because the protocol can keep the buffer size itself. Whenever it does a write(), it should increase the buffer size by the size of the data it just wrote, and whenever write_buffer_drained() is called it will update the buffer size to the new value.
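That bookkeeping amounts to only a few lines in the protocol. A sketch (write_buffer_drained() was only a proposal at this point, and DummyTransport here is an invented stand-in):

```python
class DummyTransport:
    # Minimal stand-in: buffers everything, never actually sends.
    def __init__(self):
        self.buffer = bytearray()

    def write(self, data):
        self.buffer.extend(data)

class BookkeepingProtocol:
    def __init__(self, transport):
        self.transport = transport
        self.buffer_size = 0   # protocol's mirror of the write buffer

    def write(self, data):
        self.buffer_size += len(data)   # count what we hand over
        self.transport.write(data)

    def write_buffer_drained(self, new_buffer_size):
        # The transport reports the authoritative size after a drain.
        self.buffer_size = new_buffer_size

t = DummyTransport()
p = BookkeepingProtocol(t)
p.write(b"x" * 1000)
p.write_buffer_drained(200)   # e.g. the transport flushed 800 bytes
```

Note that this only works if all writes go through the protocol, which is part of the tension discussed later in the thread.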

Still, a concern for this approach would be that the write_buffer_drained() call would require an event loop cycle to call it (we do this for all Protocol calls from the transport, and we have good reasons) and this would be a total waste if the Protocol is uninterested. Is that too big a price to pay for callback purity?

I think it's probably not an issue. However if you are concerned, maybe you could check if write_buffer_drained() is implemented?
 
Also it would require different Protocols to implement the same (somewhat subtle) logic.

That is correct, but Protocols need to do a lot of other common things anyway. In Gruvi I have a 350+ line base class for protocols that also does parsing and queueing of requests; I implemented that common functionality in the Protocol base class.

Regards,
Geert

Guido van Rossum

Sep 30, 2013, 7:04:47 PM9/30/13
to Antoine Pitrou, python-tulip
Hm... Without specific ideas this gets my YAGNI hackles up.
 
> Note that when drain() returns a Future, its caller should wait for that
> Future to complete before calling it again.

Wouldn't it be more user-friendly to return the same Future if called a
second time?

Not really; if one of the callers cancels the Future, the other caller will be affected.

Perhaps just as important, write() on a stream protocol is essentially serial; it doesn't make much sense to have multiple tasks calling write() concurrently to the same transport, so there shouldn't be multiple tasks calling drain() either.

(Yes, I know you can come up with a use case for concurrent writes to a stream. But it's hard and unlikely to be very useful, and if you really need this it's easy enough to do some kind of multiplexing adapter.)
 
> > A flow-control-conscious app could work like this:
>
>     t.write(blah_blah)
>     yield from t.drain() or []
>
> The "or []" is a trick that turns the yield-from into a no-op when
> t.drain() returns None; this is less elegant but more efficient than
> returning a dummy Future. (Though not much more efficient, since yield-from
> already takes a shortcut when the Future is completed on entry, and does
> not require a callback or an event loop cycle.)

Would it break too many things to remove the indirection through
loop.call_soon() in set_result() and set_exception()?

I'm really not keen on that. I worry that there would be too many ways where some global state could be changed unexpectedly. The goal is that all callbacks run strictly serialized, so each callback can assume no other world-changing callbacks will run until it returns.
 
(I would also expect to see this discussed in the PEP, by the way :-))

The PEP specifies that Future callbacks are called via call_soon. It should be obvious that call_soon() serializes calls strictly. Do you want more?

Guido van Rossum

Sep 30, 2013, 7:10:15 PM9/30/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 11:35 AM, Glyph <gl...@twistedmatrix.com> wrote:
On Sep 30, 2013, at 10:27 AM, Guido van Rossum <gu...@python.org> wrote:

Still, a concern for this approach would be that the write_buffer_drained() call would require an event loop cycle to call it (we do this for all Protocol calls from the transport, and we have good reasons) and this would be a total waste if the Protocol is uninterested.

If the protocol is uninterested, that future event-loop tick is going to be ignored anyway, so I'm not seeing what resource is going to be wasted.

Well, my idea was that if the app is uninterested, it doesn't call drain(), which means there's no Future, so it's just a check for None instead of an extra tick.

But I agree that this isn't a strong argument.

 

(Also, as I've learned, all _correct_ protocols are interested in this event, so you're only paying a penalty if )

Is that too big a price to pay for callback purity?

Am I correct in understanding that the cost for "purity" here is that the callback specified by the protocol is run in a different loop iteration but the callback specified by the future is run in the same one?  It seems like it's callbacks either way, and the allocation of the "cost" seems fairly arbitrary.

Also, a subsequent event-loop iteration can, in principle, be made as cheap as one call to call_soon, one call to next() and then a method call to dispatch the event.  If I'm understanding properly, the "expensive" thing would be to make a bunch of multiplexing syscalls, and it's up to the loop's discretion whether and how often it wants to call those.

Also it would require different Protocols to implement the same (somewhat subtle) logic. All in all I'm torn.

In that case, I think the solution is to simply implement the common flow-control logic in a separate class and make it easily accessible for any Protocol that wants to instantiate it.

Lots and lots of different logic has this problem; timeouts, error-handling, and authentication are all things that many Protocols may want to implement in the same way, and many of these things are very obviously outside the scope of the Transport's responsibility.  I'll grant that this case is in more of a grey area, but I think that it can be used to either set a good precent for other types of composition or to defer the problem further out.

This, and Geert's feedback, has made me rethink the design. I still think it's better to be able to ask the transport for the buffer size rather than have to do the bookkeeping in the protocol. I'll see if I can turn drain() into a helper function.

Guido van Rossum

Sep 30, 2013, 9:17:34 PM9/30/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
This, and Geert's feedback, has made me rethink the design. I still think it's better to be able to ask the transport for the buffer size rather than have to do the bookkeeping in the protocol. I'll see if I can turn drain() into a helper function.

I tried this, but I'm not happy with the results.

I tried two ways:

(1) A true helper function. Not only does this need to take both a transport and a protocol as arguments, it needs to mutate the protocol to point the write_buffer_drained callback to something that can make a Future done. This is just too ugly, so I looked at

(2) A mix-in Protocol class. This is a little less hacky, but it starts the slippery path down the road to using subclassing as an API, which I really don't like. (See e.g. code.google.com/p/tulip/issues/detail?id=19.)

So I am beginning to think that the purity argument is a red herring. So... apart from purity arguments, are there use cases supported by the Protocol.write_buffer_drained() API that the Transport.drain() API proposal doesn't?

Glyph

Oct 1, 2013, 1:34:34 AM10/1/13
to Guido van Rossum, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
What happens when the callback on Tulip.drain raises an exception?  If it does so, can the connection be automatically terminated in the event of such a failure?  Otherwise, a drain/do-stuff/drain/do-stuff loop (something that every correct protocol really needs to implement!) that encounters an error between doing stuff and calling drain again will be broken and the transport may be left forever idle.  This kind of bug manifests as an edge-case turning into an infinite resource leak, so it's important to handle well in long-running servers.

Also: what happens if multiple parties call .drain concurrently?  Do all the Futures returned thereby fire at once once the write buffer is empty?  Does the first fire, and then the second fires after the next write()/buffer-empty pair of events happens?

I think that coming up with a third option here, one suitable for processing an arbitrary event, is important.  One faces the same problem with data_received, for example, when one wants to write general logic for timing out an idle connection.  Do you write a utility function that temporarily overrides data_received by setting an attribute?  Write a mixin?  Or something else?

Turning Tulip.drain into a thing that returns a Future only half-solves the problem for one case, by adding a bunch of extra complexity (a Future, the possibility of multiple observers) for one case.

I don't have a ready-made solution to this issue, sadly.  Twisted uses *way* too much subclassing in its API for utilities like this.  We've started transitioning to using wrapper Protocol objects, which is what I'd suggest for starters here, but that's admittedly still a little awkward and verbose.  I think there needs to be more exploration into different approaches... and such experimentation should be conducted within Protocol objects, so that the Transport object remains a simple deliverer of events via method calls to its Protocol object.
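The wrapper-Protocol pattern mentioned above might look like this sketch (the class names and the idle-tracking example are invented for illustration; the timer that would actually abort() the connection is omitted):

```python
class Recorder:
    # Plain protocol that just records what it receives.
    def __init__(self):
        self.chunks = []

    def connection_made(self, transport):
        pass

    def data_received(self, data):
        self.chunks.append(data)

    def connection_lost(self, exc):
        pass

class ProtocolWrapper:
    # Forwards the standard transport events to a wrapped protocol.
    def __init__(self, wrapped):
        self.wrapped = wrapped

    def connection_made(self, transport):
        self.wrapped.connection_made(transport)

    def data_received(self, data):
        self.wrapped.data_received(data)

    def connection_lost(self, exc):
        self.wrapped.connection_lost(exc)

class IdleTracker(ProtocolWrapper):
    # Example wrapper: record activity timestamps so an external timer
    # could abort() idle connections.
    def __init__(self, wrapped, clock):
        super().__init__(wrapped)
        self.clock = clock
        self.last_activity = clock()

    def data_received(self, data):
        self.last_activity = self.clock()
        super().data_received(data)

ticks = iter(range(10))      # deterministic stand-in for a real clock
clock = lambda: next(ticks)
rec = Recorder()
proto = IdleTracker(rec, clock)
proto.connection_made(None)
proto.data_received(b"hello")
```

The transport only ever sees the outermost object, and behavior is layered by composition rather than by subclassing the application's Protocol.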

-glyph

Antoine Pitrou

Oct 1, 2013, 4:30:52 AM10/1/13
to python...@googlegroups.com
Le Mon, 30 Sep 2013 16:04:47 -0700,
Guido van Rossum <gu...@python.org> a
écrit :
>
> > (I would also expect to see this discussed in the PEP, by the
> > way :-))
> >
>
> The PEP specifies that Future callbacks are called via call_soon. It
> should be obvious that call_soon() serializes calls strictly. Do you
> want more?

Well, I think it would be nice to add the explanation, even in a very
terse manner.

Regards

Antoine.


Guido van Rossum

Oct 1, 2013, 11:38:20 AM10/1/13
to Antoine Pitrou, python-tulip
On Tue, Oct 1, 2013 at 1:30 AM, Antoine Pitrou <soli...@pitrou.net> wrote:
Le Mon, 30 Sep 2013 16:04:47 -0700, Guido van Rossum <gu...@python.org> a écrit :
> > (I would also expect to see this discussed in the PEP, by the
> > way :-))

> The PEP specifies that Future callbacks are called via call_soon. It
> should be obvious that call_soon() serializes calls strictly. Do you
> want more?

Well, I think it would be nice to add the explanation, even in a very
terse manner.

Done.

Guido van Rossum

Oct 1, 2013, 1:00:35 PM10/1/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Mon, Sep 30, 2013 at 10:34 PM, Glyph <gl...@twistedmatrix.com> wrote:

On Sep 30, 2013, at 6:17 PM, Guido van Rossum <gu...@python.org> wrote:
This, and Geert's feedback, has made me rethink the design. I still think it's better to be able to ask the transport for the buffer size rather than have to do the bookkeeping in the protocol. I'll see if I can turn drain() into a helper function.

I tried this, but I'm not happy with the results.

I tried two ways:

(1) A true helper function. Not only does this need to take both a transport and a protocol as arguments, it needs to mutate the protocol to point the write_buffer_drained callback to something that can make a Future done. This is just too ugly, so I looked at

(2) A mix-in Protocol class. This is a little less hacky, but it starts the slippery path down the road to using subclassing as an API, which I really don't like. (See e.g. code.google.com/p/tulip/issues/detail?id=19.)

So I am beginning to think that the purity argument is a red herring. So... apart from purity arguments, are there use cases supported by the Protocol.write_buffer_drained() API that the Transport.drain() API proposal doesn't?

What happens when the callback on Tulip.drain raises an exception?

The exception is logged and life goes on. This is the same for all callbacks.
 
If it does so, can the connection be automatically terminated in the event of such a failure?

In Tulip's world that would not be sufficient anyway. Assuming that the effect of a callback is to wake up a coroutine, the callback will always succeed. The callback (Task._wakeup()) calls Task._step() which passes control into the coroutine/generator using next() or send(). If the generator raises an exception, the standard Python try/except machinery is invoked, and if the exception is not handled locally it will bubble up through yield-from to the generator waiting for it, and so on. If it isn't caught it eventually reaches a try/except in Task._step() which calls self.set_exception() and returns. The set_exception() call will make another coroutine runnable, which will happen on the next event loop cycle.
 
Otherwise, a drain/do-stuff/drain/do-stuff loop (something that every correct protocol really needs to implement!) that encounters an error between doing stuff and calling drain again will be broken and the transport may be left forever idle.  This kind of bug manifests as an edge-case turning into an infinite resource leak, so it's important to handle well in long-running servers.

Maybe, but I don't see how it's different from failures in the other Protocol callbacks. For example, when using a StreamReader, the StreamReaderProtocol.data_received() callback just copies the data into a buffer and makes a Future complete -- then data_received() returns, and a tick later the coroutine waiting for the Future wakes up.
 
Also: what happens if multiple parties call .drain concurrently?  Do all the Futures returned thereby fire at once once the write buffer is empty?  Does the first fire, and then the second fires after the next write()/buffer-empty pair of events happens?

There shouldn't be multiple parties. Multiple parties writing to a stream transport is about as useful as multiple threads writing to a traditional file -- while I'm sure you can construct a use case, it's rarely a good idea and I don't see why we should try to support it -- if you really need this you should write some kind of adapter. (The only reasonable use case I can think of would be logging, but it seems a bad idea to block a task when the log buffer fills up. Better throw away some log lines than risk timeout-induced failures in the primary functionality.)
 
I think that coming up with a third option here, one suitable for processing an arbitrary event, is important.  One faces the same problem with data_received, for example, when one wants to write general logic for timing out an idle connection.  Do you write a utility function that temporarily overrides data_received by setting an attribute?  Write a mixin?  Or something else?

Wouldn't it be sufficient to have the timeout callback forcibly close the transport, e.g. using abort()?
 
Turning Tulip.drain into a thing that returns a Future only half-solves the problem for one case,

What does it half-solve? It seems to be pretty capable of solving the flow control issue.
 

by adding a bunch of extra complexity (a Future, the possibility of multiple observers) for one case.

I don't have a ready-made solution to this issue, sadly.  Twisted uses *way* too much subclassing in its API for utilities like this.  We've started transitioning to using wrapper Protocol objects, which is what I'd suggest for starters here, but that's admittedly still a little awkward and verbose.  I think there needs to be more exploration into different approaches... and such experimentation should be conducted within Protocol objects, so that the Transport object remains a simple deliverer of events via method calls to its Protocol object.

 Well, it's clear that the Transport must deliver events -- it's less clear to me that the Protocol is really the right target of the resume event (apart from the purity argument). Here's what I found during my attempt to implement this several ways.

In the select/socket based transports (classes _SelectorSocketTransport and _SelectorSslTransport) there are four points where the write buffer shrinks -- two in the plain socket class, both in the _write_ready callback (which is invoked when the FD fires -- there are two different paths depending on how much data the socket accepts) and two in the similar callback in the SSL class. No matter how you shape the interface, you have to add something to each of these points, and that can be an internal method on a shared base class (all these classes are internal).

Now the effect you eventually want to get is that the task that's waiting to write more data to the transport needs to be woken up. And the way to wake up a task is to set the result of the Future on which it is blocked. So one way to think about this is, who manages that Future? Basically, the choices are the transport, the protocol, the task, or a 4th party created just to manage it.

In my Transport.drain() proposal the transport owns the Future. That's the simplest way to code it, because it's also the transport that must set the Future's result. The app calls transport.drain(), gets a Future, waits for it, and it's done.

In your Protocol.write_buffer_drained() proposal the Protocol owns the Future. Now the app has to call protocol.drain() to get the future. But now the protocol needs to talk to the transport -- it needs to know the current buffer size. It can either call Transport.get_buffer_size(), if we add this, or it can try to keep track of the buffer size by tracking all writes -- but then the protocol needs to be involved in all write() calls, whereas currently the design is such that an app can write directly to the transport without referencing the protocol. (This may sound preposterous from Twisted's POV, where "the app" and "the protocol" are pretty much synonymous, but in Tulip they aren't -- the app is a task, and the protocol is one step removed from the task, with e.g. a StreamReader sitting in between. Check out streams.py: http://code.google.com/p/tulip/source/browse/tulip/streams.py.)

If we make the app responsible for managing the Future, it has to pass this Future to the transport so the latter can set its result when the buffer is drained. Or perhaps it might be better to pass the transport a callback that it should call -- this avoids building knowledge about Future into the transport API. I suppose I could live with this -- I'd immediately write a helper function for the app to call with the transport and the desirable high- and low-water marks, and it would tell the transport what callback to call when, and return the Future. (I guess that's the 4th party solution.)
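That "4th party" arrangement might be sketched like this (MiniFuture, call_when_drained(), and the dummy transport are all invented stand-ins for illustration, not Tulip APIs):

```python
class MiniFuture:
    # Just enough Future for the sketch.
    def __init__(self):
        self._done = False
        self._result = None

    def done(self):
        return self._done

    def set_result(self, result):
        self._done = True
        self._result = result

class DummyTransport:
    # Stand-in transport exposing a buffer size and a drain hook.
    def __init__(self):
        self._buffer = 0
        self._drain_callbacks = []   # (low_water, callback) pairs

    def write(self, data):
        self._buffer += len(data)

    def get_buffer_size(self):
        return self._buffer

    def call_when_drained(self, low, callback):
        self._drain_callbacks.append((low, callback))

    def simulate_drain(self, nbytes):
        self._buffer = max(0, self._buffer - nbytes)
        pending, self._drain_callbacks = self._drain_callbacks, []
        for low, cb in pending:
            if self._buffer <= low:
                cb()
            else:
                self._drain_callbacks.append((low, cb))

def drain(transport, high=64 * 1024, low=16 * 1024):
    # The helper owns the Future and hands the transport a plain
    # callback, so the transport never has to know about Futures.
    fut = MiniFuture()
    if transport.get_buffer_size() <= high:
        fut.set_result(None)      # under the high-water mark: no wait
    else:
        transport.call_when_drained(low, lambda: fut.set_result(None))
    return fut

t = DummyTransport()
t.write(b"x" * (128 * 1024))      # well above the high-water mark
f = drain(t)
t.simulate_drain(120 * 1024)      # buffer falls to 8 KiB, below low water
```

The transport's only obligation is to invoke the registered callback when its buffer drops below the requested mark; everything Future-related stays in the helper.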

So why am I so keen to keep the protocol out of this? I think it's because in common use the app is a task and it deals directly with the transport for write operations -- and flow control is tied to writing, which is why involving the protocol (for purity) seems an unnecessary detour to me. In the end, a simple app can look like this:

@coroutine
def fetch():
    r, w = yield from open_connection('python.org', 80)
    # r is a StreamReader (not a Protocol!); w is a Transport
    w.write(b'GET / HTTP/1.0\r\n\r\n')
    while True:  # read until blank line
        line = yield from r.readline()
        if not line.strip():
            break
    contents = yield from r.read()
    return contents

To add flow control to this (e.g. when pumping a large body into a POST request) it should be sufficient to write something like this:

    with open('body.txt', 'rb') as f:
        while True:
            data = f.read(64*1024)
            if not data:
                break
            yield from w.drain()  # or perhaps drain(w)
            w.write(data)

I like that this goes directly to the protocol, so that if instead of StreamReader some other input mechanism is used, the drain() call is still the same.

Glyph

Oct 3, 2013, 4:31:13 AM10/3/13
to Guido van Rossum, Geert Jansen, Saúl Ibarra Corretgé, python-tulip

On Oct 1, 2013, at 10:00 AM, Guido van Rossum <gu...@python.org> wrote:

You'll be happy to know that I deleted about 10,000 words of pontification which ended up being useless, after converging on this simple recommendation :-).

If we make the app responsible for managing the Future, it has to pass this Future to the transport so the latter can set its result when the buffer is drained. Or perhaps it might be better to pass the transport a callback that it should call -- this avoids building knowledge about Future into the transport API. I suppose I could live with this -- I'd immediately write a helper function for the app to call with the transport and the desirable high- and low-water marks, and it would tell the transport what callback to call when, and return the Future. (I guess that's the 4th party solution.)

So why am I so keen to keep the protocol out of this? I think it's because in common use the app is a task and it deals directly with the transport for write operations -- and flow control is tied to writing, which is why involving the protocol (for purity) seems an unnecessary detour to me. In the end, a simple app can look like this:

@coroutine
def fetch():
    r, w = yield from open_connection('python.org', 80)
    # r is a StreamReader (not a Protocol!); w is a Transport

And here we have it:

'w' has to be *something*, but why a Transport?

You've already got a nice, decoupled not-a-transport, not-a-protocol object for reading from a stream.

That object already needs to manage all the reading events (data_received) for the stream, via a dedicated Protocol that is not exposed to the application, to convert them into read-flow-control notifications (i.e.: returning bytes).

Yet, 'w' exposes the underlying implementation all the way down to the transport!  Why? ;-)

You could easily return a StreamWriter object that had a write() method that did all the necessary bookkeeping - bookkeeping necessary only for applications written using this interface, since things written against the lower event-driven layer won't want to have a Task for writing at all; they'll want to have some Twisted or Tornado abstraction! :)

So, open_connection becomes something like this:

    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    writer = StreamWriter(loop=loop)
    protocol = StreamReaderProtocol(reader, writer)
    transport, _ = yield from loop.create_connection(
        lambda: protocol, host, port, **kwds)
    # note: no need for comment excusing the fact that this isn't (reader, writer) now!
    return reader, writer

That would allow for writer.write to return a Future _as well_ as writer.drain - or perhaps, now that it's on a nice, dedicated, file-like object, we could call it writer.flush() for symmetry :).

Of course, for access to lower-level information about the transport (IP address, etc) StreamWriter could still expose a 'transport' attribute, but most applications would not need to use it.
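A minimal version of the StreamWriter being proposed could look like this (the StreamWriter that later grew out of this discussion differs; this sketch, with an invented dummy transport, just shows the shape under discussion):

```python
class StreamWriter:
    # Owns the write side: delegates to the transport but gives the
    # app a single object to write to and wait on.
    def __init__(self, transport):
        self.transport = transport   # still exposed for low-level use

    def write(self, data):
        self.transport.write(data)

    def drain(self):
        # Could equally be spelled flush() for file-like symmetry.
        return self.transport.drain()

class DummyTransport:
    # Stand-in transport for demonstration.
    def __init__(self):
        self.written = []
        self.drained = 0

    def write(self, data):
        self.written.append(data)

    def drain(self):
        self.drained += 1
        return None    # nothing buffered, nothing to wait for

t = DummyTransport()
w = StreamWriter(t)
w.write(b"GET / HTTP/1.0\r\n\r\n")
w.drain()
```

The point of the indirection is that any write-side bookkeeping (buffer counting, drain Futures) can later live in StreamWriter without the application ever touching the transport directly.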

    w.write(b'GET / HTTP/1.0\r\n\r\n')
    while True:  # read until blank line
        line = yield from r.readline()
        if not line.strip():
            break
    contents = yield from r.read()
    return contents

To add flow control to this (e.g. when pumping a large body into a POST request) it should be sufficient to write something like this:

    with open('body.txt', 'rb') as f:
        while True:
            data = f.read(64*1024)
            if not data:
                break
            yield from w.drain()  # or perhaps drain(w)
            w.write(data)

I like that this goes directly to the protocol, so that if instead of StreamReader some other input mechanism is used, the drain() call is still the same.

... I think you mean "directly to the transport"?

Personally, I don't think that has a lot to recommend it.  If you're writing an application with a custom Protocol implementation, you can construct your own StreamWriter directly and *very* easily deliver the events it needs, managing its access to your underlying transport more easily than you could manage passing your transport directly to some random function that expects to be able to receive events from it.

-glyph

Guido van Rossum

Oct 3, 2013, 11:15:58 PM10/3/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
So, suppose I do that. Do you still want the protocol (or the StreamWriter) to keep track of the transport's buffer size? I came up with a bit of a conundrum there.

Some of Tulip's transports have a feature whereby if the socket accepts the data it never gets buffered[1]. If you write less data than the kernel will buffer (typically 64 or 128 K I believe) or slow enough that the network absorbs it, the transport may never have any data in its buffer. But the StreamWriter doesn't know about this, so its calculation of how much data is buffered doesn't take this into account. But now this means that write_buffer_drained() may *never* be called! Unless we also call it for this special case. That still seems simple enough -- we can just call it also whenever a write() takes this shortcut.

But can we? In the non-shortcut case, we will make at most one send() call per tick (== event loop cycle), so there will be at most one such callback per tick. But if we take the shortcut and the app calls write() four times within the same tick, write_buffer_drained() would be scheduled four times. (It doesn't get called immediately, because of the context rule -- protocol methods are called on the next tick.) That seems wrong, but it seems nearly unavoidable if we want to do the bookkeeping in the protocol or in the writer. Collapsing these into a single call is possible but more effort.

So all in all it still feels better to me if the transport simply has a method that will tell you the buffer size at any time. Then Protocol.write_buffer_drained() doesn't have to be called when the transport takes the shortcut (the buffer will then always be empty) and it doesn't need to be passed a new_buffer_size argument either -- it can just ask for the number when it needs it (it might not, if there's nobody waiting for the buffer to drain).

[1] http://code.google.com/p/tulip/source/browse/tulip/selector_events.py#431

Glyph

Oct 4, 2013, 3:32:35 AM10/4/13
to Guido van Rossum, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Oct 3, 2013, at 8:15 PM, Guido van Rossum <gu...@python.org> wrote:

So, suppose I do that. Do you still want the protocol (or the StreamWriter) to keep track of the transport's buffer size? I came up with a bit of a conundrum there.

Well... sort of.

Let me expand a bit on that idea.  It's not so much that I don't want there to be a way to interrogate the transport to get buffering characteristics.  That can be a useful API for monitoring and debugging and statistical profiling.  It's that I only want there to be one way to figure out when it's a good idea to stop writing, and only one way to figure out when it's a good idea to start writing again.

Which means, there should be a callback that tells the Protocol to pause its writing and a callback that tells the Protocol to resume its writing.

The rest of this message is somewhat redundant, and is just some arguments in favor of that simple idea, and some background on how I thought I'd originally tried to communicate that same idea, and subsequently lost track of it in the discussion of implementation specifics :-).  So anyone who immediately agrees with this concept can just stop reading here.

Moving on...

Some of Tulip's transports have a feature whereby if the socket accepts the data it never gets buffered[1]. If you write less data than the kernel will buffer (typically 64 or 128 K I believe) or slow enough that the network absorbs it, the transport may never have any data in its buffer. But the StreamWriter doesn't know about this, so its calculation of how much data is buffered doesn't take this into account. But now this means that write_buffer_drained() may *never* be called! Unless we also call it for this special case. That still seems simple enough -- we can just call it also whenever a write() takes this shortcut.

Given these semantics for write_buffer_drained, it would be bizarrely asymmetric not to call it in this case; write() adds to the buffer, write_buffer_drained() describes a change to the buffer.  I am having a hard time imagining how the contract would be documented for that callback if so... "called when the outgoing write buffer shrinks, unless it shrank, like, super fast, in which case it won't be called, and you should check right after calling write() to see if it's going to be called by checking transport.outgoing_buffer_size() because if that value is zero then it won't be called until you call write again".

But... maybe these are the wrong semantics.  I've said that Twisted's model has issues, and it does, so let me trim it down to just "the good parts" here (in other words, the following is not actually what Twisted does, but a simpler version that does all the parts it needs to do).

Twisted implements the same shortcut, but avoids the problem of having to expose the presence of said shortcut to application code by having two flow-control callbacks on the protocol: one for "the write buffer filled up enough that you should stop writing", and one for "the write buffer emptied enough that you should begin writing again".

These are really the callbacks that you want - and by "you", I mean "StreamWriter".  How full or empty these buffers should get before the transport tells you to stop or start writing is an important tunable, which is why OS kernels expose it as SO_SNDBUF.  You generally want applications to keep writing when there's a little – but not too much – data buffered.

In low data volume operation, neither the pause nor the resume callback gets invoked.  If the shortcut succeeds and immediately writes its data, then great, no need to pause the protocol at all.  Which I think means that StreamWriter can return () instead of a Future from its drain() or write() method, possibly also optimizing at that level?

An explicit "pause" callback also gives other transports which can't implement the "write immediately" optimization a little wiggle room, so they don't need to be constantly telling the protocol about small buffers that empty out as asynchronous I/O operations are dispatched and complete; they can set a high water mark for application buffers where they decide that their own idea of a buffer is full.

The reason I originally suggested a write_buffer_drained-style callback as a tweak on this pause_producing/resume_producing scheme was not necessarily to say that you shouldn't have the pause_producing callback, but that it might be nice to give the protocol (ahem, application) access to the same information that the transport has access to.  This would allow applications which need to prepare their writes - for example, by first kicking off some high-latency operation like a new TCP connection before they'll have any data ready - to write predictive algorithms that would choose a good time to get started, in advance of the app-level write buffer actually being completely empty.  I didn't anticipate the implementation difficulties of exposing this information; I figured, it's just an integer, how hard could it be? :)

Of course, a really smart application would want visibility even deeper into the stack, but most operating systems don't even let you get access to that information.  As far as I know, SO_NWRITE is available only on BSD-esque platforms, not Linux or Windows.  (And even there, you can't tune anything about the writable notification: if a socket has any buffer space, it's writable; you don't get to find out when it's more writable.)

And then there are maaagical routers and managed switches out there on the network that will do "good" (read: good for performance of downloads, bad for everything else) things like buffer your TCP segments and reassemble them and not even tell you that they did so, and there's definitely *no* way to get visibility into that part of the stack.  (I think Guido already mentioned this, but you can read <https://queue.acm.org/detail.cfm?id=2071893> if you want to get depressed about this.)

So perhaps this would all benefit from simplifying and re-focusing on the thing that StreamWriter actually wants, which is instructions on when to pause writing and when to resume writing again, and entirely forget about exposing user-space buffer sizes to application code.  write_buffer_drained_some_but_not_quite_enough_to_start_writing_again_yet could be an additional callback added to a future version if somebody actually needs it.  (And nobody has actually needed it, for Twisted.)

-glyph

Appendix A:

You might find this program interesting if you're on a BSD-esque platform, as it will show you progress on uploading a couple of packets to a server:

import cffi
ffi = cffi.FFI()
ffi.cdef("""
#define SO_NREAD ...
#define SO_NWRITE ...
""")
lib = ffi.verify("""
#include <sys/socket.h>
""")
import socket
import time
skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
skt.connect(("example.com", 80))
skt.setblocking(False)
print(skt.send(b'x' * 1024 * 20))
def moredata():
    # SO_NWRITE reports bytes still queued in the kernel send buffer
    # (BSD/OS X only).
    remaining = skt.getsockopt(socket.SOL_SOCKET, lib.SO_NWRITE)
    print(remaining)
    return remaining
while moredata():
    time.sleep(0.05)

Guido van Rossum

unread,
Oct 11, 2013, 7:00:10 PM10/11/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
That was a really cool demo program!

This thread isn't dead yet. I've now implemented your proposal (being the least bad compromise). The new APIs are not yet in the repo, but here they are: https://codereview.appspot.com/14613043

TL;DR:

Transport
- set_buffer_limits(high, low)
- get_buffer_size()

Protocol
- pause_writing()
- resume_writing()

StreamWriter (new class)
- drain() (new method)
- transport (new property)
- write(), writelines(), write_eof(), can_write_eof(), get_extra_info(), close() (pass through)
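A protocol cooperating with these callbacks might look roughly like this (a sketch against the API names above; the queue-while-paused strategy is illustrative):

```python
# Sketch of a protocol honoring the transport's back-pressure callbacks:
# when pause_writing() arrives, queue outgoing data instead of calling
# transport.write(); flush the queue on resume_writing().
class FlowControlledProtocol:
    def __init__(self):
        self._paused = False
        self._pending = []  # data queued while the transport is saturated

    def connection_made(self, transport):
        self.transport = transport
        transport.set_buffer_limits(high=64 * 1024, low=16 * 1024)

    def send(self, data):
        if self._paused:
            self._pending.append(data)  # honor back-pressure
        else:
            self.transport.write(data)

    def pause_writing(self):
        self._paused = True

    def resume_writing(self):
        self._paused = False
        pending, self._pending = self._pending, []
        for data in pending:
            self.transport.write(data)
```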

I discovered a bunch of missed end cases in the handling of closed connections that I fixed (already in the repo, not in that code review).

Also, I chose to make write() raise an exception when the buffer fills up *really* high (10x the limit), which would suggest that you're ignoring flow control. And drain() will raise when the connection is closed.

Thoughts?

Glyph

unread,
Oct 12, 2013, 7:59:13 PM10/12/13
to Guido van Rossum, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
On Oct 11, 2013, at 4:00 PM, Guido van Rossum <gu...@python.org> wrote:

That was a really cool demo program!

Glad you enjoyed it :-).


This thread isn't dead yet. I've now implemented your proposal (being the least bad compromise). The new APIs are not yet in the repo, but here they are: https://codereview.appspot.com/14613043

TL;DR:

Transport
- set_buffer_limits(high, low)
- get_buffer_size()

The one issue I see with the structure here is that the word 'buffer' here is unadorned.  There are multiple buffers here, and you can't necessarily inspect them all.  For example, are you setting SO_SNDBUF, SO_SNDLOWAT, or a user-space buffer?
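For comparison, the kernel-side knob is per-socket and settable directly (a sketch; note that Linux doubles the value you set, and SO_SNDLOWAT is not settable there):

```python
import socket

# Inspect and tune the kernel's send buffer for one socket -- a different
# "buffer" from any user-space buffer the transport maintains.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
default = s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
print(default)  # the kernel's current send-buffer size in bytes
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 128 * 1024)
s.close()
```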

Protocol
- pause_writing()
- resume_writing()

StreamWriter (new class)
- drain() (new method)
- transport (new property)
- write(), writelines(), write_eof(), can_write_eof(), get_extra_info(), close() (pass through)

I discovered a bunch of missed end cases in the handling of closed connections that I fixed (already in the repo, not in that code review).

Cool.  Still discovering those in Twisted myself ;-).

Also, I chose to make write() raise an exception when the buffer fills up *really* high (10x the limit), which would suggest that you're ignoring flow control. And drain() will raise when the connection is closed.

Thoughts?

Making write() raise is a tricky proposition for correctness.  Should applications start handling those errors?  Is there any reasonable way to handle them?  If not, how should they deal with large strings that they've already managed to buffer up in memory?

I would very much like it if there were a reasonable way to enforce application of back-pressure, but it seems like this could easily be something that looks like it works okay when testing with small values, then blows up horribly when run with "real" data that is much larger, rather than just consuming more resources as expected in production.

As far as drain() raising... do you just mean if the socket is *already* closed when you first call drain() or if it closes while waiting for the output to drain?  I guess either way is fine, actually.

-glyph

Guido van Rossum

unread,
Oct 18, 2013, 7:44:22 PM10/18/13
to Glyph, Geert Jansen, Saúl Ibarra Corretgé, python-tulip
To conclude this thread, write flow control along these lines has landed (in both Python 3.4 alpha 4 and Tulip).

The API uses Protocol methods pause_writing() and resume_writing(), which are called by the Transport when its buffer fills up above a certain limit, and when it drains below another limit -- the high- and low-water marks, which are usually different and both greater than zero (but you can change them, on a per-Transport basis).

Since Glyph frowned about write() raising, I dropped that idea -- if the Protocol keeps writing, the Transport's buffer will just keep growing.

Instead, the new StreamWriter class (which wraps Transport) has a drain() method that raises if the reader has an exception or if the connection is lost. I *think* this is the correct termination condition; when using StreamReader/Writer the app doesn't have a connection_lost callback.
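From the application side, drain() usage looks like this (sketched with modern async/await syntax rather than Tulip's original yield from; send_file and the chunk source are illustrative):

```python
import asyncio

async def send_file(writer, chunks):
    # drain() yields to the event loop whenever the transport has called
    # pause_writing(), and raises if the connection is lost meanwhile.
    for chunk in chunks:
        writer.write(chunk)
        await writer.drain()
    if writer.can_write_eof():
        writer.write_eof()
```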

I renamed the Transport methods set_buffer_limits() and get_buffer_size() to set_write_buffer_limits() and get_write_buffer_size() per Glyph's recommendation.

I also renamed read flow control to pause_reading() and resume_reading() as mentioned earlier. As a reminder, these are Transport methods that the Protocol can call if it can't handle (much) more data with data_received(). The StreamReader class now uses these.
