Pattern for recurring events in coroutines?

Robie Basak

Dec 29, 2015, 8:31:07 PM
to python...@googlegroups.com
Hi,

Futures are really nice for one-off events, in that I can just
arbitrarily "yield from" or "await" a future in the middle of a
coroutine.

Is there a pattern to do something similar for a recurring series of
similar events?

The closest pattern of which I'm aware is a class with callbacks for
each type of event (as used, for example, in a Transport or Protocol).
Another pattern is a callback registration mechanism. But both seem to
lead to convoluted code in some cases for me, because state has to be
stored in class instance attributes, whereas in a coroutine the code
path itself could carry the necessary state.

So what can I do to get direct access to a series of events in turn from
the middle of a coroutine that I can just "await"?

I ended up implementing something to do this that should hopefully
explain what I mean. My question is: is there a better way, or if not,
what improvements could I make? There are some hacks in my abstractions
that I don't particularly like.

Let me start with use, then I'll show implementation. I would like to
get an iterator that produces futures which complete in turn, one for
each event:

@asyncio.coroutine
def foo():
    for future in get_something_happened_events():
        event = yield from future
        # handle the event

The reason I want this is because then I can handle logic for multiple
events at once, too:

@asyncio.coroutine
def bar():
    a_type_events = iter(...)
    b_type_events = iter(...)

    next_a_event = next(a_type_events)
    next_b_event = next(b_type_events)

    yield from asyncio.wait([next_a_event, next_b_event], timeout=...)
    if next_a_event.done():
        ...
        next_a_event = next(a_type_events)

I hope this summary explains why this is useful to me, but if not I have
a concrete example further below.

Let me go deeper.

My implementation of this is:

class _FutureLinkedListItem:
    def __init__(self):
        self.future = asyncio.Future()
        self.next = None


class EventQueue:
    def __init__(self):
        self._next_future_item = _FutureLinkedListItem()

    def __iter__(self):
        return _EventQueueReader(self)

    def _get_next_future_item(self, item):
        if item.next is None:
            item.next = _FutureLinkedListItem()
        return item.next

    def _move_along(self):
        self._next_future_item = self._get_next_future_item(
            self._next_future_item)

    def send_result(self, result):
        self._next_future_item.future.set_result(result)
        self._move_along()

    def send_exception(self, exception):
        self._next_future_item.future.set_exception(exception)
        self._move_along()


class _EventQueueReader:
    def __init__(self, parent):
        self.parent = parent
        self._next_future_item = parent._next_future_item

    def _move_along(self):
        self._next_future_item = self.parent._get_next_future_item(
            self._next_future_item)

    def __iter__(self):
        return self

    def __next__(self):
        future = self._next_future_item.future
        self._move_along()
        return future


Now I can just create an EventQueue as a class attribute. Anything can
send events into that queue using send_result() or send_exception().
Anything can listen for events by calling iter() on the EventQueue
instance, which returns my series of Futures. Multiple listeners see the
same set of Futures. Old Futures that are no longer referenced, even
"wasted" ones, just get garbage collected. Listeners can ask for Futures
for the next ten events
at once, and they will be instantiated on the spot and will eventually
fire. But in the common case only one non-done Future exists at one time
for the next event.
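
To make that concrete, here's a quick usage sketch (contrived and
synchronous purely for illustration; in practice send_result() is called
from a producer coroutine):

queue = EventQueue()

# Two readers start at the same point in the stream, so they hand out
# the *same* Future object for each event.
reader_a = iter(queue)
reader_b = iter(queue)

first_for_a = next(reader_a)
first_for_b = next(reader_b)
assert first_for_a is first_for_b

queue.send_result('event 1')
assert first_for_a.result() == 'event 1'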

I found that I needed to create long-running "event processing"
coroutines that listen for events and generate others. So I wrote a
simple Worker class to track long-lived coroutines, cancelling them when
the owning object is garbage collected.
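
Roughly, the idea behind Worker is this (illustrative sketch only; the
real implementation is in the gist linked below):

class Worker:
    # Sketch: keep strong references to the tasks and cancel them when
    # the Worker itself is garbage collected.
    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self._tasks = []

    def create_task(self, coro):
        task = self.loop.create_task(coro)
        self._tasks.append(task)
        return task

    def __del__(self):
        for task in self._tasks:
            task.cancel()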

I've put all the code in
https://gist.github.com/basak/c0a8f4f5a51d7fce1cc7, including the Worker
implementation, etc.

I have two concrete cases where I think this came in handy, which is why
I'm seeking best practice on this. My second concrete example is a
prototype XMPP client I'm using, where various coroutines run during
protocol negotiation etc., but I'll save that for another time. The
first is a case where I need to listen for system power events as well
as time out on them.

On my Ubuntu phone, time doesn't pass while the system is asleep as far
as poll(2) etc. are concerned, so asyncio.sleep() isn't sufficient for a
wall-clock-based interval wait. For example, if the phone is on battery
and is not being used, a sleep of 60 seconds may not complete for hours,
as the phone wakes only occasionally and for only a few milliseconds at
a time. So I have to augment the sleep by listening for system power
events, so that after a wakeup I can check the RTC to see whether more
time has passed while the system was asleep.

First, here's the class that watches power state. There's a program that
will output state changes to stdout, so I just pick up on that and
publish events to a public EventQueue attribute. This uses a coroutine
that will get cancelled by the Worker if the class instance is garbage
collected:


class PowerStateWatcher:
    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.power_events = util.EventQueue()

        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self.watch_wakeups())

    @asyncio.coroutine
    def watch_wakeups(self):
        create = asyncio.create_subprocess_exec(
            'stdbuf', '-o0', 'powerd-cli', 'listen',
            stdout=subprocess.PIPE)
        proc = yield from create

        try:
            while True:
                line = yield from proc.stdout.readline()
                if line == b"Received SysPowerStateChange: state=0\n":
                    self.power_events.send_result(0)
                elif line == b"Received SysPowerStateChange: state=1\n":
                    self.power_events.send_result(1)
        finally:
            proc.kill()


Next, a class that creates events on a regular pulse that others can
listen on by watching the EventQueue public instance attribute in the
same way:

class Clock:
    def __init__(self, period, loop=None):
        self.loop = loop or asyncio.get_event_loop()
        self.trigger_events = util.EventQueue()

        self._worker = util.Worker(loop=loop)
        self._worker.create_task(self._watch(period))

    @asyncio.coroutine
    def _watch(self, period):
        watcher = PowerStateWatcher(self.loop)
        power_events = iter(watcher.power_events)
        future_power_event = next(power_events)

        deadline = time.time() + period
        while True:
            time_remaining = deadline - time.time()
            if time_remaining < 0:
                self.trigger_events.send_result(-time_remaining)
                deadline = time.time() + period
            else:
                yield from asyncio.wait(
                    [future_power_event],
                    timeout=time_remaining,
                    loop=self.loop,
                )
                if future_power_event.done():
                    future_power_event = next(power_events)


Finally, I can just use this from a coroutine as follows:

@asyncio.coroutine
def create_periodic_reports(loop):
    clock = Clock(period=300, loop=loop)
    for future_trigger in clock.trigger_events:
        tardiness = yield from future_trigger
        # DO SOMETHING HERE


Before I did it this way, I had a pretty convoluted class with a method
that got called on a power state change event that then had to examine
state stored in the class manipulated by other methods. Moving the logic
into a single coroutine seemed to make things simpler to me.

I have just noticed that my classes each have two methods, one of which
is __init__. So I could turn them into plain coroutines now, which is
nice. I refactored extensively to get to where I am now, which is how I
ended up here. I won't try to do it while drafting this email, though,
as I don't have an easy way to test that it will work.

Thank you for reading this far! So what do you think? Am I insane, or is
this a sensible way to arrange things?

Guido van Rossum

Jan 4, 2016, 12:01:59 AM
to Robie Basak, python-tulip
Hi Robie,

This all looks very well put together; I don't see any obvious shortcuts in your code.

I was at first tempted to recommend using the asyncio.Queue class, but it seems you really want to be able to have multiple consumers of the same stream of events. I notice that your Event* classes don't contain any coroutines, but you can't use a collections.deque because you want it to be an infinite iterator. I guess a consumer that doesn't wait for the Futures would be in trouble, but you seem to be okay with that.

If you are using Python 3.5 you might try changing things around to use PEP 492's `async for`; this should let you write

    async for result in <queue>:
        ...use result...

rather than

    for future in <queue>:
        result = yield from future  # or await future
        ...use result...

but it's not as easy to recover when a future has an exception instead of a result, and still continue the loop, which also seems to be one of your requirements.
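
A rough sketch of what that could look like (my names; the exact
__aiter__ protocol details differ slightly between 3.5 point releases) --
a thin async-iterator wrapper over your existing Future-per-event reader:

    class AsyncEventQueueReader:
        def __init__(self, event_queue):
            # Reuse the existing reader that hands out one Future per event.
            self._reader = iter(event_queue)

        def __aiter__(self):
            return self

        async def __anext__(self):
            # The event stream is infinite, so this never raises
            # StopAsyncIteration; an exception set on a future propagates
            # out of here and terminates the `async for` loop, which is
            # the limitation mentioned above.
            return await next(self._reader)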

So I think you've arrived at a very reasonable way to factor your code given all your constraints.

One last idea: Perhaps you could try to rethink the API in a completely different way, not using infinite iterators? I was thinking of using an asyncio.Queue instead of the iterator, where you can block on its .get() method whenever you want to wait for the next event (this is an awaitable, so you can combine waiting for multiple events, timeouts etc.). The ability to have multiple consumers would then have to be supported directly by the producer, e.g. using a simple registration interface, which would take the role of EventQueue.__iter__() in your current design. The producer could maintain a weakref.WeakSet of queues connecting it to consumers, so a consumer that disappears (or drops its queue) simply disappears from the set.
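
Very roughly, something along these lines (the names here are just for
illustration, not a real API):

    import asyncio
    import weakref

    class EventBroadcaster:
        """Producer side: fan each event out to one asyncio.Queue per
        consumer, holding the queues only weakly."""

        def __init__(self):
            # A consumer that drops its queue (or is itself collected)
            # simply disappears from this set.
            self._queues = weakref.WeakSet()

        def subscribe(self):
            queue = asyncio.Queue()
            self._queues.add(queue)
            return queue

        def send_result(self, result):
            for queue in self._queues:
                queue.put_nowait(result)

    @asyncio.coroutine
    def consumer(broadcaster):
        # Keep a strong reference to the queue for as long as we consume.
        queue = broadcaster.subscribe()
        while True:
            event = yield from queue.get()
            # handle the event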

The one feature this design doesn't support is passing exceptions through the queue -- you'd have to invent some kludge using a different type of queue value that the consumer must check for if you need this. (Though your examples don't seem to use send_exception(), so maybe it's a case of over-generalization? I didn't read your more elaborate example though.)
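
One possible shape for such a kludge (sketch only, consumer side shown
inline):

    class WrappedException:
        def __init__(self, exc):
            self.exc = exc

    # The producer puts WrappedException(exc) on the queue instead of
    # calling set_exception(); the consumer checks for it:
    event = yield from queue.get()
    if isinstance(event, WrappedException):
        ...  # handle (or re-raise) event.exc, then continue the loop
    else:
        ...  # handle a normal event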

Whatever you decide, good luck with the further development of this code!

--Guido


--
--Guido van Rossum (python.org/~guido)