Using a separate thread for each file sounds bad. Even select.select cannot be used on regular files, so it is impossible to write a common "file reader" thread that could handle multiple files. Using "cat file" with PipeIOStream sounds horrible, and it would only work on Unix.
Is there an appropriate option? At least something reasonable?
A thread pool is pretty reasonable for this (not a thread per file, but a small thread pool that you submit your reads to). In Go, for example, all file I/O is done in separate threads from application code.

You can also just block while reading the file in chunks, like StaticFileHandler does: https://github.com/tornadoweb/tornado/blob/0721917e5078e0937d2cd5f8355aa9301df4ca06/tornado/web.py#L2535-L2547 This will generally be fine for local files, although if your files are on NFS or similar you probably need to do something different.
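For reference, a minimal sketch of that blocking, chunked-read pattern (the handler class, URL parameter and chunk size are my own illustration, loosely modelled on the StaticFileHandler code linked above, not a copy of it):

from tornado import gen, web

class ChunkedFileHandler(web.RequestHandler):
    # Hypothetical handler: read the file in small blocking chunks and flush
    # each one, so that no single read() call holds the ioloop for long.
    @gen.coroutine
    def get(self, path):
        chunk_size = 64 * 1024  # assumed value, not from the original message
        with open(path, "rb") as f:
            while True:
                chunk = f.read(chunk_size)  # blocking, but short for local files
                if not chunk:
                    break
                self.write(chunk)
                yield self.flush()  # let the ioloop send this chunk before reading more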
Thank you, this helped a lot. I'm going to read the chunks in a separate thread and send them to the main ioloop with a tornado.queues.Queue:
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor

from tornado.ioloop import IOLoop
from tornado.queues import Queue


def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
    file_size = os.path.getsize(file_path)
    remaining = file_size
    fin = open(file_path, "rb")
    lock = threading.Lock()

    def putter(chunk, lock: threading.Lock):
        queue.put(chunk)  # Called from the loop's thread -> won't block the ioloop
        lock.release()    # Wake the reader thread after the chunk has been handed to the queue

    def put(chunk, lock):
        """Put the chunk into the queue, and wait until it is accepted by the ioloop."""
        lock.acquire()                               # Acquire in this thread
        io_loop.spawn_callback(putter, chunk, lock)  # Release in the loop's thread
        lock.acquire()                               # Wait until the loop's thread has accepted the chunk
        lock.release()                               # Clean up before returning

    # Put the file size into the queue without waiting
    io_loop.spawn_callback(queue.put, file_size)
    while remaining > 0:
        chunk = fin.read(min(chunk_size, remaining))
        print("read", chunk)
        remaining -= len(chunk)
        time.sleep(1)  # Just for testing: simulate slow file reads.
        put(chunk, lock)
    fin.close()
    # Put the EOF terminator into the queue
    io_loop.spawn_callback(queue.put, None)


pool = ThreadPoolExecutor(3)


async def main():
    # Create a queue for sending chunks of data
    cq = Queue(maxsize=3)
    # Start reading the file in a separate thread
    pool.submit(read_file, __file__, cq, io_loop, 100)
    file_size = await cq.get()
    print("file size:", file_size)
    # Process chunks
    while True:
        item = await cq.get()  # Queue access from the main loop
        # Terminator -> EOF
        if item is None:
            break
        print("got chunk:", repr(item))


if __name__ == '__main__':
    io_loop = IOLoop.current()
    io_loop.run_sync(main)  # run_sync starts the loop and stops it when main() finishes

Instead, just use a concurrent.futures.ThreadPoolExecutor: `chunk = yield executor.submit(f.read, chunk_size)`.
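A minimal sketch of that suggestion; only the `yield executor.submit(f.read, chunk_size)` line comes from the message above, the surrounding names, pool size and chunk size are arbitrary choices of mine:

from concurrent.futures import ThreadPoolExecutor

from tornado import gen
from tornado.ioloop import IOLoop

executor = ThreadPoolExecutor(4)  # assumed pool size

@gen.coroutine
def read_chunks(path, chunk_size=64 * 1024):
    with open(path, "rb") as f:
        while True:
            # The blocking read runs in a worker thread; the coroutine is
            # resumed by the ioloop when the concurrent future completes.
            chunk = yield executor.submit(f.read, chunk_size)
            if not chunk:
                break
            print("got chunk of", len(chunk), "bytes")

IOLoop.current().run_sync(lambda: read_chunks(__file__))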
By the way, is a Future returned by ThreadPoolExecutor.submit compatible with tornado's ioloop?
Will the ThreadPoolExecutor call set_result on that Future from within the ioloop's thread, or is that not required? This is a bit foggy to me.
(The submitter will have to wait until the chunk is read, and then the reader won't start reading the next chunk until it is completely processed by the submitter. This is why I used an async queue of maxsize>1, so that f.read can be reading the next chunk while the current chunk is being processed inside the ioloop.)
On Fri, Feb 10, 2017 at 8:40 AM László Nagy <nag...@gmail.com> wrote:

By the way, is a Future returned by ThreadPoolExecutor.submit compatible with tornado's ioloop?

Tornado knows about the concurrent.futures package, and so it will wrap concurrent futures to make everything thread-safe. Unfortunately, asyncio and `async def` do not know about concurrent.futures, so you lose this convenience in native coroutines. (In an asyncio world you have to use `asyncio.get_event_loop().run_in_executor()`.)
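A sketch of the difference, assuming a ThreadPoolExecutor named `executor` and a blocking file object `f` (the names and sizes are mine; the run_in_executor spelling is asyncio's, the yield form is Tornado's decorated-coroutine style):

import asyncio
from concurrent.futures import ThreadPoolExecutor

from tornado import gen

executor = ThreadPoolExecutor(4)  # assumed pool

@gen.coroutine
def decorated_read(f, chunk_size):
    # Works: Tornado wraps the concurrent future returned by submit().
    chunk = yield executor.submit(f.read, chunk_size)
    return chunk

async def native_read(f, chunk_size):
    # A native coroutine has to go through the asyncio event loop instead.
    loop = asyncio.get_event_loop()
    chunk = await loop.run_in_executor(executor, f.read, chunk_size)
    return chunk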
Will the ThreadPoolExecutor call set_result on that Future from within the ioloop's thread, or is that not required?

For concurrent.futures.Future, set_result is called from an unspecified thread. Tornado recognizes these futures when you yield them and adds a thread-safe wrapper.

(The submitter will have to wait until the chunk is read, and then the reader won't start reading the next chunk until it is completely processed by the submitter. This is why I used an async queue of maxsize>1, so that f.read can be reading the next chunk while the current chunk is being processed inside the ioloop.)

This is probably unnecessary because of the underlying socket's send buffer. The first call to write() is nearly instant because it just copies data into the send buffer. You don't start waiting for writes until the send buffer fills up.
In other words: I cannot blindly refactor a @tornado.gen.coroutine into an async def method and change the yields into awaits.

We have this in the documentation ( http://www.tornadoweb.org/en/stable/guide/coroutines.html#python-3-5-async-and-await ):

"Python 3.5 introduces the async and await keywords (functions using these keywords are also called “native coroutines”). Starting in Tornado 4.3, you can use them in place of yield-based coroutines. Simply use async def foo() in place of a function definition with the @gen.coroutine decorator, and await in place of yield. The rest of this document still uses the yield style for compatibility with older versions of Python, but async and await will run faster when they are available."

Well, that does not seem to be the whole truth. Refactoring coroutines into native coroutines requires a semantic check: does the coroutine yield a future whose value is set from another thread? Does it use any other coroutine that does this? I see that as a major problem. Should we add this to the docs?
Yes, the docs need to be updated. But the distinction is not "futures whose value is set from another thread", it's anything that doesn't implement the __await__ special method (the standard library could have added this method to concurrent.futures, but they didn't). So this applies to any non-standard yieldables that rely on the convert_yielded.register() method (although in many cases, such as twisted's Deferred, these objects are being updated to implement __await__ as well).

-Ben
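A tiny sketch of that distinction (my own illustration, not from the thread): the same concurrent future can be yielded from a decorated coroutine, but awaiting it directly in a native coroutine fails because it has no __await__:

from concurrent.futures import ThreadPoolExecutor

from tornado import gen

executor = ThreadPoolExecutor(2)

def blocking_work():
    return 42

@gen.coroutine
def old_style():
    # Works: the coroutine runner knows how to wait on a concurrent future.
    result = yield executor.submit(blocking_work)
    return result

async def new_style():
    # Raises TypeError: concurrent.futures.Future has no __await__ method.
    result = await executor.submit(blocking_work)
    return result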
""" In the future (pun intended) we may unify asyncio.Future and concurrent.futures.Future, e.g. by adding an __iter__() method to the latter that works with yield from. To prevent accidentally blocking the event loop by calling e.g. result() on a Future that's not done yet, the blocking operation may detect that an event loop is active in the current thread and raise an exception instead. However the current PEP strives to have no dependencies beyond Python 3.3, so changes to concurrent.futures.Future are off the table for now. """
(Actually, I asked this on the main Python list and got this answer.)
We have also discovered an anomaly (incompatibility?) between asyncio.AbstractEventLoop and tornado.ioloop.IOLoop:
asyncio names this "run_in_executor" https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_in_executor but tornado names it "run_on_executor" http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.run_on_executor
This seems to be the best solution, because it can be used with any ioloop implementation, and from both normal and native coroutines. But there is a difference (typo?) in the name. What a pity.
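The asyncio run_in_executor form was sketched earlier; for comparison, a rough sketch of Tornado's run_on_executor decorator in use (the class and method names are mine; the decorator looks up a self.executor attribute, and older Tornado versions also look up self.io_loop):

from concurrent.futures import ThreadPoolExecutor

from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop

class BlockingReader:
    def __init__(self):
        self.executor = ThreadPoolExecutor(4)   # used by run_on_executor
        self.io_loop = IOLoop.current()         # older Tornado versions look this up too

    @run_on_executor
    def read_chunk(self, fobj, size):
        # Runs on self.executor; the call returns a future that a
        # @gen.coroutine can yield while the ioloop keeps running.
        return fobj.read(size)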
-Laszlo