using coroutines within protocols?

Eli Bendersky

Jan 22, 2014, 9:26:57 AM
to python-tulip
Hello,

I've been playing with some asyncio-related ideas and a question came up about using coroutines in the data_received method of asyncio Protocols.

Here's a simple port of Twisted's IntNStringReceiverProtocol for asyncio:

https://github.com/eliben/python3-samples/blob/master/async/asyncio-len-prefixed-string-protocol.py

It's pretty straightforward, but it bothers me that data_received has to essentially manage a state machine, whereas I'd expect to be able to use coroutines there to just say something like the following in a loop:

... got length word
... now get 'length' bytes [yield from....]
... dispatch "string_received"

Is this possible?

Thanks in advance,
Eli

Guido van Rossum

Jan 22, 2014, 10:33:41 AM
to Eli Bendersky, python-tulip
This question keeps coming up...

The solution is simple: write that logic as a separate method marked
with @coroutine, and fire it off in data_received() using async() (==
Task(), in this case). E.g.

    def data_received(self, data):
        asyncio.async(self.process_data(data))

    @asyncio.coroutine
    def process_data(self, data):
        ...stuff using yield from...

(The reason this isn't built into the protocol is that if it were,
it would require alternate event loop implementations to deal with
coroutines.)
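Spelled out as a self-contained sketch, the pattern looks like the following. The class and callback names are made up for illustration, and asyncio.ensure_future() / async def are the later spellings of the async() and @coroutine forms discussed above:

```python
import asyncio

class LineProtocol(asyncio.Protocol):
    """Fires a coroutine off from the synchronous data_received() callback."""

    def __init__(self, on_message):
        self.on_message = on_message

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # data_received() itself stays synchronous; the coroutine runs as a
        # Task scheduled on the event loop.
        asyncio.ensure_future(self.process_data(data))

    async def process_data(self, data):
        # Free to await here: talk to a database, sleep, etc.
        await asyncio.sleep(0)
        self.on_message(data.decode())
```

Note that each call to data_received() spawns its own Task, which is exactly the ordering hazard discussed later in this thread.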
--
--Guido van Rossum (python.org/~guido)

Victor Stinner

Jan 22, 2014, 11:27:10 AM
to Guido van Rossum, Eli Bendersky, python-tulip
2014/1/22 Guido van Rossum <gu...@python.org>:
> This question keeps coming up...

Is anyone motivated to contribute to the documentation? :-) Thibaut
Dirlik maybe?

Victor

Glyph

Jan 23, 2014, 3:28:56 AM
to Guido van Rossum, Eli Bendersky, python-tulip

On Jan 22, 2014, at 7:33 AM, Guido van Rossum <gu...@python.org> wrote:

The solution is simple: write that logic as a separate method marked
with @coroutine, and fire it off in data_received() using async() (==
Task(), in this case). E.g.

Wouldn't that process each subsequent TCP segment in a parallel coroutine, thereby with multiple segments of data stomping on each other if they're yielding during processing?

It seems to me that the real solution to do this (IntNReceiver) in the coroutine style would be to use StreamReader, and then do something like this:

@coroutine
def read_one_string():
    prefix_length = struct.calcsize(prefix_format)
    prefix = yield from reader.readexactly(prefix_length)
    size = struct.unpack(prefix_format, prefix)[0]
    return (yield from reader.readexactly(size))
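Filled out into self-contained form, that sketch becomes the following. The struct import, the explicit reader parameter, and the concrete "!I" (4-byte big-endian) prefix format are assumptions, and await stands in for the yield from spelling used above:

```python
import asyncio
import struct

PREFIX_FORMAT = '!I'  # assumed: 4-byte big-endian length prefix

async def read_one_string(reader):
    """Read one length-prefixed string from an asyncio StreamReader."""
    prefix_length = struct.calcsize(PREFIX_FORMAT)
    prefix = await reader.readexactly(prefix_length)
    size = struct.unpack(PREFIX_FORMAT, prefix)[0]
    return await reader.readexactly(size)
```

Because readexactly() buffers internally, segment boundaries on the wire no longer matter: the coroutine simply suspends until enough bytes have arrived.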

If you don't want to use StreamReader, then you'll need to implement something like what it does with feed_data, unblocking futures that are waiting for data_received to be called.

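A minimal sketch of that feed_data idea (not code from the original message; it ignores EOF and cancellation, and uses the later await spelling) might look like:

```python
import asyncio

class BufferingProtocol(asyncio.Protocol):
    """data_received() stores bytes and wakes any coroutine parked on a Future."""

    def __init__(self):
        self.buffer = b''
        self._waiter = None

    def data_received(self, data):
        self.buffer += data
        # Unblock a coroutine waiting in wait_for_data(), if any.
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result(None)

    async def wait_for_data(self):
        # Suspend until data_received() fires, then hand over the buffer.
        if not self.buffer:
            self._waiter = asyncio.get_running_loop().create_future()
            await self._waiter
            self._waiter = None
        data, self.buffer = self.buffer, b''
        return data
```

This is essentially what StreamReader already does internally, which is why reaching for it directly is usually the better choice.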
-glyph

Victor Stinner

Jan 23, 2014, 6:38:24 AM
to Glyph, Guido van Rossum, Eli Bendersky, python-tulip
2014/1/23 Glyph <gl...@twistedmatrix.com>:
> On Jan 22, 2014, at 7:33 AM, Guido van Rossum <gu...@python.org> wrote:
>
>> The solution is simple: write that logic as a separate method marked
>> with @coroutine, and fire it off in data_received() using async() (==
>> Task(), in this case). E.g.
>
> Wouldn't that process each subsequent TCP segment in a parallel coroutine,
> thereby with multiple segments of data stomping on each other if they're
> yielding during processing?
>
> It seems to me that the real solution to do this (IntNReceiver) in the
> coroutine style would be to use StreamReader, (...)

The StreamReader and StreamWriter classes were not well documented. I
moved all the stream-related material to a new page in the asyncio
documentation and added a simple example using open_connection() to
show how to use streams.
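A sketch of the shape such an open_connection() example takes (the greeting exchange and function name here are made up, and await stands in for yield from):

```python
import asyncio

async def fetch_greeting(host, port):
    # open_connection() returns a (StreamReader, StreamWriter) pair.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b'hello\n')
    await writer.drain()          # wait until the write buffer is flushed
    line = await reader.readline()
    writer.close()
    return line
```

The client never touches a Protocol or Transport directly; the stream layer wraps both.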

I spoke with Thibaut Dirlik (who asked the same question: how to use a
coroutine in a protocol). He didn't know about StreamReader and had
reimplemented its readline() method himself :-( He now wants to use
StreamReader.

I still don't understand when protocols should be used and when
streams should be preferred. Are stream classes the high-level API?

I found only one major difference between protocols and streams: how
EOF is handled. It's not possible to register a callback on EOF when
using a StreamReader, whereas protocols have an eof_received() method.
If a stream reader is waiting for incoming data, EOF wakes up the
coroutine. By the way, I don't like the current behaviour of
readexactly() on EOF: I opened issue #111.

Victor

Guido van Rossum

Jan 23, 2014, 12:39:26 PM
to Victor Stinner, Glyph, Eli Bendersky, python-tulip
Thanks for the docs update; I didn't realize these were little-known.

Streams shouldn't have callbacks; the existing stream convention for
detecting EOF (an empty read) should be fine. However, I agree that
readexactly() should raise an exception. A subclass of EOFError is
fine.

Guido van Rossum

Jan 23, 2014, 12:47:52 PM
to Glyph, Eli Bendersky, python-tulip
Oh, I apologize for not following the links in the original question.
Indeed in this case StreamReader is the solution.

I was answering a different question that gets asked a lot too (though
perhaps the underlying use case is the same -- apparently StreamReader
is not advertised well enough): "Why can't data_received() etc. be
coroutines so they can use 'yield from'?"

IIRC you (Glyph) specifically didn't want that, because it would
increase the burden for alternate event loop implementations. And your
observation about multiple tasks is correct too, of course. But
allowing data_received() to be a coroutine doesn't solve this by
itself. I don't want the transport to pause or start buffering until
that coroutine completes. So I think we should just note that
protocols are a lower-level concept than StreamReader. (And more
versatile -- that's usually how lower-level concepts work. :-)

--Guido

Glyph Lefkowitz

Jan 23, 2014, 8:34:36 PM
to Guido van Rossum, Eli Bendersky, python-tulip

On Jan 23, 2014, at 9:47 AM, Guido van Rossum <gu...@python.org> wrote:

So I think we should just note that
protocols are a lower-level concept than StreamReader. (And more
versatile -- that's usually how lower-level concepts work. :-)

+1.  Even within Twisted, which follows the callback idiom thoroughly, "Protocol" is a primitive, and you would want to build upon higher-level abstractions to actually make something practical.

-g

Victor Stinner

Jan 24, 2014, 11:35:48 AM
to Glyph, Guido van Rossum, Eli Bendersky, python-tulip
2014/1/23 Glyph <gl...@twistedmatrix.com>:
> On Jan 22, 2014, at 7:33 AM, Guido van Rossum <gu...@python.org> wrote:
>> The solution is simple: write that logic as a separate method marked
>> with @coroutine, and fire it off in data_received() using async() (==
>> Task(), in this case). E.g.
>
> Wouldn't that process each subsequent TCP segment in a parallel coroutine,
> thereby with multiple segments of data stomping on each other if they're
> yielding during processing?

Hum, I tried to write something to explain that in asyncio documentation:

+Coroutines and protocols
+------------------------
+
+Coroutines can be scheduled in a protocol method using :func:`async`, but there
+is no guarantee of the execution order. Protocols are not aware of coroutines
+created in protocol methods and so will not wait for them.
+
+To have a reliable execution order, use :ref:`stream objects <streams>` in a
+coroutine with ``yield from``. For example, the :meth:`StreamWriter.drain`
+coroutine can be used to wait until the write buffer is flushed.

Victor