StreamWriter.drain cannot be called concurrently


Martin Teichmann

Jun 11, 2015, 5:05:23 AM
to python...@googlegroups.com
Hi everyone,

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time. It raises an assertion error. I added a script that shows this problem.

What I am doing is the following: several tasks in my program are generating
big amounts of data to be shipped out on a StreamWriter. This can easily
overload the receiver of all that data. This is why every task, after calling
writer.write, also calls "yield from writer.drain()". Unfortunately, while one task is draining,
another task may write to the same stream writer and also want to call drain.
This raises an AssertionError.

The problem apparently lies in FlowControlMixin._drain_helper, which assumes
that only one task may wait for a stream to be drained.
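
For reference, the helper looks roughly like this in 3.4.3 (the assert on the
existing waiter is what fires when a second task is already waiting):

@coroutine
def _drain_helper(self):
    if self._connection_lost:
        raise ConnectionResetError('Connection lost')
    if not self._paused:
        return
    waiter = self._drain_waiter
    # a second concurrent drain() finds a live waiter here and fails
    assert waiter is None or waiter.cancelled()
    waiter = futures.Future(loop=self._loop)
    self._drain_waiter = waiter
    yield from waiter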

(Btw, I am using python 3.4.3)

Greetings

Martin

A script showing the problem follows:

from asyncio import (async, coroutine, start_server, get_event_loop,
    open_connection, sleep)

@coroutine
def callback(reader, writer):
    # Slow server: read nothing for a while, so the client's send buffer
    # fills up and its transport gets paused.
    yield from sleep(2)

@coroutine
def write(writer):
    # ~100 MB, far above the default high-water mark, so drain() has to wait.
    writer.write((" " * 100000000).encode("ascii"))
    yield from writer.drain()
    yield from sleep(1)

@coroutine
def test():
    reader, writer = yield from open_connection("localhost", 43522)
    # Two tasks draining the same StreamWriter: the second drain() call
    # trips the assert in FlowControlMixin._drain_helper.
    async(write(writer))
    async(write(writer))

async(start_server(callback, port=43522))
get_event_loop().run_until_complete(test())

Paul Sokolovsky

Jun 11, 2015, 5:56:00 AM
to python...@googlegroups.com, Guido van Rossum, Victor Stinner
Hello,

On Thu, 11 Jun 2015 02:05:23 -0700 (PDT)
Martin Teichmann <martin.t...@gmail.com> wrote:

[]

> What I am doing is the following: several tasks in my program are
> generating big amounts of data to be shipped out on a StreamWriter.
> This can easily overload the receiver of all that data. This is why
> every task, after calling
> writer.write, also calls "yield from writer.drain()". Unfortunately,
> while one task is draining,
> another task may write to the same stream writer and also want to call
> drain. This raises an AssertionError.

This is a big problem, about which I have wanted to write for a long time.
The root of the problem is however not drain(), but the synchronous
write() method, whose semantics seem designed in a way that easily allows DoS
attacks on the platform where the code runs - it is required to buffer
unlimited amounts of data, which is not possible on any physical
platform, and will only lead to excessive virtual memory swapping and
out-of-memory kills on real systems (hence the reference to DoS).

Can we please-please have async_write() method? Two boundary
implementations of it would be:

# Same behavior as currently - unlimited buffering
def async_write(...):
    return self.write()
    yield   # unreachable; just makes this function a generator/coroutine


# Memory-conscious implementation
def async_write(...):
    self.write()
    yield from self.drain()


But the point is that it's the asyncio implementation that would be able
to decide on a behavior. For example, there can be an implementation which
decides how much to buffer based on physical memory availability, which
is a global shared resource and thus needs to be scheduled globally.

Contrast that with Transport-local flow control, which is how the problem is
supposed to be tackled currently. A particular Transport doesn't have
insight into how the complete system is functioning, so it cannot schedule
resources efficiently. Moreover, Transports are a separate, tangled API. People
who want to use asyncio want to deal with nice Python coroutines, and
suddenly face a completely foreign "Transport API" picked up from a
20-year-old, legacy package. The only reasonable response to expect
from people is that they will ignore all of its complexities, which
is the way to DoS, as described above.


So, as I already argued previously on this list and on python-dev, please
kindly structure the asyncio API in such a way that alternative
implementations are possible which support the asyncio coroutine paradigm
but are devoid of extra layers not directly related to this native
paradigm. Such layers should not be on the critical path for using
asyncio in that native way. Examples of such layers are Future
(discussed before) and Transport (discussed here).

Implementation of these ideas is available in uasyncio, an asyncio
implementation for MicroPython. async_write() method above is called
awrite() for brevity:
https://github.com/micropython/micropython-lib/blob/master/uasyncio/uasyncio/__init__.py#L102

There is also a compatibility layer which implements the uasyncio-compatible
API on top of CPython asyncio (by monkey-patching):

https://github.com/micropython/micropython-lib/blob/master/cpython-uasyncio/uasyncio.py#L83



Thanks for your consideration of this issue!



--
Best regards,
Paul mailto:pmi...@gmail.com

Gustavo Carneiro

Jun 11, 2015, 6:04:57 AM
to python-tulip
I have some concerns about encouraging such an API.  Many applications will want to do small writes, of a few bytes at a time.  Making every write() call a coroutine causes an enormous amount of overhead, as each time you write some small piece of data you have to suspend the current coroutine and go back to the main loop.

So, I'm happy with the current API, plus documentation explaining that you need "yield from self.drain()" at appropriate places.
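
For example, something like this at the end of each burst of writes:

    writer.write(chunk)            # synchronous: only appends to the transport's buffer
    yield from writer.drain()      # suspends only if the buffer is over its high-water mark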

-- 
Gustavo J. A. M. Carneiro
Gambit Research
"The universe is always one step beyond logic." -- Frank Herbert

Paul Sokolovsky

Jun 11, 2015, 6:36:38 AM
to Gustavo Carneiro, python-tulip
Hello,
You can also always keep the option of rewriting bottlenecks in your
code in assembler. But as long as we are talking about an asynchronous I/O
framework in Python, let's talk about that. And the very idea that an
asynchronous framework has synchronous operations sprinkled in random
places can raise an eyebrow on its own.

"Random" is relative. There was a lot of talk on python-dev lately
(in regard to the async/await PEP 0492) that asyncio should be more
friendly to beginners and lay folks who don't care about all that
synchrony/asynchrony, but just want to write apps. And I personally
would have a really hard time explaining to people why a read operation should
be called with "yield from" (or "await" soon), while its counterpart
write is not.

Finally, if generators are known to cause an "enormous amount of
overhead", then the Python community should think about and work on improving
that, not allow them in some random places and disallow them
in others. For example, someone should question how it happens that the
"recursive yield from" optimization, which was (IIRC) part of Greg
Ewing's original "yield from" implementation, is still not in the mainline.

In the end, to state the obvious, I am not calling for doing something about
the existing synchronous write() - just for adding the missing async one, and
letting people decide what they want to use.


> So, I'm happy with the current API, plus documentation explaining
> that you need "yield from self.drain()" at appropriate places.
>
> --
> Gustavo J. A. M. Carneiro
> Gambit Research
> "The universe is always one step beyond logic." -- Frank Herbert



Martin Teichmann

Jun 11, 2015, 7:36:42 AM
to python...@googlegroups.com
Hello,
 
And I personally
would have a really hard time explaining to people why a read operation should
be called with "yield from" (or "await" soon), while its counterpart
write is not.

No. This is actually very simple and obvious: when you call read, you
need the result of that to continue working, with write, you don't.
So you need to wait for the former, but not for the latter.
There simply is no symmetry between read and write when it comes
to asyncio. This is like asking "why does write have a parameter
data, while read doesn't?", well, because, they are different.

The entire coolness of asyncio lies in the fact that you clearly mark
the places where concurrency enters. By making just everything a
coroutine, this great idea just disappears. And write is a very good
example for this. Sometimes in my code I have two writes:

    writer.write("something")
    writer.write("else")

because this is asyncio, I know that those two writes will be called
directly after each other, and that the data goes out into the world
just like that. This is the large advantage over multi-threaded
programming: there some other thread might write something
in between those lines, and the result is just some mingled data.

Sure, in multi-threaded programming I can just use mutexes. But
you quickly run into large problems with hardly debuggable code.

So in short: when designing an API for use with asyncio, try
to limit the use of coroutines to where it is really, really
necessary. 

Btw, is there anyone who could comment on my original posting,
that StreamWriter.drain cannot be called concurrently?

Greetings

Martin

Gustavo Carneiro

Jun 11, 2015, 7:39:10 AM
to Paul Sokolovsky, python-tulip
On 11 June 2015 at 11:36, Paul Sokolovsky <pmi...@gmail.com> wrote:
Enormous is relative. I mean compared to writing a few bytes.  It's like sending a UDP packet with a few bytes inside: the overhead of the outer protocol headers is much greater than the payload itself, which means it will be very inefficient.
 
In the end, to state the obvious, I am not calling for doing something about
the existing synchronous write() - just for adding the missing async one, and
letting people decide what they want to use.

Yes.  But the async version is just a shortcut, it just saves you from adding an additional "yield from self.drain()" line, that's all.

Actually, thinking about this problem some more, I wonder if we could do better?

I know we have WriteTransport.set_write_buffer_limits(), which is documented as "Set the high- and low-water limits for write flow control.  These two values control when the protocol's pause_writing() and resume_writing() methods are called".  So, these "write buffer limits" are only used for the transport to communicate pause_writing/resume_writing to the protocol.

If we wanted asyncio to be more memory-conscious by default, how about:

    1. Have some sane defaults for the write buffer limits;

    2. Make WriteTransport.write() raise an exception if ever the buffer data rises above the threshold.

As a result, an asyncio application that forgets to call drain() on its streams will eventually get an exception.  If the exception message is clear enough, the programmer will realize he forgot to add a yield from stream.drain().

The downside is the "eventually get an exception" part: the application may work fine most of the time, but once in a while it will get an exception.  Annoying.  On the other hand, if the application forgets drain() then the program may run fine most of the time, but one day it will run out of memory and explode.  I think I prefer an exception.

Does anyone think this would be a good idea?  I'm only half convinced myself, but I thought it is worth sharing.
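
A rough sketch of what I mean (illustrative only - not the current asyncio code; the class name, limit and message are made up):

class StrictWriteTransport:
    """Sketch of a write() that refuses to buffer past the high-water mark."""

    def __init__(self, high=64 * 1024):
        self._high = high
        self._buffer = bytearray()

    def write(self, data):
        if len(self._buffer) + len(data) > self._high:
            raise RuntimeError("write buffer over high-water mark; "
                               "did you forget 'yield from writer.drain()'?")
        self._buffer.extend(data)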

Thanks,
Gustavo.


Paul Sokolovsky

Jun 11, 2015, 8:01:38 AM
to Martin Teichmann, python...@googlegroups.com
Hello,

On Thu, 11 Jun 2015 04:36:42 -0700 (PDT)
Martin Teichmann <martin.t...@gmail.com> wrote:

> Hello,
>
>
> > And I personally
> > would have a really hard time explaining to people why a read operation
> > should be called with "yield from" (or "await" soon), while its
> > counterpart write is not.
> >
>
> No. This is actually very simple and obvious: when you call read, you
> need the result of that to continue working, with write, you don't.

You just described why they *can* be different, not why they *have* to
be.

> So you need to wait for the former, but not for the latter.

Everything is relative. If data to read is already available and
buffered, you don't need to wait for it. And vice-versa, if there's
nowhere to buffer write data, you have to wait for buffer space to be
available.

> There simply is no symmetry between read and write when it comes
> to asyncio. This is like asking "why does write have a parameter
> data, while read doesn't?", well, because, they are different.

There's no limit in how different you can make 2 things (no matter how
similar they are). But the talk is about different direction - how to
make them similar and consistent.

> The entire coolness of asyncio lies in the fact that you clearly mark
> the places where concurrency enters. By making just everything a
> coroutine, this great idea just disappears.

No, it doesn't. Nobody want to deprive you of right to use sync write,
it stays where it was.

> And write is a very good
> example for this. Sometimes in my code I have two writes:
>
> writer.write("something")
> writer.write("else")
>
> because this is asyncio, I know that those two writes will be called
> directly after each other, and that the data goes out into the world
> just like that.

No, you can't. Almost no networking stack gives such guarantees, TCP/IP
for one doesn't (if we talk about message boundaries).

And then the same unguaranteedness is possible with async version:

yield from writer.async_write("something" + "else")

(replace with io.StringIO() per your needs).

> This is the large advantage over multi-threaded
> programming: there some other thread might write something
> in between those lines, and the result is just some mingled data.

Sorry, in the end, if you write to the same sink from 2 concurrency
primitives, be it 2 threads, or 2 coroutines, without synchronization
between them, you'll end up with a mess sooner or later. Most people
simply have one producer for one sink. But if you want to do tricks
like the above - ... you can keep doing them, as sync write stays where
it was. (But then how do you deal with read?)

> Sure, in multi-threaded programming I can just use mutexes. But
> you quickly run into large problems with hardly debuggable code.
>
> So in short: when designing an API for use with asyncio, try
> to limit the use of coroutines to where it is really, really
> necessary.
>
> Btw, is there anyone who could comment on my original posting,
> that StreamWriter.drain cannot be called concurrently?

Well, sorry for hijacking your thread. I kinda meant that async_write()
would certainly call fixed drain() ;-).

>
> Greetings
>
> Martin

Paul Sokolovsky

Jun 11, 2015, 8:32:54 AM
to Gustavo Carneiro, python-tulip
Hello,

On Thu, 11 Jun 2015 12:39:09 +0100
Gustavo Carneiro <gjcar...@gmail.com> wrote:

[]

> > In the end, to state the obvious, I am not calling for doing something
> > about the existing synchronous write() - just for adding the missing
> > async one, and letting people decide what they want to use.
> >
>
> Yes. But the async version is just a shortcut, it just saves you from
> adding an additional "yield from self.drain()" line, that's all.

No, as pointed out in the original mail. It straightens up the API, and
makes the asynchronous framework really asynchronous. Two example
implementations were provided: on one extreme, async_write() keeps
*not* calling drain(); this choice would certainly be selected by
CPython's asyncio. The other extreme is calling drain() after each write;
this is the choice of MicroPython's uasyncio, which values memory more
than the performance that can be gained by consolidating the data of
small writes.

But those are just two extremes. A both performant and robust
implementation of async_write() would set aside 1MB (1GB, 1TB) of
physical memory and buffer into it (returning immediately) as long as
possible, and block and flush it otherwise. And it would do that
globally for all connections in the asyncio loop instance, because it works
at that level. So there would never be a drastic drop in response
performance, unlike the case where 1MB is reserved per connection, then a
gazillion of connections arrive, and it all starts to swap/OOM.
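
A very rough sketch of that middle ground (illustrative only - neither asyncio nor uasyncio code; the names and numbers are made up):

import asyncio

_BUDGET = 1024 * 1024     # buffering budget shared by every connection in the loop
_buffered = 0             # bytes currently counted against that budget

@asyncio.coroutine
def async_write(writer, data):
    global _buffered
    writer.write(data)            # always buffers synchronously first
    _buffered += len(data)
    if _buffered >= _BUDGET:      # block only once the shared budget is exhausted
        yield from writer.drain()
        _buffered = 0             # crude reset; a real implementation would track per-transport usage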

> Actually, thinking about this problem some more, I wonder if we could
> do better?
>
> I know we have WriteProtocol.set_write_buffer_limits(), which is

But I mentioned that in my original email too! Let me repeat that
argument again. It's not "us" who have it, "them" have it - "them" being
Twisted, etc. Then Guido, nice guy that he is, wanted to make asyncio a uniting
ground for all Python asynchronous frameworks, so he collected a piece
from each existing framework so nobody felt left out. A very worthy and
honorable aim; I would probably have done it like that if I were the BDFL.

But thank god I am not, and I have my own independent opinion. I already
expressed it on python-dev, and let me do it again: it might have been
better to leave Twisted and friends where they are, and take a fresh
start with asyncio.

But again, I am not calling for removing all that Twisted, etc. stuff. All I
ask is: please, let there be asyncio implementations based on: 1)
coroutines; 2) nothing else. There is such an implementation -
MicroPython's uasyncio. The only remaining stumbling block is the
asynchronous write method. Please kindly let us be compatible with
asyncio, by adding an async_write() to asyncio. You don't have to use it
if you don't want to (but trust me, people will like it and will use it).

[]

David Keeney

Jun 11, 2015, 9:37:08 AM
to python...@googlegroups.com
This may be relevant to the current discussion, but whenever I see this snippet:
 
  s.write(data)
  yield from s.drain()
 
I think the sequence is backward, in that it should be like:
 
  yield from s.drain()    # ensure write buffer has space for data
  s.write(data)              # put data in buffer
 
 
This could be a significant performance difference in cases like:
 
while condition():
    s.write(data)             # put data in buffer
    yield from s.drain()  #  wait for buffer to deplete
 
    data = yield from long_operation()      # wait some more for slow operation
 
This would be faster:
 
while condition():
    yield from s.drain()   # ensure space available for data
    s.write(data)             # put data in buffer
   
    data = yield from long_operation()  # buffer depletes while slow operation runs
 
 
Anyway, to partially address some concerns presented in this thread, perhaps drain could have an optional parameter for head-room needed:
 
   yield from s.drain(headroom=len(data))
   s.write(data)
 
This would facilitate writing one's own async_write(self, data), that reliably avoids buffer overruns.
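An async_write built on top of that could then be as simple as (hypothetical, since drain() accepts no such argument today):

@coroutine
def async_write(self, data):
    yield from self.drain(headroom=len(data))   # hypothetical parameter
    self.write(data)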
 
 

Martin Teichmann

Jun 11, 2015, 11:03:39 AM
to python...@googlegroups.com

Hi everyone,

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time.

I just filed a bug fix for this problem here: http://bugs.python.org/issue24431

Greetings

Martin

Gustavo Carneiro

Jun 11, 2015, 11:05:22 AM
to David Keeney, python-tulip
On 11 June 2015 at 14:37, David Keeney <dke...@travelbyroad.net> wrote:
This may be relevant to the current discussion, but whenever I see this snippet:
 
  s.write(data)
  yield from s.drain()
 
I think the sequence is backward, in that it should be like:
 
  yield from s.drain()    # ensure write buffer has space for data
  s.write(data)              # put data in buffer
 
 
This could be a significant performance difference in cases like:
 
while condition():
    s.write(data)             # put data in buffer
    yield from s.drain()  #  wait for buffer to deplete
 
    data = yield from long_operation()      # wait some more for slow operation
 
This would be faster:
 
while condition():
    yield from s.drain()   # ensure space available for data
    s.write(data)             # put data in buffer
   
    data = yield from long_operation()  # buffer depletes while slow operation runs 
 
Yes, you're right.   There are cases when draining before writing is preferable.  

But in the normal case, I think it's preferable to have a function that writes lots of data to a stream to "clean up after itself", so that if draining the buffer becomes slow you can measure it and discover (to some extent) which function makes it slow.

Anyway, having these different approaches to draining makes a case for keeping writing and [waiting for] draining separate methods.

 
Anyway, to partially address some concerns presented in this thread, perhaps drain could have an optional parameter for head-room needed:
 
   yield from s.drain(headroom=len(data))
   s.write(data)
 
This would facilitate writing one's own async_write(self, data), that reliably avoids buffer overruns.

+1

Glyph

Jun 11, 2015, 2:59:38 PM
to python-tulip

On Jun 11, 2015, at 2:05 AM, Martin Teichmann <martin.t...@gmail.com> wrote:

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time. It raises an assertion error. I added a script that shows this problem.

In my opinion, this is fine.  It should probably be some exception other than AssertionError, but it should be an exception.  Do not try to manipulate a StreamWriter or a StreamReader from multiple tasks at once.  If you encourage people to do this, it is a recipe for corrupted output streams and interleaved writes.

-glyph

Guido van Rossum

Jun 12, 2015, 3:19:38 PM
to Glyph, python-tulip
Agreed. These streams should not be accessed "concurrently" by different coroutines. A single coroutine that repeatedly calls write() and eventually drain() is in full control over how many bytes it writes before calling drain(), and thus it can easily ensure the memory needed for buffering is strictly bounded. But if multiple independent coroutines engage in this pattern for the same stream, the amount of buffer space is not under control of any single coroutine. Even calling drain() after each write does not prevent this (as Martin's original test script shows -- both coroutines end up calling write() before anything else happens). Because of this reasoning I don't really think that calling drain() before write() matters -- either way you're alternating between write() and drain().

Finally, take all of this with a grain of salt, because the flow control protocol is complicated -- drain()'s behavior is affected by set_write_buffer_limits(), and in addition there is buffering in the OS socket layer (which may in turn be tuned by setsockopt(), but you rarely need this -- even if you can control the buffering in your own OS, you can't control the buffering in the rest of the network or the receiving host -- so we don't provide a direct interface to it).
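
For example, the thresholds that drain() reacts to can be tuned on the underlying transport (the numbers here are arbitrary):

    # pause the protocol above 64 KiB of buffered data, resume below 16 KiB
    writer.transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)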

--
--Guido van Rossum (python.org/~guido)

Martin Teichmann

Jun 12, 2015, 5:37:00 PM
to python...@googlegroups.com, gu...@python.org, gl...@twistedmatrix.com
Hi Glyph, hi Guido, hi everyone.

You have two very different points of critique,
let me respond to both of them:

> StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
> at the same time.
>
>
> In my opinion, this is fine.  It should probably be some exception other than AssertionError, but it should be an exception.  Do not try to manipulate a StreamWriter or a StreamReader from multiple tasks at once.  If you encourage people to do this, it is a recipe for corrupted output streams and interleaved writes.

No, it isn't. Asyncio is completely single threaded,
only one task is running at any given time, until the next
yield from. So no writes can ever be interleaved,
unless you explicitly yield from something.

As an example, in my code I am calling something like:
writer.write(header)
writer.write(payload)
While everyone trained in multi-threaded code will get a
heart attack reading this code, it's no issue whatsoever
with asyncio: the two calls to write will always nicely
follow each other without any interleaving, as there simply
is no other thread that could do anything.

> Agreed. These streams should not be accessed "concurrently" by different coroutines. A single coroutine that repeatedly calls write() and eventually drain() is in full control over how many bytes it writes before calling drain(), and thus it can easily ensure the memory needed for buffering is strictly bounded. But if multiple independent coroutines engage in this pattern for the same stream, the amount of buffer space is not under control of any single coroutine.

Guido definitely has a point here.
But this problem is solvable: in a system where the writer is slow -
and this is the situation in which you typically want to use drain() -
all tasks will normally be waiting in drain() until the writer reaches
its low-water mark again. Now, my patch will resume all tasks. That is
wrong. The correct solution is to resume only one task (first come, first served
probably being best), and the others only once the writer is fine again.

This is how I will solve it in my project. If someone is interested,
I could also post the result here.

Greetings

Martin


Glyph

Jun 13, 2015, 4:30:02 AM
to Martin Teichmann, python...@googlegroups.com, Guido van Rossum
On Jun 12, 2015, at 2:37 PM, Martin Teichmann <martin.t...@gmail.com> wrote:

Hi Glyph, hi Guido, hi everyone.

You have two very different points of critique,
let me respond to both of them:

StreamWriter.drain cannot be called from different tasks (asyncio tasks that is)
at the same time.


In my opinion, this is fine.  It should probably be some exception other than AssertionError, but it should be an exception.  Do not try to manipulate a StreamWriter or a StreamReader from multiple tasks at once.  If you encourage people to do this, it is a recipe for corrupted output streams and interleaved writes.

No, it isn't. Asyncio is completely single threaded,

I am aware of that; that's why I said "tasks" and not "threads".

only one task is running at any given time, until the next
yield from. So no writes can ever be interleaved,
unless you explicitly yield from something.

Yes.  However, if you were to consider something like

for each in range(10):
    writer.write(b"A")
    yield from writer.drain() # be polite! don't buffer too much at once!

you might reasonably expect, reasoning about this coroutine by itself, that you will see a string of 10 "A"s on the wire.  But of course you might get "ABABABABABABABABABAB" if another coroutine were doing the same thing with "B" elsewhere.

This becomes a practical concern when you have a protocol that involves potentially large messages (like everybody's favorite, HTTP) where yielding to drain() before writing chunks of those messages would be good form, but would be highly dangerous if you had concurrent writers.  Which, in the future, you likely will, in the form of HTTP/2.

Keep in mind also that if you have multiple writers to a stream, you might get interleaved writes even if you are not calling drain(); anything you 'yield from' might task switch to another writer.

Agreed. These streams should not be accessed "concurrently" by different coroutines. A single coroutine that repeatedly calls write() and eventually drain() is in full control over how many bytes it writes before calling drain(), and thus it can easily ensure the memory needed for buffering is strictly bounded. But if multiple independent coroutines engage in this pattern for the same stream, the amount of buffer space is not under control of any single coroutine.

Guido definitely has a point here.
But this problem is solvable: in a system where the writer is slow -
and this is the situation you typically want to use drain() -
all tasks will normally be waiting in drain until the writer reached
its low water mark again. Now my patch will resume all tasks. That is
wrong. The correct solution is to resume only one task (first come first serve
probably being best), and the others only once the writer is fine again.

It's theoretically solvable, but for social reasons this is still not something that you want to encourage.  Once you start having a fair queueing system around drain() it raises the question of having a fair queueing system for writing whole large messages, which means you then need some kind of higher level lock on the stream, which becomes very confusing (every stream becomes its own full-fledged task scheduler).

-glyph

Martin Teichmann

Jun 13, 2015, 11:49:31 AM
to python...@googlegroups.com, gu...@python.org, martin.t...@gmail.com
Hi Glyph,


Yes.  However, if you were to consider something like

for each in range(10):
    writer.write(b"A")
    yield from writer.drain() # be polite! don't buffer too much at once!

you might reasonably expect, reasoning about this coroutine by itself, that you will see a string of 10 "A"s on the wire.

No. Reasoning about that very coroutine, one immediately asks: what's that yield from doing there? This is actually
a very good example of why this way of programming is a good idea: bugs become obvious. The rule is
simple: don't put yield froms into code that should execute as one block. Those yield froms even have to be
there explicitly; they cannot be hidden in a subroutine.

Sure, there are situations in which such a programming technique is not a good idea, but this also becomes
evident very quickly: if you realize that you do need yield froms between your writes (for whatever reason)
then it's a good idea to change strategy. 


It's theoretically solvable, but for social reasons this is still not something that you want to encourage.  Once you start having a fair queueing system around drain() it raises the question of having a fair queueing system for writing whole large messages, which means you then need some kind of higher level lock on the stream, which becomes very confusing (every stream becomes its own full-fledged task scheduler).
 
Just because some applications might need a complicated scheduler does not mean everyone needs one.
Compare that to the task scheduler of asyncio: it's very basic. And that's fine, as it is good enough for
the vast majority of applications. Those who need something better are free to implement their improved
scheduler.

If your argument held, it would mean that the asyncio scheduler would also need to be a full-fledged
fair scheduler, just because some applications need it.

Anyhow, I'll write my small FIFO output scheduler, it will do the job for my small project, so I don't care
anymore whether StreamWriter.drain works task-concurrently or not.

Greetings

Martin

Guido van Rossum

Jun 13, 2015, 1:13:30 PM
to Martin Teichmann, python-tulip
Martin,

I am still wondering, what is your actual use case? Multiple coroutines writing to the same stream sounds like the exception, not the common case -- even when you pay careful attention to the appearance of yield-from, you still have the problem that you can't control which of several coroutines runs first. I must assume that you have a protocol over TCP/IP that has some kind of framing convention, and you always ensure that a coroutine writes an entire frame without yielding.

I wonder if you could just use a bare Transport/Protocol pair instead of a StreamWriter? In that case you'd have to implement pause_writing() and resume_writing() in your Protocol class if you want to have the equivalent of drain(). (But do you? Is there nothing at a higher level in your protocol to prevent your writing coroutines from overwhelming the other end of the socket?)

In any case, a simple solution for ensuring only one coroutine calls drain() at a time would be to associate an asyncio.Lock with the stream -- then you can do

  with (yield from lock):
      yield from writer.drain()
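
or, wrapped into a small helper coroutine (just a sketch; the names are made up):

  import asyncio

  drain_lock = asyncio.Lock()

  @asyncio.coroutine
  def locked_drain(writer):
      # only one coroutine at a time waits in drain(); the rest queue on the lock
      with (yield from drain_lock):
          yield from writer.drain()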

Martin Teichmann

Jun 15, 2015, 6:23:13 AM
to python...@googlegroups.com, gu...@python.org
Hi Guido,

 
I am still wondering, what is your actual use case? Multiple coroutines writing to the same stream sounds like the exception, not the common case -- even when you pay careful attention to the appearance of yield-from, you still have the problem that you can't control which of several coroutines runs first. I must assume that you have a protocol over TCP/IP that has some kind of framing convention, and you always ensure that a coroutine writes an entire frame without yielding.

I have a small application that monitors our servers here, and sends the results of this monitoring to a single socket for analysis. The
analysis tool runs outside of the server farm and is not as well connected as the servers inside the farm are amongst each other. The analysis
tool is supposed to log into the monitoring application, tell which parameters it intends to monitor, and then gets those parameters sent over
that single socket using a framed protocol.

Now, for each parameter that I monitor, I write a coroutine (those are typically very small, ten lines on average I guess). That coroutine
more-or-less does

    while True:
         data = yield from get_monitored_parameter()
         writer.write(header)
         writer.write(data)
         yield from writer.drain()

I wrote my own drain coroutine (mostly like the lock you proposed), so that I can call it from
several tasks, and this gives me simple flow control. It could be a good idea to add such
a lock to the standard library drain, but I can also understand it if people think that this is too special
a use case. I added a working patch to this post just in case.

I wonder if you could just use a bare Transport/Protocol pair instead of a StreamWriter? In that case you'd have to implement pause_writing() and resume_writing() in your Protocol class if you want to have the equivalent of drain(). (But do you? Is there nothing at a higher level in your protocol to prevent your writing coroutines from overwhelming the other end of the socket?)

I'm using StreamWriter because this is what start_server returns... I want to keep things simple, my
code is currently 345 lines and works. I think the write-drain-loop approach is a flow control concept
really easy to grasp, that's why I went for this solution.

Sure, I could write some handshaking to avoid overwhelming the other end of the socket, but
TCP already does that for me, so why bother.

Greetings

Martin

My promised patch follows:

--- streams2.py 2015-06-11 16:49:14.851855051 +0200
+++ streams.py  2015-06-15 11:04:35.119700288 +0200
@@ -13,6 +13,7 @@
 from . import coroutines
 from . import events
 from . import futures
+from . import locks
 from . import protocols
 from .coroutines import coroutine
 from .log import logger
@@ -151,6 +152,7 @@
             self._loop = loop
         self._paused = False
         self._drain_waiter = None
+        self._drain_lock = locks.Lock(loop=self._loop)
         self._connection_lost = False
 
     def pause_writing(self):
@@ -191,13 +193,12 @@
     def _drain_helper(self):
         if self._connection_lost:
             raise ConnectionResetError('Connection lost')
-        if not self._paused:
-            return
-        waiter = self._drain_waiter
-        assert waiter is None or waiter.cancelled()
-        waiter = futures.Future(loop=self._loop)
-        self._drain_waiter = waiter
-        yield from waiter
+        with (yield from self._drain_lock):
+            if not self._paused:
+                return
+            waiter = futures.Future(loop=self._loop)
+            self._drain_waiter = waiter
+            yield from waiter
 
 
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):

Guido van Rossum

Jun 15, 2015, 3:49:28 PM
to Martin Teichmann, python-tulip
On Mon, Jun 15, 2015 at 3:23 AM, Martin Teichmann <martin.t...@gmail.com> wrote:
Hi Guido,
 
I am still wondering, what is your actual use case? Multiple coroutines writing to the same stream sounds like the exception, not the common case -- even when you pay careful attention to the appearance of yield-from, you still have the problem that you can't control which of several coroutines runs first. I must assume that you have a protocol over TCP/IP that has some kind of framing convention, and you always ensure that a coroutine writes an entire frame without yielding.

I have a small application that monitors our servers here, and sends the results of this monitoring to a single socket for analysis. The
analysis tool runs outside of the server farm and is not as well connected as the servers inside the farm are amongst each other. The analysis
tool is supposed to log into the monitoring application, tell which parameters it intends to monitor, and then gets those parameters sent over
that single socket using a framed protocol.

Now, for each parameter that I monitor, I write a coroutine (those are typically very small, ten lines on average I guess). That coroutine
more-or-less does

    while True:
         data = yield from get_monitored_parameter()
         writer.write(header)
         writer.write(data)
         yield from writer.drain()

I wrote my own drain coroutine (mostly like the lock you proposed), so that I can call it from
several tasks, and this gives me simple flow control. It could be a good idea to add such
a lock to the standard library drain, but I can also understand it if people think that this is too special
a use case. I added a working patch to this post just in case.

Aha, you are logging. That's the other case where relative ordering of stuff you write needn't matter. :-)
 
I wonder if you could just use a bare Transport/Protocol pair instead of a StreamWriter? In that case you'd have to implement pause_writing() and resume_writing() in your Protocol class if you want to have the equivalent of drain(). (But do you? Is there nothing at a higher level in your protocol to prevent your writing coroutines from overwhelming the other end of the socket?)

I'm using StreamWriter because this is what start_server returns...

Well, start_server() wraps loop.create_server(). But I get your point, especially since you also have the login protocol to deal with.
 
I want to keep things simple, my
code is currently 345 lines and works. I think the write-drain-loop approach is a flow control concept
really easy to grasp, that's why I went for this solution.

Sure, I could write some handshaking to avoid overwhelming the other end of the socket, but
TCP already does that for me, so why bother.

Fair enough. I guess there is a real danger to overwhelm the socket? Otherwise you don't even need the drain() call. But it looks like you are basically logging as fast as you can, so the drain() seems important.

I still think that this is a relatively rare case and I don't want to encourage sharing streams between coroutines. Maybe you can submit a patch to turn the assert into a RuntimeError?
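
Such a patch would basically replace the assert in FlowControlMixin._drain_helper with an explicit check, something like (the message text is just a suggestion):

    waiter = self._drain_waiter
    if waiter is not None and not waiter.cancelled():
        raise RuntimeError('drain() is already being awaited by another coroutine')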

Guido van Rossum

Dec 1, 2016, 11:32:17 AM
to Mathias Fröjdman, python-tulip, Martin Teichmann
Is the assert still there? Maybe someone should just submit a PR with a fix? (Although it's too late for 3.6.0, it may make it into 3.6.1 since it feels like a bugfix to me.)

On Tue, Nov 29, 2016 at 8:01 AM, Mathias Fröjdman <m...@iki.fi> wrote:
On Monday, June 15, 2015 at 10:49:28 PM UTC+3, Guido van Rossum wrote:
Fair enough. I guess there is a real danger to overwhelm the socket? Otherwise you don't even need the drain() call. But it looks like you are basically logging as fast as you can, so the drain() seems important.
 
I still think that this is a relatively rare case and I don't want to encourage sharing streams between coroutines. Maybe you can submit a patch to turn the assert into a RuntimeError?

My 5c: I'm writing an AMQP client library. The AMQP protocol consists of a sequence of frames, which may (mostly) be interleaved. I use StreamWriter.drain() before sending large frames to make sure I'm not flooding the write buffer. I ran into this AssertionError while testing that publishing messages from multiple tasks works - it didn't. Thinking there was some inherent reason drain() must be called from only one task, I started figuring out what logic I would need to determine when to call it, on the assumption that it returns only when the buffer is empty. Instead it turns out it has water marks and the works, so I just wrapped the call in an async with block on an asyncio.Lock.

I'm not sure the AssertionError is more help than harm, as protocols such as this one where several tasks may operate on the same protocol just need to work around it, and protocols that don't support interleaving of writes probably run into bugs anyway.

--
Mathias