And I personally would have a really hard time explaining to people
why the read operation should be called with "yield from" (or "await"
soon), while its counterpart write should not.
In the end, to state the obvious, I'm not calling for changing the
existing synchronous write() - just for adding the missing async one,
and letting people decide which they want to use.
StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time.
This may be relevant to the current discussion, but whenever I see this snippet:

    s.write(data)
    yield from s.drain()

I think the sequence is backward, in that it should be like:

    yield from s.drain()  # ensure write buffer has space for data
    s.write(data)         # put data in buffer

This could be a significant performance difference in cases like:

    while condition():
        s.write(data)                       # put data in buffer
        yield from s.drain()                # wait for buffer to deplete
        data = yield from long_operation()  # wait some more for slow operation

This would be faster:

    while condition():
        yield from s.drain()                # ensure space available for data
        s.write(data)                       # put data in buffer
        data = yield from long_operation()  # buffer depletes while slow operation runs
Anyway, to partially address some concerns presented in this thread, perhaps drain could have an optional parameter for the head-room needed:

    yield from s.drain(headroom=len(data))
    s.write(data)

This would facilitate writing one's own async_write(self, data) that reliably avoids buffer overruns.
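To make the drain-then-write ordering concrete, here is a minimal sketch of such an async_write helper, in modern async/await syntax. Note the stock StreamWriter.drain() takes no headroom argument, so this only approximates the proposed drain(headroom=...); the function name is illustrative, not part of asyncio.

```python
import asyncio

async def async_write(writer: asyncio.StreamWriter, data: bytes) -> None:
    # Drain first so the transport buffer has room, then write, in the
    # order suggested above.  (Approximation only: without a headroom
    # parameter, drain() waits for the low-water mark, not for
    # len(data) bytes of free space.)
    await writer.drain()
    writer.write(data)
```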
On Jun 11, 2015, at 2:05 AM, Martin Teichmann <martin.t...@gmail.com> wrote:

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time. It raises an assertion error. I added a script that shows this problem.
On Jun 12, 2015, at 2:37 PM, Martin Teichmann <martin.t...@gmail.com> wrote:

Hi Glyph, hi Guido, hi everyone.
You have two very different points of critique, let me respond to both of them:

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time.
In my opinion, this is fine. It should probably be some exception other than AssertionError, but it should be an exception. Do not try to manipulate a StreamWriter or a StreamReader from multiple tasks at once. If you encourage people to do this, it is a recipe for corrupted output streams and interleaved writes.
No, it isn't. Asyncio is completely single threaded,
only one task is running at any given time, until the next
yield from. So no writes can ever be interleaved,
unless you explicitly yield from something.
    for each in range(10):
        writer.write(b"A")
        yield from writer.drain()  # be polite! don't buffer too much at once!
Agreed. These streams should not be accessed "concurrently" by different coroutines. A single coroutine that repeatedly calls write() and eventually drain() is in full control over how many bytes it writes before calling drain(), and thus it can easily ensure the memory needed for buffering is strictly bounded. But if multiple independent coroutines engage in this pattern for the same stream, the amount of buffer space is not under control of any single coroutine.
Guido definitely has a point here.
But this problem is solvable: in a system where the writer is slow -
and this is the situation in which you typically want to use drain() -
all tasks will normally be waiting in drain() until the writer reaches
its low water mark again. Now my patch will resume all tasks, which is
wrong. The correct solution is to resume only one task (first come,
first served probably being best), and the others only once the writer
is fine again.
Yes. However, if you were to consider something like

    for each in range(10):
        writer.write(b"A")
        yield from writer.drain()  # be polite! don't buffer too much at once!

you might reasonably expect, reasoning about this coroutine by itself, that you will see a string of 10 "A"s on the wire.
It's theoretically solvable, but for social reasons this is still not something that you want to encourage. Once you start having a fair queueing system around drain() it raises the question of having a fair queueing system for writing whole large messages, which means you then need some kind of higher level lock on the stream, which becomes very confusing (every stream becomes its own full-fledged task scheduler).
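For reference, the "resume only one task, first come first served" idea could be sketched roughly like this, in modern async/await syntax. The class name and structure are illustrative, not part of asyncio; a real implementation would also have to handle cancellation of blocked waiters.

```python
import asyncio
from collections import deque

class FairDrainer:
    """Sketch: wake blocked writers one at a time, FIFO, each time the
    transport reports it can accept more data again."""

    def __init__(self):
        self._waiters = deque()   # futures of tasks blocked in drain()
        self._writable = True

    def pause_writing(self):
        # Called when the write buffer passes its high-water mark.
        self._writable = False

    def resume_writing(self):
        # Called when the buffer falls below the low-water mark:
        # resume exactly one waiter; the rest stay queued until the
        # writer is fine again.
        self._writable = True
        if self._waiters:
            self._waiters.popleft().set_result(None)

    async def drain(self):
        while not self._writable:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
```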
I am still wondering, what is your actual use case? Multiple coroutines writing to the same stream sounds like the exception, not the common case -- even when you pay careful attention to the appearance of yield-from, you still have the problem that you can't control which of several coroutines runs first. I must assume that you have a protocol over TCP/IP that has some kind of framing convention, and you always ensure that a coroutine writes an entire frame without yielding.
I wonder if you could just use a bare Transport/Protocol pair instead of a StreamWriter? In that case you'd have to implement pause_writing() and resume_writing() in your Protocol class if you want to have the equivalent of drain(). (But do you? Is there nothing at a higher level in your protocol to prevent your writing coroutines from overwhelming the other end of the socket?)
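A minimal sketch of that Protocol-level drain() equivalent might look like the following (modern async/await syntax, single waiter only; this is essentially what asyncio's internal FlowControlMixin does, but the class here is illustrative, not a documented API):

```python
import asyncio

class DrainingProtocol(asyncio.Protocol):
    """Sketch: drain() built from pause_writing()/resume_writing()."""

    def __init__(self):
        self._paused = False
        self._drain_waiter = None

    def connection_made(self, transport):
        self.transport = transport

    def pause_writing(self):
        # The transport calls this when its buffer passes the
        # high-water mark.
        self._paused = True

    def resume_writing(self):
        # Called when the buffer falls below the low-water mark.
        self._paused = False
        if self._drain_waiter is not None:
            self._drain_waiter.set_result(None)
            self._drain_waiter = None

    async def drain(self):
        # Block only while the transport says we should stop writing.
        if self._paused:
            self._drain_waiter = asyncio.get_running_loop().create_future()
            await self._drain_waiter
```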
Hi Guido,

I am still wondering, what is your actual use case? Multiple coroutines writing to the same stream sounds like the exception, not the common case -- even when you pay careful attention to the appearance of yield-from, you still have the problem that you can't control which of several coroutines runs first. I must assume that you have a protocol over TCP/IP that has some kind of framing convention, and you always ensure that a coroutine writes an entire frame without yielding.
I have a small application that monitors our servers here, and sends the results of this monitoring to a single socket for analysis. The
analysis tool runs outside of the server farm and is not as well connected as the servers inside the farm are amongst each other. The analysis
tool is supposed to log into the monitoring application, tell it which parameters it intends to monitor, and then it gets those parameters sent
over that single socket using a framed protocol.
Now, for each parameter that I monitor, I write a coroutine (those are typically very small, ten lines on average I guess). That coroutine
more-or-less does
    while True:
        data = yield from get_monitored_parameter()
        writer.write(header)
        writer.write(data)
        yield from writer.drain()
I wrote my own drain coroutine (mostly like the locks you proposed), so that I can call it from
several tasks, and this gives me simple flow control. It could be a good idea to add such
locks to the standard library drain, but I can also understand if people think that this is too
special a use case. I added a working patch to this post just in case.
I wonder if you could just use a bare Transport/Protocol pair instead of a StreamWriter? In that case you'd have to implement pause_writing() and resume_writing() in your Protocol class if you want to have the equivalent of drain(). (But do you? Is there nothing at a higher level in your protocol to prevent your writing coroutines from overwhelming the other end of the socket?)
I'm using StreamWriter because this is what start_server returns...
I want to keep things simple; my code is currently 345 lines and works. I think the write-drain loop
is a flow-control concept that is really easy to grasp, which is why I went for this solution.
Sure, I could write some handshaking to avoid overwhelming the other end of the socket, but
TCP already does that for me, so why bother.
On Monday, June 15, 2015 at 10:49:28 PM UTC+3, Guido van Rossum wrote:

Fair enough. I guess there is a real danger of overwhelming the socket? Otherwise you don't even need the drain() call. But it looks like you are basically logging as fast as you can, so the drain() seems important.
I still think that this is a relatively rare case and I don't want to encourage sharing streams between coroutines. Maybe you can submit a patch to turn the assert into a RuntimeError?

My 5c: I'm writing an AMQP client library. The AMQP protocol consists of a sequence of frames, which may (mostly) be interleaved. I use StreamWriter.drain() before sending large frames to make sure I'm not flooding the write buffer. I ran into this AssertionError while testing that publishing messages from multiple tasks works - it didn't. Thinking there was some inherent reason drain() must be called from only one task, I started figuring out what logic I would need to decide when to call it, on the assumption that it returns only when the buffer is empty. Instead it turns out it has water marks and the works, so I just wrapped the call in an async with on an asyncio.Lock.
I'm not sure the AssertionError is more help than harm: protocols such as this one, where several tasks may operate on the same protocol, just need to work around it, and protocols that don't support interleaving of writes probably run into bugs anyway.

--
Mathias