Maybe we could kill them and provide some means for protocol writers to implement them at the protocol level. Here is a (rough) idea that might work: the transport could expose the number of queued bytes, so a protocol could check how big the queue is before calling write(), and write() could take a callback (or maybe call a function in the protocol) when writing has been completed. That way the protocol could control when to (or not to) call write.
On Sun, 29 Sep 2013 09:48:50 -0700, Guido van Rossum <gu...@python.org> wrote:
> But for most other TCP-based protocols, flow control is implicit at
> the TCP layer -- the way this information bubbles up to the writer is
> simply by the socket not accepting new non-blocking send() calls,
> which results in data being buffered in the transport. When the
> transport's buffer fills up over a certain high-water mark, the
> transport will call protocol.pause_writing() (once implemented) and
> the protocol is expected to stop passing data to transport.write().

If it's just a hint given to the protocol, perhaps it should be a
notification API rather than something giving an order? The API could
also be made flexible, e.g.:
protocol.flow_control_event(kind, amount)
where:
- kind = FlowControl.OUTPUT_BUFFER_FILLING or
FlowControl.OUTPUT_BUFFER_DRAINING
- amount = number of bytes currently waiting in buffer
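A protocol consuming such notifications might look like this minimal sketch. The constant names, threshold, and class are hypothetical, purely to illustrate the notification-rather-than-order idea: the transport only informs, and the protocol decides what to do.

```python
# Hypothetical event kinds for the proposed flow_control_event() API.
OUTPUT_BUFFER_FILLING = 'filling'
OUTPUT_BUFFER_DRAINING = 'draining'

class ThrottlingProtocol:
    """Sketch of a protocol that throttles itself based on notifications."""

    HIGH_WATER = 64 * 1024  # bytes; arbitrary threshold for this example

    def __init__(self):
        self.send_allowed = True

    def flow_control_event(self, kind, amount):
        # The transport reports its buffer state; *we* decide what to do.
        if kind == OUTPUT_BUFFER_FILLING and amount > self.HIGH_WATER:
            self.send_allowed = False
        elif kind == OUTPUT_BUFFER_DRAINING and amount < self.HIGH_WATER:
            self.send_allowed = True

p = ThrottlingProtocol()
p.flow_control_event(OUTPUT_BUFFER_FILLING, 100 * 1024)
assert not p.send_allowed    # buffer too full: stop calling write()
p.flow_control_event(OUTPUT_BUFFER_DRAINING, 10 * 1024)
assert p.send_allowed        # buffer drained: resume writing
```

A protocol that doesn't care can simply not implement flow_control_event(), which is the appeal of a pure notification API.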
> (There are more layers here to consider -- e.g. if the
> app uses a StreamReader, that reader may have to implement pausing
> somehow.)

How is the reader related to output flow control?
Incoming bytes have to be buffered somewhere regardless. Does it make a
difference whether they are buffered in the lower or upper layers of the
software stack?
> I'm sending this message mostly to get a feeling of how important
> pause_writing() is -- I may be missing a use case, or Telnet may be an
> important protocol to be able to support in this fashion. Flow
> control is often the last thing developers consider, nobody really
> wants to think about it, but sometimes it means the difference
> between a great and a merely adequate implementation of a protocol,
> so I want to make sure that it is available when needed. (For more
> background information, search for Jim Gettys' rant about big buffers
> killing performance.)

I haven't ever needed it myself, but I've never implemented a protocol
streaming large amounts of data.
On Mon, Sep 30, 2013 at 9:29 AM, Saúl Ibarra Corretgé <sag...@gmail.com> wrote:

> Maybe we could kill them and provide some means for protocol writers to implement them at the protocol level. Here is a (rough) idea that might work: the transport could expose the number of queued bytes, so a protocol could check how big the queue is before calling write(), and write() could take a callback (or maybe call a function in the protocol) when writing has been completed. That way the protocol could control when to (or not to) call write.

+1

When implementing (read or write) flow control in a protocol, an important input is the number of bytes outstanding in the transport.

(In addition, protocol-specific details such as chunk sizes can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)
Regarding pause_writing() / resume_writing(): that actually reminded me of TCP_CORK, which is flow control in a sense, but I'm not sure if the API was created to cover this.
In fact, Transport.write() could already return [] instead of None in case there is buffer space to write more, so that the code can be reduced to:

    yield from t.write(blah_blah)

This is similar to socket or file write, and therefore it will be easier to learn. When designing an API, it is always useful to have it similar to an existing familiar API. This is the same reason we use familiar terminology, or the same design patterns. Making an API easy to learn matters a lot IMHO.
In that case, why not:
yield from t.write(blah_blah) or []
Which is exactly what I proposed a couple of months ago, minus the "or
[]" trick. The only reason I didn't insist on this more was because of
your claims that futures are expensive, and I didn't think of this
trick. Since making tulip slower is not my intention, I gave up on that
idea.
Hum, you mean effectively draining the buffer after each write? That's probably not what everyone wants. Data can be buffered just fine, until a limit is reached and then the buffer can be drained.
> (In addition, protocol-specific details such as chunk sizes can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)

Can't quite follow what you're saying here, and the gruvi docs are too big to find the docs for its flow control API. Can you provide more details?
How about making write() return a boolean? If it returns False, the caller is advised to call drain() because the max buffer size (which can be set in __init__) has been reached.

if not t.write('foo'):
    # ouch, buffer is full, let's wait
    yield from t.drain()
Inside drain() we'd assert that len(self._buffer) >= limit.
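As a toy illustration of that contract, here is a hypothetical transport where a plain list stands in for the real write buffer and drain() just flushes synchronously instead of returning a Future (everything here is made up for illustration):

```python
class ToyTransport:
    """Hypothetical transport: write() returns False once the buffer is 'full'."""

    def __init__(self, limit=3):
        self._buffer = []
        self._limit = limit

    def write(self, data):
        self._buffer.append(data)
        # True means "keep writing"; False advises the caller to drain.
        return len(self._buffer) < self._limit

    def drain(self):
        # In the real proposal this would complete asynchronously once
        # the kernel accepted the data; here we just flush.
        assert len(self._buffer) >= self._limit
        self._buffer.clear()

t = ToyTransport(limit=2)
assert t.write(b'a') is True
assert t.write(b'b') is False  # buffer reached the limit: please drain
t.drain()
assert t.write(b'c') is True   # room again after draining
```

The assert inside drain() mirrors the invariant stated above: a well-behaved caller only drains after write() has advised it to.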
On Mon, Sep 30, 2013 at 6:28 PM, Guido van Rossum <gu...@python.org> wrote:

>> (In addition, protocol-specific details such as chunk sizes can be taken into account. For example, in HTTP it doesn't make sense to pause sending 1 byte before you send a complete chunk. This is how write flow control works in gruvi (gruvi.readthedocs.org).)
>
> Can't quite follow what you're saying here, and the gruvi docs are too big to find the docs for its flow control API. Can you provide more details?
What I meant is that the ultimate decision whether to pause sending should be with the protocol, not the transport. My comment wasn't really related to pause_writing() / resume_writing(), sorry. (It was related to a failed attempt of mine to do flow control in the transport by pausing the coroutine running the protocol. It was a bad approach...)
Gruvi's transports have exactly the same flow control API as described here (it's essentially the libuv flow control, because Gruvi's transports are pyuv Handles):

- Transports have a pause() / resume() method that a protocol can use to implement flow control in the read direction. (They are called start_read() / stop_read() but they do the same thing.)

- A transport allows a protocol to deduce how many outstanding bytes there are, so that it can implement flow control in the write direction. This is actually implemented as a callback that fires when a write() operation on the transport completes. So in order to know the outstanding bytes, a protocol needs to register this callback and do the bookkeeping. A Protocol.write_buffer_drained() would basically be the same thing.
I am a bit out of touch on recent Tulip developments, but I would prefer the Protocol.write_buffer_drained() callback over the Transport.drain() mechanism. I think the drain() method would be the first method on a Transport returning a Future?
Also it would be a bit asymmetrical because in a protocol you *do* keep track of the size of your read buffer, for read flow control. It is maybe more consistent if you can implement your write flow control in the same way too.
Still, a concern for this approach would be that the write_buffer_drained() call would require an event loop cycle to call it (we do this for all Protocol calls from the transport, and we have good reasons) and this would be a total waste if the Protocol is uninterested.
Is that too big a price to pay for callback purity?
Also it would require different Protocols to implement the same (somewhat subtle) logic. All in all I'm torn.
Interesting. I'm curious if you reached this API independently or somehow had read about Glyph's isomorphic proposal. I'm also interested in how hard in your experience proper bookkeeping on the protocol side is.
> I am a bit out of touch on recent Tulip developments, but I would prefer the Protocol.write_buffer_drained() callback over the Transport.drain() mechanism. I think the drain() method would be the first method on a Transport returning a Future?

This is true -- I hadn't thought of this as a big barrier, but maybe Antoine (who slightly prefers callbacks) would agree with you. (To me, a Future is one add_done_callback() call away from a callback.)
- Transport.get_buffer_size() reports the current buffer size.

- Protocol.write_buffer_drained(new_buffer_size) is called by the Transport whenever some bytes have been fully transferred from the write buffer to the kernel (or whatever is the next destination of the data).
> Note that when drain() returns a Future, its caller should wait for that
> Future to complete before calling it again.

Wouldn't it be more user-friendly to return the same Future if called a
second time?
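That "same Future for a second caller" behavior could be sketched like this. The class, its attributes, and the _buffer_emptied() hook are hypothetical (written with today's asyncio names); the point is only that a pending waiter is cached and shared:

```python
import asyncio

class DrainingTransport:
    """Sketch: all concurrent drain() callers share one pending Future."""

    def __init__(self, loop):
        self._loop = loop
        self._buffer = bytearray()
        self._drain_waiter = None

    def write(self, data):
        self._buffer += data  # no real I/O in this sketch

    def drain(self):
        if not self._buffer:
            return None  # nothing buffered: no need to wait
        if self._drain_waiter is None or self._drain_waiter.done():
            self._drain_waiter = asyncio.Future(loop=self._loop)
        return self._drain_waiter  # same Future for every pending caller

    def _buffer_emptied(self):
        # Called by the transport machinery once the kernel took the data.
        self._buffer.clear()
        if self._drain_waiter is not None and not self._drain_waiter.done():
            self._drain_waiter.set_result(None)

loop = asyncio.new_event_loop()
t = DrainingTransport(loop)
t.write(b'data')
f1 = t.drain()
f2 = t.drain()
assert f1 is f2          # a second call returns the same pending Future
t._buffer_emptied()
assert f1.done()
loop.close()
```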
> A flow-control-conscious app could work like this:
>
> t.write(blah_blah)
> yield from t.drain() or []
>
> The "or []" is a trick that turns the yield-from into a no-op when
> t.drain() returns None; this is less elegant but more efficient than
> returning a dummy Future. (Though not much more efficient, since
> yield-from already takes a shortcut when the Future is completed on
> entry, and does not require a callback or an event loop cycle.)

Would it break too many things to remove the indirection through
loop.call_soon() in set_result() and set_exception()?
(I would also expect to see this discussed in the PEP, by the way :-))
On Sep 30, 2013, at 10:27 AM, Guido van Rossum <gu...@python.org> wrote:
> Still, a concern for this approach would be that the write_buffer_drained() call would require an event loop cycle to call it (we do this for all Protocol calls from the transport, and we have good reasons) and this would be a total waste if the Protocol is uninterested.

If the protocol is uninterested, that future event-loop tick is going to be ignored anyway, so I'm not seeing what resource is going to be wasted.
(Also, as I've learned, all _correct_ protocols are interested in this event, so you're only paying a penalty if )

> Is that too big a price to pay for callback purity?

Am I correct in understanding that the cost for "purity" here is that the callback specified by the protocol is run in a different loop iteration but the callback specified by the future is run in the same one? It seems like it's callbacks either way, and the allocation of the "cost" seems fairly arbitrary.

Also, a subsequent event-loop iteration can, in principle, be made as cheap as one call to call_soon, one call to next() and then a method call to dispatch the event. If I'm understanding properly, the "expensive" thing would be to make a bunch of multiplexing syscalls, and it's up to the loop's discretion whether and how often it wants to call those.

> Also it would require different Protocols to implement the same (somewhat subtle) logic. All in all I'm torn.

In that case, I think the solution is to simply implement the common flow-control logic in a separate class and make it easily accessible for any Protocol that wants to instantiate it.

Lots and lots of different logic has this problem; timeouts, error-handling, and authentication are all things that many Protocols may want to implement in the same way, and many of these things are very obviously outside the scope of the Transport's responsibility. I'll grant that this case is in more of a grey area, but I think that it can be used to either set a good precedent for other types of composition or to defer the problem further out.
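The "separate class" idea might look roughly like this: a small bookkeeping object that any protocol instantiates and forwards events to, rather than a mixin it inherits from. The class name, method names, and water marks here are all hypothetical:

```python
class FlowControlHelper:
    """Reusable write-flow-control bookkeeping, used by composition."""

    def __init__(self, high=64 * 1024, low=16 * 1024):
        self._high = high   # above this: stop writing
        self._low = low     # below this: resume writing
        self._outstanding = 0
        self.writable = True

    def wrote(self, nbytes):
        # Protocol calls this after each transport.write().
        self._outstanding += nbytes
        if self._outstanding > self._high:
            self.writable = False

    def drained(self, nbytes):
        # Protocol calls this from its write_buffer_drained() callback.
        self._outstanding -= nbytes
        if self._outstanding < self._low:
            self.writable = True

# A protocol *owns* a helper instead of inheriting the logic:
fc = FlowControlHelper(high=100, low=50)
fc.wrote(150)
assert not fc.writable   # above high-water mark: stop writing
fc.drained(120)
assert fc.writable       # below low-water mark: resume
```

The same composition pattern would apply to timeouts or error handling: the protocol stays a thin event receiver and delegates the subtle logic.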
This, and Geert's feedback, has made me rethink the design. I still think it's better to be able to ask the transport for the buffer size rather than have to do the bookkeeping in the protocol. I'll see if I can turn drain() into a helper function.
On Mon, 30 Sep 2013 16:04:47 -0700, Guido van Rossum <gu...@python.org> wrote:
> > (I would also expect to see this discussed in the PEP, by the
> > way :-))
> The PEP specifies that Future callbacks are called via call_soon. It
> should be obvious that call_soon() serializes calls strictly. Do you
> want more?
Well, I think it would be nice to add the explanation, even in a very
terse manner.
On Sep 30, 2013, at 6:17 PM, Guido van Rossum <gu...@python.org> wrote:

> This, and Geert's feedback, has made me rethink the design. I still think it's better to be able to ask the transport for the buffer size rather than have to do the bookkeeping in the protocol. I'll see if I can turn drain() into a helper function.
I tried this, but I'm not happy with the results. I tried two ways:

(1) A true helper function. Not only does this need to take both a transport and a protocol as arguments, it needs to mutate the protocol to point the write_buffer_drained callback to something that can make a Future done. This is just too ugly, so I looked at
(2) A mix-in Protocol class. This is a little less hacky, but it starts the slippery path down the road to using subclassing as an API, which I really don't like. (See e.g. code.google.com/p/tulip/issues/detail?id=19.)
So I am beginning to think that the purity argument is a red herring. So... apart from purity arguments, are there use cases supported by the Protocol.write_buffer_drained() API that the Transport.drain() API proposal doesn't?
What happens when the callback on Tulip.drain raises an exception?
If it does so, can the connection be automatically terminated in the event of such a failure?
Otherwise, a drain/do-stuff/drain/do-stuff loop (something that every correct protocol really needs to implement!) that encounters an error between doing stuff and calling drain again will be broken, and the transport may be left forever idle. This kind of bug manifests as an edge case turning into an infinite resource leak, so it's important to handle well in long-running servers.
Also: what happens if multiple parties call .drain concurrently? Do all the Futures returned thereby fire at once, when the write buffer is empty? Or does the first fire, and then the second fires after the next write()/buffer-empty pair of events happens?
I think that coming up with a third option here, one suitable for processing an arbitrary event, is important. One faces the same problem with data_received, for example, when one wants to write general logic for timing out an idle connection. Do you write a utility function that temporarily overrides data_received by setting an attribute? Write a mixin? Or something else?
Turning Tulip.drain into a thing that returns a Future only half-solves the problem for one case, by adding a bunch of extra complexity (a Future, the possibility of multiple observers).

I don't have a ready-made solution to this issue, sadly. Twisted uses *way* too much subclassing in its API for utilities like this. We've started transitioning to using wrapper Protocol objects, which is what I'd suggest for starters here, but that's admittedly still a little awkward and verbose. I think there needs to be more exploration into different approaches... and such experimentation should be conducted within Protocol objects, so that the Transport object remains a simple deliverer of events via method calls to its Protocol object.
If we make the app responsible for managing the Future, it has to pass this Future to the transport so the latter can set its result when the buffer is drained. Or perhaps it might be better to pass the transport a callback that it should call -- this avoids building knowledge about Future into the transport API. I suppose I could live with this -- I'd immediately write a helper function for the app to call with the transport and the desirable high- and low-water marks, and it would tell the transport what callback to call when, and return the Future. (I guess that's the 4th party solution.)
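That helper could be sketched like this. Everything here is hypothetical: set_drain_callback() is the imagined "tell the transport what callback to call when" hook (it is not part of the real API), and the FakeTransport exists only so the sketch is self-contained:

```python
import asyncio

def drain(transport, low=16 * 1024, loop=None):
    """Hypothetical helper: return a Future that completes once the
    transport's buffer falls below `low`, or None if it already has."""
    if transport.get_buffer_size() < low:
        return None  # nothing to wait for; caller can skip the yield-from
    fut = asyncio.Future(loop=loop)
    # Imagined hook: the transport fires the callback when its buffer
    # drops below `low`; knowledge of Future stays out of the transport.
    transport.set_drain_callback(low, lambda: fut.set_result(None))
    return fut

class FakeTransport:
    """Stand-in transport for the sketch above."""
    def __init__(self, size):
        self._size = size
        self._cb = None

    def get_buffer_size(self):
        return self._size

    def set_drain_callback(self, low, callback):
        self._cb = callback  # fired later by the event loop machinery

loop = asyncio.new_event_loop()
t = FakeTransport(size=100 * 1024)
fut = drain(t, loop=loop)
assert fut is not None and not fut.done()
t._cb()                  # transport signals the buffer dropped below low
assert fut.done()
loop.close()
```

This keeps the transport API callback-based while giving coroutine-style apps a Future via the helper, which is the compromise described above.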
So why am I so keen to keep the protocol out of this? I think it's because in common use the app is a task and it deals directly with the transport for write operations -- and flow control is tied to writing, which is why involving the protocol (for purity) seems an unnecessary detour to me. In the end, a simple app can look like this:
@coroutine
def fetch():
    r, w = yield from open_connection('python.org', 80)
    # r is a StreamReader (not a Protocol!); w is a Transport
    w.write(b'GET / HTTP/1.0\r\n\r\n')
    while True:  # read until blank line
        line = yield from r.readline()
        if not line.strip():
            break
    contents = yield from r.read()
    return contents

To add flow control to this (e.g. when pumping a large body into a POST request) it should be sufficient to write something like this:

with open('body.txt', 'rb') as f:
    while True:
        data = f.read(64*1024)
        if not data:
            break
        yield from w.drain()  # or perhaps drain(w)
        w.write(data)

I like that this goes directly to the transport, so that if instead of StreamReader some other input mechanism is used, the drain() call is still the same.
So, suppose I do that. Do you still want the protocol (or the StreamWriter) to keep track of the transport's buffer size? I came up with a bit of a conundrum there.
Some of Tulip's transports have a feature whereby if the socket accepts the data it never gets buffered[1]. If you write less data than the kernel will buffer (typically 64 or 128 K I believe) or slow enough that the network absorbs it, the transport may never have any data in its buffer. But the StreamWriter doesn't know about this, so its calculation of how much data is buffered doesn't take this into account. But now this means that write_buffer_drained() may *never* be called! Unless we also call it for this special case. That still seems simple enough -- we can just call it also whenever a write() takes this shortcut.
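That shortcut could be sketched as follows. The transport and its fake send() are hypothetical; the point is only that the "drained" notification must fire even when nothing was ever buffered, or a bookkeeping protocol would wait forever:

```python
class ShortcutTransport:
    """Sketch of a transport whose write() tries the socket first and
    only buffers whatever the kernel did not accept."""

    def __init__(self, protocol, kernel_space=128 * 1024):
        self._protocol = protocol
        self._buffer = bytearray()
        self._kernel_space = kernel_space  # stand-in for what send() accepts

    def write(self, data):
        sent = min(len(data), self._kernel_space)  # fake non-blocking send()
        self._kernel_space -= sent
        leftover = data[sent:]
        if leftover:
            self._buffer += leftover  # only the remainder is buffered
        else:
            # Shortcut case: nothing was buffered, but we must still
            # notify, or write_buffer_drained() would never be called.
            self._protocol.write_buffer_drained(0)

class Recorder:
    """Minimal protocol that just records the drain notifications."""
    def __init__(self):
        self.calls = []
    def write_buffer_drained(self, new_buffer_size):
        self.calls.append(new_buffer_size)

p = Recorder()
t = ShortcutTransport(p, kernel_space=10)
t.write(b'short')        # fits entirely: notification fires immediately
assert p.calls == [0]
t.write(b'x' * 20)       # only 5 bytes of kernel space left: 15 buffered
assert len(t._buffer) == 15
```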
import cffi
ffi = cffi.FFI()
ffi.cdef("""
#define SO_NREAD ...
#define SO_NWRITE ...
""")
lib = ffi.verify("""
#include <sys/socket.h>
""")

import socket
import time

skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
skt.connect(("example.com", 80))
skt.setblocking(False)
print(skt.send(b'x' * 1024 * 20))

def moredata():
    remaining = skt.getsockopt(socket.SOL_SOCKET, lib.SO_NWRITE)
    print(remaining)
    return remaining

while moredata():
    time.sleep(0.05)
That was a really cool demo program!
This thread isn't dead yet. I've now implemented your proposal (being the least bad compromise). The new APIs are not yet in the repo, but here they are: https://codereview.appspot.com/14613043

TL;DR: I discovered a bunch of missed edge cases in the handling of closed connections that I fixed (already in the repo, not in that code review).

Transport:

- get_buffer_size()
- set_buffer_limits(high, low)
- drain() (new method)

Protocol:

- pause_writing()
- resume_writing()

StreamWriter (new class):

- transport (new property)
- write(), writelines(), write_eof(), can_write_eof(), get_extra_info(), close() (pass through)
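A protocol cooperating with those water marks might look like this minimal sketch. The pause_writing()/resume_writing() names come from the list above; the rest (the send() helper and the fake transport) is invented here for illustration:

```python
class ThrottledProtocol:
    """Sketch: honor pause_writing()/resume_writing() from the transport."""

    def __init__(self):
        self._paused = False
        self._pending = []

    def pause_writing(self):
        # Transport's buffer crossed the high-water mark: stop writing.
        self._paused = True

    def resume_writing(self):
        # Buffer fell below the low-water mark: OK to write again.
        self._paused = False

    def send(self, transport, data):
        # Hypothetical convenience: hold data back while paused.
        if self._paused:
            self._pending.append(data)
        else:
            transport.write(data)

class FakeTransport:
    """Stand-in transport that just records writes."""
    def __init__(self):
        self.written = []
    def write(self, data):
        self.written.append(data)

p = ThrottledProtocol()
t = FakeTransport()
p.send(t, b'a')
p.pause_writing()
p.send(t, b'b')          # held back while paused
assert t.written == [b'a']
assert p._pending == [b'b']
p.resume_writing()
```

A real implementation would also flush _pending from resume_writing(); that part is left out to keep the sketch short.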
Also, I chose to make write() raise an exception when the buffer fills up *really* high (10x the limit), which would suggest that you're ignoring flow control. And drain() will raise when the connection is closed.
Thoughts?