Use Tornado IOLoop in a non-web context

401 views
Skip to first unread message

mrtn

unread,
Jun 15, 2015, 10:22:56 PM6/15/15
to python-...@googlegroups.com

I am not sure if this is possible/recommended, but here is the idea: I want to run a long-running process involving an IOLoop instance, and take advantage of the asynchronous capabilities of Tornado without running a complete web server (I don't need a web server in this case).

The long-running process simply waits for events, and upon an event, it processes the event and goes back to waiting, so the cycle repeats. The event may come from a message queue (Celery, Redis, etc), may come from stdin, etc.

The waiting on a particular event source may be blocking, and the process may wait for multiple such blocking 'sources'. In addition, some of the event processing may be blocking too, such as communicating with remote resources. Therefore, I want to utilize Tornado's asynchronous capabilities to deal with these various blocking parts, so that the long-running process can tackle interleaving events from different event sources efficiently.

I have some very loose idea about wrapping an IOLoop inside a class, and instantiate from this class to run the process:

class MyTornadoProcess(object):


   
def __init__(self):
       
self.io_loop = tornado.ioloop.IOLoop.current()
       
...


   
def run(self):
       
# Not sure what I do here
       
...


   
@tornado.gen.coroutine
   
def wait_for_source_a(self):
       
# Use coroutine/future or some async library to work around blocking wait
       
...
       
       
# Upon event, call processing_for_source_a()
       
self.processing_for_source_a(event)


   
@tornado.gen.coroutine
   
def wait_for_source_b(self):
       
# Use coroutine/future or some async library to work around blocking wait
       
...
       
       
# Upon event, call processing_for_source_b()
       
self.processing_for_source_b(event)


   
@tornado.gen.coroutine
   
def processing_for_source_a(self):
       
# Use some async library for a blocking task
       
...


   
@tornado.gen.coroutine
   
def processing_for_source_b(self):
       
# Use some async library for a blocking task
       
...


if __name__ == '__main__':
    proc
= MyTornadoProcess()
    proc
.run()


Is this feasible? I have seen Tornado being used for some one-off stuff, i.e. start the IOLoop, do some async stuff, stop the IOLoop and finish, but I have not seen Tornado being used for long-running process in a non-web-serving scenario. I'd imagine it is perfectly capable for such things, and it would be nice to have some examples/psuedo-code to illustrate the usage. In addition, my imagined way of structuring the code may not be optimal, so any critique is welcome.

A. Jesse Jiryu Davis

unread,
Jun 15, 2015, 10:42:10 PM6/15/15
to python-...@googlegroups.com
Once again, the web spider example is a good starting place:


It demonstrates using Tornado without serving web pages. You'll see there's no need to wrap all your coroutines into a class, just start all your coroutines and then start the loop.

--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornad...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

mrtn

unread,
Jun 15, 2015, 11:05:37 PM6/15/15
to python-...@googlegroups.com

Thanks. I actually looked at the spider example, which makes me think what I intend to do is possible, but one major difference is that the spider example uses io_loop.run_sync(main), which simply calls the 'main' function, and once the function finishes, it stops the IOLoop and finishes the program.

I want to keep the IOLoop running and responding to events, until something explicitly terminates the process. How do I do that?

Gavin M. Roy

unread,
Jun 15, 2015, 11:28:11 PM6/15/15
to python-...@googlegroups.com
It's probably too complex and specific for your use case, but check out https://github.com/gmr/rejected - it's a tornado based rabbitmq consumer framework. Take a look at process.py and consumer.py for the tornado and async bits.

A. Jesse Jiryu Davis

unread,
Jun 15, 2015, 11:31:52 PM6/15/15
to python-...@googlegroups.com
In order to run forever:

@gen.coroutine
def main():
    while True:
        # do stuff

mrtn

unread,
Jun 16, 2015, 12:10:05 AM6/16/15
to python-...@googlegroups.com

I see, so having io_loop.run_sync(main) together with:

@gen.coroutine
def main():
    while True:
        # do stuff

will effectively keeps the IOLoop running forever, even though run_sync() itself executes a function once and exits.

A related question: if a function is decorated with @gen.coroutine because inside that function there is 'yield' which makes the function a generator, does it mean that we must also yield this function when we call it? Or, we only need to do so when the generator returns a value? If the generator does not return a value, then we call the function directly without 'yield'. For example:

@gen.coroutine
def post_url():
    response = yield async_http_client.fetch('some url', method='POST', body=json.dumps({...}))
    if response.error:
        logger.error(...)

because post_url() does not return any value, we just call post_url() directly without using 'yield' in some other function?
        # do stuff
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe@googlegroups.com.

Ben Darnell

unread,
Jun 16, 2015, 9:42:49 AM6/16/15
to Tornado Mailing List
On Tue, Jun 16, 2015 at 12:10 AM, mrtn <mrtn...@gmail.com> wrote:

I see, so having io_loop.run_sync(main) together with:

@gen.coroutine
def main():
    while True:
        # do stuff

will effectively keeps the IOLoop running forever, even though run_sync() itself executes a function once and exits.

Yes, preventing a function from exiting is one way to make run_sync() last forever. But personally I prefer to just let run_sync() exit and immediately (re)start the IOLoop:

  IOLoop.current().run_sync(main)
  IOLoop.current().start()

The reason I like this pattern is for exception handling: any exception raised in main() will terminate the program; any exception raised after it has moved from run_sync() to start() will be logged but the program will keep going. An exception in a do-nothing while loop is of course very unlikely, but I find that the separate run_sync and start calls express the intention more clearly.
 

A related question: if a function is decorated with @gen.coroutine because inside that function there is 'yield' which makes the function a generator, does it mean that we must also yield this function when we call it? Or, we only need to do so when the generator returns a value? If the generator does not return a value, then we call the function directly without 'yield'. For example:

@gen.coroutine
def post_url():
    response = yield async_http_client.fetch('some url', method='POST', body=json.dumps({...}))
    if response.error:
        logger.error(...)

because post_url() does not return any value, we just call post_url() directly without using 'yield' in some other function?

You could call post_url without yield, but you generally shouldn't. If you don't use yield and post_url raises an exception, there is nowhere for that exception to go. (and stack_context magic may cause it to end up being raised somewhere you don't expect). If you really want to fire-and-forget a function call with no exception handling, use IOLoop.current().spawn_callback(post_url) to have the IOLoop call the function for you.

-Ben

mrtn

unread,
Jun 16, 2015, 6:07:25 PM6/16/15
to python-...@googlegroups.com, b...@bendarnell.com

Thanks Ben. I see the reasoning for using 'yield' vs. not using it, but I don't quite get your first point about keep run_sync(main) running forever.

  IOLoop.current().run_sync(main)
  IOLoop.current().start()

It is true that the above will restart the IOLoop immediately once run_sync (and main()) is finished, but the program will be simply stuck in an IOLoop, so how do I run main() again and keep it running forever?

On the other hand, I've tried the following using Redis as an example:

from datetime import datetime
import urllib.parse
import simplejson as json
from tornado import gen, ioloop, httpclient
from toredis import Client


httpclient
.AsyncHTTPClient.configure('tornado.curl_httpclient.CurlAsyncHTTPClient')
http_client
= httpclient.AsyncHTTPClient()


rds
= Client()


def log(message):
   
print("{0}: {1}".format(datetime.now().strftime('%H:%M:%S.%f')[:-3], message))


@gen.coroutine
def connect_redis(socket_path='/tmp/redis.sock', pwd='123456'):
   
yield gen.Task(rds.connect_usocket, socket_path)
    status
= yield gen.Task(rds.auth, pwd)
   
if status == 'OK':
        log
('Redis authenticated')
        status
= yield gen.Task(rds.select, '0')
       
if status == 'OK':
            log
('Redis DB 0 selected')


@gen.coroutine
def fetch_task():
    log
('Waiting for new task ...')
    key
, task = yield gen.Task(rds.brpop, 'queue:test', timeout=0)
    log
('Received a new task: {}'.format(task))


    task_dict
= json.loads(task)
    task_id
= task_dict['ID']


    log
('Processing task #{}'.format(task_id))
    res
= yield process_task(task_dict)
    log
('Received response: {}'.format(res))
    log
('Finished task #{}'.format(task_id))


@gen.coroutine
def process_task(task):
   
# Up to 3 tries
   
for _ in range(3):
       
try:
            response
= yield http_client.fetch('http://httpbin.org/post', method='POST',
                                               headers
={'Content-Type': 'application/x-www-form-urlencoded'},
                                               body
=urllib.parse.urlencode({'Text': task['Body']}))
           
yield gen.sleep(5)
            log
('Request POSTed')
           
return response.body
       
except httpclient.HTTPError as e:
            log
('Error when POSTing: %s', e.args[0])
           
continue


@gen.coroutine
def main():
   
yield connect_redis()
   
while True:
       
yield fetch_task()


if __name__ == '__main__':
    io_loop
= ioloop.IOLoop.current()
    io_loop
.run_sync(main)


Here I use toredis, an asynchronous redis client, to listen to a queue (blocking), and process task from this queue (blocking too). I was hoping that while the program is waiting for processing a task, it can continue to retrieve new task from the queue and to start processing the new task too.

However, after pushing in two tasks almost simultaneously, this is what I get in stdout:

15:06:09.408: Waiting for new task ...
15:06:16.077: Received a new task: b'{"ID": 1, "Body": "Hello!"}'
15:06:16.077: Processing task #1
15:06:21.568: Request POSTed
15:06:21.568: Received response: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "Text": "Hello!"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip,deflate", \n    "Content-Length": "13", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "httpbin.org", \n    "User-Agent": "Mozilla/5.0 (compatible; pycurl)"\n  }, \n  "json": null, \n  "origin": "205.154.255.224", \n  "url": "http://httpbin.org/post"\n}\n'
15:06:21.569: Finished task #1
15:06:21.569: Waiting for new task ...
15:06:21.569: Received a new task: b'{"ID": 2, "Body": "World!"}'
15:06:21.569: Processing task #2
15:06:26.784: Request POSTed
15:06:26.784: Received response: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "Text": "World!"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip,deflate", \n    "Content-Length": "13", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "httpbin.org", \n    "User-Agent": "Mozilla/5.0 (compatible; pycurl)"\n  }, \n  "json": null, \n  "origin": "205.154.255.224", \n  "url": "http://httpbin.org/post"\n}\n'
15:06:26.784: Finished task #2
15:06:26.784: Waiting for new task ...

In other words, the tasks are simply retrieved and processed in sequence, and there is no asynchronous operation at all.

I'd appreciate you could point out what I've done wrong here. Thanks.

        # do stuff

To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "Tornado Web Server" group.
To unsubscribe from this group and stop receiving emails from it, send an email to python-tornado+unsubscribe@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Ben Darnell

unread,
Jun 16, 2015, 10:24:33 PM6/16/15
to Tornado Mailing List
On Tue, Jun 16, 2015 at 6:07 PM, mrtn <mrtn...@gmail.com> wrote:

Thanks Ben. I see the reasoning for using 'yield' vs. not using it, but I don't quite get your first point about keep run_sync(main) running forever.

  IOLoop.current().run_sync(main)
  IOLoop.current().start()

It is true that the above will restart the IOLoop immediately once run_sync (and main()) is finished, but the program will be simply stuck in an IOLoop, so how do I run main() again and keep it running forever?

Do one of the two: either put a `while True` loop in `main()`, or do the `run_sync(); start()` sequence, but don't do both.

Here's one way of structuring your example:

queue = tornado.queues.Queue()

@gen.coroutine
def redis_reader():
    while True:
        key, task = yield gen.Task(rds.brpop, 'queue:test', timeout=0)
        yield queue.put(json.loads(task))

@gen.coroutine
def worker():
    while True:
        task = yield queue.get()
        yield process_task(task)

@gen.coroutine
def main():
    yield connect_redis()
    IOLoop.current().spawn_callback(redis_reader)
    for i in range(num_workers):
        IOLoop.current().spawn_callback(worker)

if __name__ == '__main__':
    IOLoop.current().run_sync(main)
    IOLoop.current().start()
You only have one loop and it's waiting for one task to finish before starting the next one. If you structure it as I did above you'll see things happening in parallel because there are multiple worker tasks. (there are other ways to accomplish the same thing, such as by using a semaphore and spawning single-task callbacks instead of a queue with long-lived callbacks). 

-Ben
Reply all
Reply to author
Forward
0 new messages