Incomplete reads from PipeIOStream

37 views
Skip to first unread message

Josiah Ulfers

unread,
Dec 8, 2022, 5:18:39 PM12/8/22
to python-...@googlegroups.com
When I read from PipeIOStream, I sometimes get incomplete reads. There's a
thread writing the data and a Tornado coroutine reading chunks in a loop.
The data read from PipeIOStream can be incomplete on Tornado 4 or Tornado
5 with Python 2, which I realize is no longer supported, but I hope
someone can tell me if I'm making an obvious error anyway?

The workaround I've constructed is:

class CompleteReadingPipeIOStream(PipeIOStream):

def write(self):
raise NotImplementedError('Writes ignoring EPOLLHUP may be
unsafe')

def _handle_events(self, fd, events):
super(CompleteReadingPipeIOStream, self)._handle_events(fd,
# Ignore EPOLLHUP because super treats it as an error and
marks
# this side of the pipe closed when actually it just means
that the
# other side of the pipe closed. This side might still
need to read
# data, so closing on hup can result in incomplete reads.
It's safe
# to ignore at the reading side because reading to the end
of the
# stream raises ``StreamClosedError`` regardless.
#
# It may *not* be safe to ignore at the writing side.
events & ~select.EPOLLHUP)

I'm not certain whether ignoring hup on the writing side is safe or not,
but I ban them to be cautious.

The example that follows demonstrates the problem, including a small
overload ViewEventsPipeIOStream to show what I suspect is the cause:
EPOLLHUP arrives and PipeIOStream marks itself closed before the reads
finished. Replacing ViewEventsPipeIOStream with
CompleteReadingPipeIOStream makes it run cleanly.

#!/usr/bin/env python
from io import open

import os
import threading

import tornado.gen
from tornado.ioloop import IOLoop
from tornado.iostream import PipeIOStream

@tornado.gen.coroutine
def read(from_pipe):
data = b''
while True:
try:
data += yield from_pipe.read_bytes(1000, partial=True)
yield tornado.gen.moment # eg flush to the network
except tornado.iostream.StreamClosedError:
break
raise tornado.gen.Return(data)

def pipe(chunks):
from_fd, to_fd = os.pipe()
wrote = bytearray()
def write():
with open(to_fd, 'wb') as w:
for chunk in chunks:
wrote.extend(chunk)
w.write(chunk)
threading.Thread(target=write).start()
return wrote, ViewEventsPipeIOStream(from_fd)

def chunks():
for _ in range(100):
yield b'.' * 1000

@tornado.gen.coroutine
def main():
wrote, from_pipe = pipe(chunks())
out = yield read(from_pipe)
assert wrote == out # <- sometimes fails
print('ok', len(wrote))

class ViewEventsPipeIOStream(PipeIOStream):

def _handle_events(self, fd, events):
print('events', bin(events))
super(ViewEventsPipeIOStream, self)._handle_events(fd, events)

for _ in range(100):
# Usually abends with output like this:
#
# ('events', '0b1')
# ('events', '0b10000')
# Traceback (most recent call last):
# File "pipe_err.py", line 69, in <module>
# IOLoop.current().run_sync(main)
# File ".../tornado/ioloop.py", line 576, in run_sync
# return future_cell[0].result()
# File ".../tornado/concurrent.py", line 261, in result
# raise_exc_info(self._exc_info)
# File ".../tornado/gen.py", line 1147, in run
# yielded = self.gen.send(value)
# File "pipe_err.py", line 44, in main
# assert wrote == out
# AssertionError
#
# 0b1 is EPOLLIN, data available for read
# 0b10000 is EPOLLHUP, other side of the pipe closed
IOLoop.current().run_sync(main)
all_events = []

This reproduces the error on 3 of 4 Linux machines that I've tried, most
of the time.

Josiah Ulfers

unread,
Dec 8, 2022, 5:39:03 PM12/8/22
to python-...@googlegroups.com

Sorry for Google Groups formatting. Trying again as html.

Ben Darnell

unread,
Dec 9, 2022, 11:45:31 AM12/9/22
to python-...@googlegroups.com
I'm not certain it's the same thing, but Tornado 6.1 fixed some bugs related to reading from IOStreams which the other side closed after sending data (https://github.com/tornadoweb/tornado/pull/2805). This was with sockets and not pipes, so there's a chance that pipes have their own issue, but first I'd try upgrading and see if that fixes the problem.

-Ben

--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/python-tornado/ecd62bfd80922512a36aecb7fa68ed1e%40mail.gmail.com.

Josiah Ulfers

unread,
Dec 12, 2022, 9:28:38 AM12/12/22
to python-...@googlegroups.com

Thank you, Python 3 does resolve this pipe problem, but I suspect that pull request is unrelated. Running the example below on Python 3 with Tornado 5 does not show the problem (and neither does the equivalent translation for Tornado 6): If I read the Tornado code correctly, I think the differences is that Tornado 5 uses the asyncio event loop when available and BaseAsyncIOLoop.add_handler() ignores IOLoop.ERROR. So updating to Python 3 does fix the problem (and begs the question of what happens to epoll error events in asyncio, which I haven’t looked into).

Josiah Ulfers

unread,
Dec 15, 2022, 2:00:26 PM12/15/22
to python-...@googlegroups.com

For anyone else still migrating apps off Python 2, the workaround I posted earlier in this thread is incomplete and potentially causes the reading side of PipeIOStream on Python 2 to hang. To correct this, translate EPOLLHUP to EPOLLIN instead of ignoring it:

 

class CompleteReadPipeIOStream(PipeIOStream):

  def get_fd_error(self):
    # The default implementation returns None, which makes errors undetectable
    return IOError('Unknown error in pipe')
 
  def write(self, *args, **kwargs):
    raise NotImplementedError('This implementation untested for writes')
 
  def _handle_events(self, fd, events):
    if events & select.EPOLLHUP:
      # Do not pass along EPOLLHUP because super treats it as an error
      # and marks this side of the pipe closed when actually it just
      # means that the write side of the pipe closed. This side might
      # still need to read data, so closing on hup can result in
      # incomplete reads.
      #
      # Translate to read because there's a window between when the
      # writer writes the last data and when it writes EOF during which
      # super might try a read. That read returns None, meaning EAGAIN
      # or EWOULDBLOCK, so super waits for an EPOLLIN message before it
      # will satisfy the next read future, but the EPOLLIN never arrives
      # because closing the writer sets EPOLLHUP instead.
      events = (events | select.EPOLLIN) & ~select.EPOLLHUP
    super(CompleteReadPipeIOStream, self)._handle_events(fd, events)

Switching to Python 3 with Tornado 5+, using asyncio, that is, obviates this. Because asyncio uses the selectors module, it translates hup to read or write already - from the discussion starting on https://github.com/python/cpython/issues/61057#issuecomment-1093604398

 

there's no SELECT_ERR anymore: error conditions (POLLNVAL|POLLERR and also POLLHUP) are reported as SELECT_IN|SELECT_OUT, depending on the input event mask: I don't think that a separate error flag is needed, because the error will be reported when the FD will be read/written (and in the context where the error can be handled appropriately

 

 

From: Josiah Ulfers <josiah...@nltgis.com>
Sent: Monday, December 12, 2022 9:27 AM
To: 'python-...@googlegroups.com' <python-...@googlegroups.com>
Subject: RE: [tornado] RE: Incomplete reads from PipeIOStream

 

Thank you, Python 3 does resolve this pipe problem, but I suspect that pull request is unrelated. Running the example below on Python 3 with Tornado 5 does not show the problem (and neither does the equivalent translation for Tornado 6): If I read the Tornado code correctly, I think the differences is that Tornado 5 uses the asyncio event loop when available and BaseAsyncIOLoop.add_handler() ignores IOLoop.ERROR. So updating to Python 3 does fix the problem (and begs the question of what happens to epoll error events in asyncio, which I haven’t looked into).

 

From: python-...@googlegroups.com <python-...@googlegroups.com> On Behalf Of Ben Darnell
Sent: Friday, December 9, 2022 11:45 AM
To: python-...@googlegroups.com
Subject: Re: [tornado] RE: Incomplete reads from PipeIOStream

 

I'm not certain it's the same thing, but Tornado 6.1 fixed some bugs related to reading from IOStreams which the other side closed after sending data (https://github.com/tornadoweb/tornado/pull/2805). This was with sockets and not pipes, so there's a chance that pipes have their own issue, but first I'd try upgrading and see if that fixes the problem.

 

-Ben

Reply all
Reply to author
Forward
0 new messages