How to create an iostream for a file?


László Nagy

Feb 6, 2017, 7:19:09 AM
to Tornado Web Server
tornado.iostream.IOStream is based on a socket, not a file-like object. I have tried PipeIOStream this way:


    with open("test_file.txt", "rb") as fin:
        stream = tornado.iostream.PipeIOStream(fin.fileno())

But I got this traceback:

  File "C:\Users\Laci\AppData\Local\Programs\Python\Python35\lib\site-packages\tornado\iostream.py", line 1470, in __init__
    _set_nonblocking(fd)
TypeError: 'NoneType' object is not callable

Well, this is a Windows system. It is said to be POSIX compatible, but apparently it is not really (there is no fcntl module). I guess this would work fine on Unix, but on Windows it does not.

The only stream class left is SSLIOStream, but of course it cannot be used for an open file.

I could possibly use IOLoop.add_handler to implement my own async file reads. But I believe that "reading from a local file" is such a basic operation that it should not require writing my own polling.

How do I create a BaseIOStream compatible object for an open file object? Is there an easy way?


An explanation of why I need this: I want to create a method that accepts a file-like object and makes an async HTTP PUT request that streams the file data to a remote server. The file should not be loaded into memory (it may be too big).
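For reference, this is roughly what I want to end up with (a sketch, assuming simple_httpclient's body_producer support; the open question is how to make the read itself non-blocking):

from tornado import gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest


@gen.coroutine
def put_file(url, file_obj, chunk_size=64 * 1024):
    @gen.coroutine
    def body_producer(write):
        while True:
            chunk = file_obj.read(chunk_size)  # This blocking read is the problem
            if not chunk:
                break
            yield write(chunk)  # Flow control: waits while the send buffer is full

    request = HTTPRequest(url, method="PUT", body_producer=body_producer)
    response = yield AsyncHTTPClient().fetch(request)
    return response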



László Nagy

Feb 6, 2017, 3:10:40 PM
to Tornado Web Server
I was wrong: ioloop.add_handler cannot be used for this. The select module could be used, but we already have an ioloop that is listening to file descriptors... there should be a way to do this, I just can't find it.

László Nagy

Feb 6, 2017, 3:27:10 PM
to Tornado Web Server
Just went through this: https://groups.google.com/forum/#!topic/python-tornado/KZl-kJT-8zA

Wow, this is amazing! So even Linux does not support async I/O read/write operations on regular file objects. Someone suggested that it would be possible to use a separate thread to read from the file, and use add_callback to send the data back to the main thread.

Using a separate thread for each file sounds bad. Even select.select cannot be used on regular files, so it is impossible to write a common "file reader" thread that could handle multiple files. Using "cat file" with PipeIOStream sounds horrible, and it would work on Unix only.

Is there an appropriate option? At least something reasonable?

Ben Darnell

Feb 6, 2017, 6:42:02 PM
to python-...@googlegroups.com
On Mon, Feb 6, 2017 at 3:27 PM László Nagy <nag...@gmail.com> wrote:
Using a separate thread for each file sounds bad. Even select.select cannot be used on regular files, so it is impossible to write a common "file reader" thread that could handle multiple files. Using "cat file" with PipeIOStream sounds horrible, and it would work on Unix only.

Is there an appropriate option? At least something reasonable?

A threadpool is pretty reasonable for this (not a thread per file, but a small threadpool that you submit your reads to). In Go, for example, all file I/O is done in separate threads from application code. 

You can also just block while reading the file in chunks, like StaticFileHandler does: https://github.com/tornadoweb/tornado/blob/0721917e5078e0937d2cd5f8355aa9301df4ca06/tornado/web.py#L2535-L2547 This will generally be fine for local files, although if your files are on NFS or similar you probably need to do something different.
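Roughly like this (a sketch of the blocking variant; `write` stands for whatever consumes the chunks, e.g. a body_producer's write function):

from tornado import gen


@gen.coroutine
def send_chunks(file_obj, write, chunk_size=64 * 1024):
    while True:
        chunk = file_obj.read(chunk_size)  # Blocks the ioloop briefly; usually fine for local disks
        if not chunk:
            break
        yield write(chunk)  # Yields control back to the ioloop between chunks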

-Ben
 

László Nagy

Feb 7, 2017, 3:02:04 AM
to Tornado Web Server, b...@bendarnell.com
 
Is there an appropriate option? At least something reasonable?

A threadpool is pretty reasonable for this (not a thread per file, but a small threadpool that you submit your reads to). In Go, for example, all file I/O is done in separate threads from application code. 

You can also just block while reading the file in chunks, like StaticFileHandler does: https://github.com/tornadoweb/tornado/blob/0721917e5078e0937d2cd5f8355aa9301df4ca06/tornado/web.py#L2535-L2547 This will generally be fine for local files, although if your files are on NFS or similar you probably need to do something different.


Thank you, this helped a lot. I'm going to read chunks in a separate thread and send them to the main ioloop with a queue.Queue.

Ben Darnell

Feb 9, 2017, 7:11:18 PM
to László Nagy, Tornado Web Server
On Tue, Feb 7, 2017 at 3:02 AM László Nagy <nag...@gmail.com> wrote:

Thank you, this helped a lot. I'm going to read chunks in a separate thread and send them to the main ioloop with a queue.Queue.

It's awkward to do this with a queue - you need to either use a `queue.Queue`, which doesn't have an event-driven interface for consuming in Tornado, or a `tornado.queues.Queue`, which is not thread-safe and requires awkward `add_callback` jumps. Instead, just use a concurrent.futures.ThreadPoolExecutor: `chunk = yield executor.submit(f.read, chunk_size)`. In native coroutines, this would be either `chunk = await asyncio.ensure_future(executor.submit(f.read, chunk_size))` or `chunk = await tornado.gen.convert_yielded(executor.submit(f.read, chunk_size))`.
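Spelled out (a sketch, assuming `f` is an open binary file and `write` is whatever consumes the chunks):

from concurrent.futures import ThreadPoolExecutor
from tornado import gen

executor = ThreadPoolExecutor(4)


@gen.coroutine
def send_file(f, write, chunk_size=64 * 1024):
    while True:
        chunk = yield executor.submit(f.read, chunk_size)  # The read runs in a pool thread
        if not chunk:  # Empty bytes -> EOF
            break
        yield write(chunk)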

-Ben 

Les

Feb 10, 2017, 3:21:13 AM
to Ben Darnell, Tornado Web Server
Some thoughts:

- I could use queue.Queue, but it blocks when the queue is empty and you try to get an item. If you execute that code in the main thread, it will block the server. Yes, I know that it is not likely that the file read operation is slower than the server. But it is very well possible that the user will try to read from NFS. Or the disks may be occupied with something else (for example: the admin is extracting a 10TB archive). At the same time, the server may have a 10GbE connection. I wanted a solution that works when the network is faster than the file I/O, and also the other way around.
- About using futures: yes, it would work. But it forces the program to either read a chunk from the file or process a chunk in the ioloop; it cannot do both at the same time. If the speeds of the disk and the network are similar, this will cut the transfer speed to half of what would be possible.

Of course I cannot use tornado's async queue alone. It can be awaited from the main loop, but not from another thread, so we need something else to synchronize. Here is something that I have imagined:

import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.queues import Queue


def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
    file_size = os.path.getsize(file_path)
    remaining = file_size
    lock = threading.Lock()

    def putter(chunk, lock: threading.Lock):
        queue.put(chunk)        # Called from the loop's thread -> won't block the ioloop
        lock.release()          # Wake the reader thread after the chunk has been put into the processing queue

    def put(chunk, lock):
        """Put the chunk into the queue, and wait until the ioloop has accepted it."""
        lock.acquire()  # Acquire in this thread
        io_loop.spawn_callback(putter, chunk, lock)  # Released in the loop's thread
        lock.acquire()  # Wait until the loop's thread has accepted the chunk for processing
        lock.release()  # Clean up before returning

    # Put the file size into the queue without waiting
    io_loop.spawn_callback(queue.put, file_size)

    with open(file_path, "rb") as fin:  # Close the file even if something goes wrong
        while remaining > 0:
            chunk = fin.read(min(chunk_size, remaining))
            print("read", chunk)
            remaining -= len(chunk)
            time.sleep(1)  # Just for testing: simulate slow file reads.
            put(chunk, lock)

    # Put the EOF marker (None) into the queue
    io_loop.spawn_callback(queue.put, None)


pool = ThreadPoolExecutor(3)


async def main():
    # Create a queue for sending chunks of data
    cq = Queue(maxsize=3)
    # Start the file reader in a separate (pool) thread
    pool.submit(read_file, __file__, cq, io_loop, 100)
    file_size = await cq.get()
    print("file size:", file_size)
    # Process chunks
    while True:
        item = await cq.get() # Queue access from the main loop
        # Terminator -> EOF
        if item is None:
            break
        print("got chunk:", repr(item))



if __name__ == '__main__':
    io_loop = IOLoop.current()
    io_loop.run_sync(main)  # run_sync starts the loop and stops it when main() returns


The synchronization between the reader thread and the ioloop works like this:

* An async queue. It is accessed from within the ioloop only, so there is no race condition there. Before we enter the ioloop and access the queue, the chunk of data is already in memory, so it cannot block anything.
* Synchronization when the reader is faster: the ioloop can wait for more items in the queue, but it uses the async queue -> no blocking at all.
* Synchronization when the processor is faster: the reader acquires a lock, and waits until the main thread releases the same lock (after it got the chunk of data). The main (loop) thread never acquires the lock, only releases it, and only when it is sure that it was already acquired -> no blocking at all.

The advantage would be that the file read operation and the chunk processing can happen in parallel in two threads. It is true that the GIL prevents our Python program from really running in parallel, but I hope that the file.read() operation ends deep down in the OS and, while the OS is filling the buffer, the GIL is released, so it will really be possible for the main thread to process the chunk in parallel.

I might be totally wrong about this. :-)


László Nagy

Feb 10, 2017, 8:40:35 AM
to Tornado Web Server, nag...@gmail.com, b...@bendarnell.com
 
Instead, just use a concurrent.futures.ThreadPoolExecutor: `chunk = yield executor.submit(f.read, chunk_size)`.

By the way, is a Future returned by ThreadPoolExecutor.submit compatible with tornado's ioloop?

Will the ThreadPool call set_result on that Future from within the ioloop's thread? Or is it not required? That is a bit foggy for me.

(The submitter will have to wait until the chunk is read, and then the reader won't start reading the next chunk until it is completely processed by the submitter. This is why I used an async queue of maxsize>1, so that f.read can be reading the next chunk while the current chunk is being processed inside the ioloop.)

Ben Darnell

Feb 19, 2017, 7:14:32 PM
to python-...@googlegroups.com, nag...@gmail.com
On Fri, Feb 10, 2017 at 8:40 AM László Nagy <nag...@gmail.com> wrote:
 
Instead, just use a concurrent.futures.ThreadPoolExecutor: `chunk = yield executor.submit(f.read, chunk_size)`.

By the way, is a Future returned by ThreadPoolExecutor.submit 
compatible with tornado's ioloop?

Tornado knows about the concurrent.futures package, and so it will wrap concurrent futures to make everything thread-safe. Unfortunately, asyncio and `async def` do not know about concurrent.futures, so you lose this convenience in native coroutines. (in an asyncio world you have to use `asyncio.get_event_loop().run_in_executor()`)
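For example (a sketch; `f` is an open binary file and `write` is whatever consumes the chunks, as before):

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)


async def send_file(f, write, chunk_size=64 * 1024):
    loop = asyncio.get_event_loop()
    while True:
        # run_in_executor returns an asyncio-compatible future
        chunk = await loop.run_in_executor(executor, f.read, chunk_size)
        if not chunk:
            break
        await write(chunk)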
 

Will the ThreadPool call set_result on that Future from within the ioloop's thread? Or is it not required? That is a bit foggy for me.

For concurrent.futures.Future, set_result is called from an unspecified thread. Tornado recognizes these futures when you yield them and adds a thread-safe wrapper.
 

(The submitter will have to wait until the chunk is read, and then the reader won't start reading the next chunk until it is completely processed by the submitter. This is why I used an async queue of maxsize>1, so that f.read can be reading the next chunk while the current chunk is being processed inside the ioloop.)

This is probably unnecessary because of the underlying socket's send buffer. The first call to write() is nearly instant because it just copies data into the send buffer. You don't start waiting for writes until the send buffer fills up. 

-Ben
 


László Nagy

Feb 20, 2017, 6:29:39 AM
to Tornado Web Server, nag...@gmail.com, b...@bendarnell.com


On Monday, February 20, 2017 at 1:14:32 AM UTC+1, Ben Darnell wrote:
On Fri, Feb 10, 2017 at 8:40 AM László Nagy <nag...@gmail.com> wrote:
 
Instead, just use a concurrent.futures.ThreadPoolExecutor: `chunk = yield executor.submit(f.read, chunk_size)`.

By the way, is a Future returned by ThreadPoolExecutor.submit 
compatible with tornado's ioloop?

Tornado knows about the concurrent.futures package, and so it will wrap concurrent futures to make everything thread-safe. Unfortunately, asyncio and `async def` do not know about concurrent.futures, so you lose this convenience in native coroutines. (in an asyncio world you have to use `asyncio.get_event_loop().run_in_executor()`)

In other words: I cannot blindly refactor a @tornado.gen.coroutine into an async def method and change yield-s into await-s.


"Python 3.5 introduces the async and await keywords (functions using these keywords are also called “native coroutines”). Starting in Tornado 4.3, you can use them in place of yield-based coroutines. Simply use async def foo() in place of a function definition with the @gen.coroutine decorator, and await in place of yield. The rest of this document still uses the yield style for compatibility with older versions of Python, but async and await will run faster when they are available"

Well, that does not seem to be the whole truth. Refactoring coroutines to native coroutines requires a semantic check: does that coroutine yield a future whose value is set from another thread? Does it use any other coroutine that does this? I see that as a major problem. Should we add this to the docs?
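To illustrate the pitfall (a sketch, assuming `executor` is a concurrent.futures.ThreadPoolExecutor):

from concurrent.futures import ThreadPoolExecutor
from tornado import gen

executor = ThreadPoolExecutor(4)


@gen.coroutine
def works(f):
    chunk = yield executor.submit(f.read, 4096)  # OK: gen.coroutine wraps concurrent futures
    return chunk


async def breaks(f):
    chunk = await executor.submit(f.read, 4096)  # TypeError: the future has no __await__
    return chunk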


 

Will the ThreadPool call set_result on that Future from within the ioloop's thread? Or is it not required? That is a bit foggy for me.

For concurrent.futures.Future, set_result is called from an unspecified thread. Tornado recognizes these futures when you yield them and adds a thread-safe wrapper.
 

(The submitter will have to wait until the chunk is read, and then the reader won't start reading the next chunk until it is completely processed by the submitter. This is why I used an async queue of maxsize>1, so that f.read can be reading the next chunk while the current chunk is being processed inside the ioloop.)

This is probably unnecessary because of the underlying socket's send buffer. The first call to write() is nearly instant because it just copies data into the send buffer. You don't start waiting for writes until the send buffer fills up. 

Oh,  I should have known. 

Thank you!

Ben Darnell

Feb 20, 2017, 2:35:31 PM
to László Nagy, Tornado Web Server
On Mon, Feb 20, 2017 at 6:29 AM László Nagy <nag...@gmail.com> wrote:

In other words: I cannot blindly refactor a @tornado.gen.coroutine into an async def method and change yield-s into await-s.


"Python 3.5 introduces the async and await keywords (functions using these keywords are also called “native coroutines”). Starting in Tornado 4.3, you can use them in place of yield-based coroutines. Simply use async def foo() in place of a function definition with the @gen.coroutine decorator, and await in place of yield. The rest of this document still uses the yield style for compatibility with older versions of Python, but async and await will run faster when they are available"

Well, that does not seem to be the whole truth. Refactoring coroutines to native coroutines requires a semantic check: does that coroutine yield a future whose value is set from another thread? Does it use any other coroutine that does this? I see that as a major problem. Should we add this to the docs?


Yes, the docs need to be updated. But the distinction is not "futures whose value is set from another thread", it's anything that doesn't implement the __await__ special method. (The standard library could have added this method to concurrent.futures, but they didn't.) So this applies to any non-standard yieldables that rely on the convert_yielded.register() method (although in many cases, such as twisted's Deferred, these objects are being updated to implement __await__ as well).

-Ben 

László Nagy

Feb 27, 2017, 2:26:12 AM
to Tornado Web Server, nag...@gmail.com, b...@bendarnell.com


On Monday, February 20, 2017 at 8:35:31 PM UTC+1, Ben Darnell wrote:

Yes, the docs need to be updated. But the distinction is not "futures whose value is set from another thread", it's anything that doesn't implement the __await__ special method. (The standard library could have added this method to concurrent.futures, but they didn't.) So this applies to any non-standard yieldables that rely on the convert_yielded.register() method (although in many cases, such as twisted's Deferred, these objects are being updated to implement __await__ as well).

-Ben 

Thanks for your help!

Finally, I found out why the __await__ method was not implemented in concurrent.futures. PEP 3156 explains:


"""
In the future (pun intended) we may unify asyncio.Future and
concurrent.futures.Future, e.g. by adding an __iter__() method to the
latter that works with yield from. To prevent accidentally blocking
the event loop by calling e.g. result() on a Future that's not done
yet, the blocking operation may detect that an event loop is active in
the current thread and raise an exception instead. However the current
PEP strives to have no dependencies beyond Python 3.3, so changes to
concurrent.futures.Future are off the table for now.
"""

(Actually, I asked this on the main Python list and that was the answer I got.)

We have also discovered an anomaly (incompatibility?) between asyncio's AbstractEventLoop and tornado.ioloop.IOLoop:

asyncio names this "run_in_executor" https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_in_executor  
but tornado names it "run_on_executor" http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.run_on_executor

This seems to be the best solution, because it can be used with any ioloop implementation, and from both normal and native coroutines. But there is a difference (typo?) in the name. What a pity.

-Laszlo




 

László Nagy

Feb 27, 2017, 2:29:35 AM
to Tornado Web Server, nag...@gmail.com, b...@bendarnell.com
 
asyncio names this "run_in_executor" https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_in_executor  
but tornado names it "run_on_executor" http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.run_on_executor

This seems to be the best solution, because it can be used with any ioloop implementation, and from both normal and native coroutines. But there is a difference (typo?) in the name. What a pity.

I was wrong again. The latter is a decorator, not a method of the ioloop, so it is not an incompatibility that they have different names.
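For reference, a sketch of how the decorator is used (run_on_executor looks for a `self.executor` attribute; the class and names here are made up):

from concurrent.futures import ThreadPoolExecutor
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop


class FileReader:
    def __init__(self):
        self.executor = ThreadPoolExecutor(4)  # Required by run_on_executor
        self.io_loop = IOLoop.current()

    @run_on_executor
    def read_chunk(self, f, size):
        return f.read(size)  # Runs in the pool; the caller gets a yieldable Future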

Do we want to implement run_in_executor in tornado's IOLoop, to make it more compatible with AbstractEventLoop? Would that be a good idea?