Multithreading in pubsub

Oliver

May 25, 2009, 2:02:45 PM
to pypubsub_dev
I'm leaning more and more towards the approach that multithreading is
best left out of the library, with mechanisms provided to bridge
pubsub and multithreaded code. Here is what I do instead:

First I should say that I have not used Qt yet (looking forward to an
opportunity!), but I would assume you have the same constraints as
with other windowing APIs: all your GUI event handling must occur in
the main thread, so any computations, results or data acquired in
auxiliary threads must be communicated back to the main thread, which
can then update the relevant parts of the GUI.

Without pubsub, you would typically have your auxiliary
"listeners" (functions or methods "running" in a separate thread)
monitor some sort of queue for data to process (computation) or
commands to execute (network or DB access, etc.). They do their stuff,
then put the "results" on another queue, which your main thread
monitors and knows how to interpret to update the GUI. Your
application also typically needs to be able to handle the "results" in
a different order from the one in which the queries/commands were
issued.
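
For concreteness, here is a minimal Python 3 sketch of that two-queue
pattern, with no pubsub involved. The (request_id, delay, value)
tuples, the None stop sentinel and the doubling "work" are illustrative
assumptions; the point is that tagging each request with an id lets the
main thread match results that come back out of order.

```python
import queue
import threading
import time

requests = queue.Queue()   # commands for the workers: (request_id, delay, value)
results = queue.Queue()    # answers for the main thread: (request_id, result)

def worker():
    while True:
        item = requests.get()
        if item is None:            # sentinel: this worker should exit
            break
        request_id, delay, value = item
        time.sleep(delay)           # stand-in for computation, DB or network work
        results.put((request_id, value * 2))

workers = [threading.Thread(target=worker) for _ in range(2)]
for w in workers:
    w.start()

requests.put((1, 0.2, 10))          # slow request
requests.put((2, 0.0, 20))          # fast request: its result usually comes back first

received = {}
for _ in range(2):
    # A GUI would check this from its idle handler with get_nowait()
    # instead of blocking here.
    request_id, result = results.get()
    received[request_id] = result   # matched by id, whatever the arrival order

for _ in workers:
    requests.put(None)              # one stop sentinel per worker
for w in workers:
    w.join()
print(sorted(received.items()))     # [(1, 20), (2, 40)]
```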

How does PyPubSub fit into this? What pypubsub makes easy is
generating data in one part of an application without having to worry
about which other parts, if any, will receive the data, and what will
be done with it. So there are two situations:

1. you have data in your main thread, and you want to publish it, with
some listeners being in auxiliary threads;
2. you have data generated by auxiliary threads that want to publish
the results, and some of the listeners using those results must be in
the main thread.

The first situation is easy: your listener is a method of your thread
object, and puts the data in a queue specific to your thread object.
So there is no need to modify pubsub there. I.e., in pseudocode:

class Task(Thread):

    def run(self):  # will run in separate thread
        loop until thread stopped:
            do stuff
            if data in my queue:
                get data out of queue so next loop uses it

    def listener(self, data):
        put data on queue (or flag "stop")

pub.subscribe(yourTask.listener, topicName)
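
A runnable Python 3 sketch of this first situation follows. So that it
runs without pypubsub installed, it uses a small hypothetical MiniPub
class that mimics only the pub.subscribe(listener, topicName) and
pub.sendMessage(topicName, **data) call shapes; with pypubsub you would
use `from pubsub import pub` instead. The topic name 'work.request' and
the squaring "work" are made up for the example. Note that listener()
runs in the publisher's thread, so it only enqueues.

```python
import queue
import threading

class MiniPub:
    """Hypothetical stand-in for pypubsub's `pub` (subscribe/sendMessage only).
    Like pubsub, it calls listeners synchronously, in the sender's thread."""
    def __init__(self):
        self._listeners = {}

    def subscribe(self, listener, topicName):
        self._listeners.setdefault(topicName, []).append(listener)

    def sendMessage(self, topicName, **msgData):
        for listener in self._listeners.get(topicName, []):
            listener(**msgData)

pub = MiniPub()

class Task(threading.Thread):
    def __init__(self):
        super().__init__()
        self.inQueue = queue.Queue()   # filled by listener(), drained by run()
        self.processed = []

    def run(self):                     # runs in the auxiliary thread
        while True:
            data = self.inQueue.get()  # blocking is fine here: not the GUI thread
            if data is None:           # the "stop" flag
                break
            self.processed.append(data * data)   # stand-in for real work

    def listener(self, data):          # runs in the *publisher's* thread
        self.inQueue.put(data)         # just hand off; no heavy work here

task = Task()
task.start()
pub.subscribe(task.listener, 'work.request')
for n in (1, 2, 3):
    pub.sendMessage('work.request', data=n)   # published from the main thread
pub.sendMessage('work.request', data=None)    # ask the thread to stop
task.join()
print(task.processed)   # [1, 4, 9]
```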

The second situation is not much more complicated, but there are a
couple of variants. The easiest is when the auxiliary thread can be
told where to put its results, so that the main thread can publish
them, thereby resolving the second issue. The easiest way to do this
(in Python at least) is for the caller that triggers the thread to do
something to give it a queue to put the results in, then poll that
queue for results, then publish them:

class App:  # runs in main thread
    def onGuiEvent(self):
        thread.process(data, my queue)

    def onGuiIdle(self):
        if data in my queue:
            extract data
            pub.sendMessage(topicName, data=data)
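
The same idea as a Python 3 sketch, again with a hypothetical MiniPub
stand-in for pubsub. process(), the topic 'work.done' and the explicit
join() calls are assumptions made to keep the demo self-contained and
deterministic; in a GUI, the toolkit would call onGuiIdle() periodically
and nothing would wait on the workers.

```python
import queue
import threading

class MiniPub:
    """Hypothetical stand-in for pypubsub's `pub` (subscribe/sendMessage only).
    Like pubsub, it calls listeners synchronously, in the sender's thread."""
    def __init__(self):
        self._listeners = {}

    def subscribe(self, listener, topicName):
        self._listeners.setdefault(topicName, []).append(listener)

    def sendMessage(self, topicName, **msgData):
        for listener in self._listeners.get(topicName, []):
            listener(**msgData)

pub = MiniPub()
received = []
pub.subscribe(lambda result: received.append(result), 'work.done')

def process(data, outQueue):
    """Hypothetical worker: runs in an aux thread and never touches pubsub."""
    outQueue.put(sum(data))

class App:                      # lives in the main thread
    def __init__(self):
        self.results = queue.Queue()
        self.workers = []

    def onGuiEvent(self, data):
        t = threading.Thread(target=process, args=(data, self.results))
        t.start()
        self.workers.append(t)

    def onGuiIdle(self):
        # Drain without blocking: the event loop must stay responsive.
        while True:
            try:
                result = self.results.get_nowait()
            except queue.Empty:
                return
            pub.sendMessage('work.done', result=result)  # main thread publishes

app = App()
app.onGuiEvent([1, 2, 3])
app.onGuiEvent([10, 20])
for t in app.workers:
    t.join()          # demo only, for deterministic output; a GUI would not wait
app.onGuiIdle()       # in a real GUI, the toolkit calls this when idle
print(sorted(received))   # [6, 30]
```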

So again, very simple, and not much reason for pubsub to intervene:
all your thread needs is an external queue, and all your main thread
needs is some way of polling that queue and taking action based on new
queue content.

The second variant is when the thread can't be given a place to put
its results (for instance, if it is itself a pubsub listener, in which
case doing so would make for a somewhat strange design, though it
would be fine in some cases). Even then, not much more is needed than
a function that you register to be called when the GUI is idle:

class Task(Thread):

    def run(self):  # will run in separate thread
        loop until thread stopped:
            do stuff
            put new data on my out-queue

    def monitor(self):
        if new data on my out-queue:
            get data from my out-queue
            pub.sendMessage(topicName, data=data)

guiAPI.onIdle(yourTask.monitor)

Here guiAPI stands for whichever GUI toolkit you use (Qt, wxPython, etc.).
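
A Python 3 sketch of this second variant, assuming a hypothetical
idleCallbacks list in place of a real guiAPI.onIdle(), and a MiniPub
stand-in for pubsub. The thread never publishes; monitor() drains its
out-queue without blocking and publishes from the main thread.

```python
import queue
import threading

class MiniPub:
    """Hypothetical stand-in for pypubsub's `pub` (subscribe/sendMessage only).
    Like pubsub, it calls listeners synchronously, in the sender's thread."""
    def __init__(self):
        self._listeners = {}

    def subscribe(self, listener, topicName):
        self._listeners.setdefault(topicName, []).append(listener)

    def sendMessage(self, topicName, **msgData):
        for listener in self._listeners.get(topicName, []):
            listener(**msgData)

pub = MiniPub()
idleCallbacks = []              # hypothetical stand-in for guiAPI.onIdle(...)

class Task(threading.Thread):
    def __init__(self, items):
        super().__init__()
        self.items = items
        self.outQueue = queue.Queue()   # written by run(), read by monitor()

    def run(self):                      # aux thread: compute, never publish
        for item in self.items:
            self.outQueue.put(item.upper())

    def monitor(self):                  # main thread, called when GUI is idle
        while True:
            try:
                data = self.outQueue.get_nowait()
            except queue.Empty:
                break
            pub.sendMessage('task.output', data=data)

seen = []
pub.subscribe(lambda data: seen.append(data), 'task.output')

task = Task(['a', 'b', 'c'])
idleCallbacks.append(task.monitor)
task.start()
task.join()                     # demo only; keeps the output deterministic
for callback in idleCallbacks:  # what the GUI's idle event would do
    callback()
print(seen)   # ['A', 'B', 'C']
```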

The above can be combined with the first bit of code easily for the
case where the thread is both a listener and a publisher.
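
One possible combination, sketched in Python 3 with the same
hypothetical MiniPub stand-in: the thread receives requests through its
listener (first situation), and its results are published from the main
thread by monitor() (second situation). Topic names and the +1 "work"
are invented for the example.

```python
import queue
import threading

class MiniPub:
    """Hypothetical stand-in for pypubsub's `pub` (subscribe/sendMessage only).
    Like pubsub, it calls listeners synchronously, in the sender's thread."""
    def __init__(self):
        self._listeners = {}

    def subscribe(self, listener, topicName):
        self._listeners.setdefault(topicName, []).append(listener)

    def sendMessage(self, topicName, **msgData):
        for listener in self._listeners.get(topicName, []):
            listener(**msgData)

pub = MiniPub()

class Task(threading.Thread):
    """Both a listener (input side) and a publisher (output side)."""
    def __init__(self):
        super().__init__()
        self.inQueue = queue.Queue()
        self.outQueue = queue.Queue()

    def listener(self, data):           # publisher's thread: just enqueue
        self.inQueue.put(data)

    def run(self):                      # aux thread
        while True:
            data = self.inQueue.get()
            if data is None:            # stop flag
                break
            self.outQueue.put(data + 1)   # stand-in for real work

    def monitor(self):                  # main thread, on GUI idle
        while True:
            try:
                result = self.outQueue.get_nowait()
            except queue.Empty:
                break
            pub.sendMessage('job.result', result=result)

results = []
pub.subscribe(lambda result: results.append(result), 'job.result')
task = Task()
pub.subscribe(task.listener, 'job.request')
task.start()
for n in (1, 2):
    pub.sendMessage('job.request', data=n)
pub.sendMessage('job.request', data=None)   # stop the worker
task.join()
task.monitor()    # normally invoked by the GUI's idle callback
print(results)    # [2, 3]
```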

oliver

Jun 1, 2009, 1:07:11 AM
to pypubsub_dev
One user asked why not use Qt or wxPython mechanisms to help with data
transfer between the aux thread and the main thread. Introducing
external dependencies into pypubsub is really not desirable,
especially not on Qt or wxPython, which are large packages that not
everyone would appreciate being tied to. Also, as mentioned in the
original post, inter-thread communication is easily taken care of
outside of pypubsub, so putting it in there would unnecessarily
complicate the API.

For GUI applications, I think you don't have much choice but to poll:
the main thread is in an event loop, so it must not block waiting for
data to arrive; it has no choice but to check whether there is data
and, if not, move on. AFAIK, there is no builtin mechanism in Python
for triggering a function call in another thread. I.e., no main thread
function can be automatically called (from the main thread) when new
data arrives from the aux thread, so the main thread must check
whether new data is available. If you can think of a builtin mechanism,
let me know.
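
The point about the lack of a builtin cross-thread call can be checked
directly: a plain function call always executes in the calling thread.
This small Python 3 sketch (onNewData and worker are hypothetical names)
records which thread runs the same function when the worker calls it
directly, versus when the main thread picks the data off a queue with a
non-blocking check.

```python
import queue
import threading

calls = []

def onNewData(value):
    # Record which thread actually executed this "main thread" function.
    calls.append((value, threading.current_thread().name))

def worker(outQueue):
    onNewData('direct')          # a direct call runs in the *worker* thread
    outQueue.put('queued')       # handing data over: the main thread must look for it

q = queue.Queue()
t = threading.Thread(target=worker, args=(q,), name='aux')
t.start()
t.join()

# One tick of the main "event loop": check without blocking, then move on.
try:
    onNewData(q.get_nowait())
except queue.Empty:
    pass

print(calls)  # [('direct', 'aux'), ('queued', 'MainThread')]
```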

Note that you should make use of your GUI API to help with
synchronization. For instance, in wxPython you would use
wx.CallAfter(pub.sendMessage, ...) so that sendMessage ends up being
called from the main thread. In a Qt application, you should use Qt
signals. In wxPython, the code gets significantly simpler, since you
don't need to register a callback (wx.CallAfter does it for you) or
use a synchronized queue. With Qt signals you still have to connect a
signal handler (in the example shown in the next post, this
corresponds to onIdle) in which pubsub is called, but you no longer
need a synchronized queue (the Qt signal takes care of that).

I'll post the example next.

oliver

Jun 1, 2009, 1:11:46 AM
to pypubsub_dev
'''
This test gives an example of how computation results from an
auxiliary thread can be 'published' via pubsub in a thread-safe
manner, in a 'gui'-like application, i.e. an application where the
main thread runs an infinite event loop and supports callbacks to
user-defined functions when the gui is idle.

The worker thread's job is to increment a counter as fast as the
interpreter can. Every resultStep counts, the thread stores the count
in a synchronized queue, for later retrieval by the main thread. In
parallel, the main thread loops forever (or until the user interrupts
it via the keyboard), doing some hypothetical work (represented by the
sleep(1) call) and calling all registered 'idle' callbacks. The
transfer is done by extracting items from the queue and publishing
them via pubsub.

Example output (Ctrl-Break just after the 9th transfer; the value of
resultStep has a big impact on the output):

> c:\python24\python multithreadloop.py
starting event loop
aux thread started
1 <_MainThread(MainThread, started)> 1
2 <_MainThread(MainThread, started)> 2
3 <_MainThread(MainThread, started)> 3
3 <_MainThread(MainThread, started)> 4
4 <_MainThread(MainThread, started)> 5
5 <_MainThread(MainThread, started)> 6
6 <_MainThread(MainThread, started)> 7
6 <_MainThread(MainThread, started)> 8
7 <_MainThread(MainThread, started)> 9
8 <_MainThread(MainThread, started)> 10
8 <_MainThread(MainThread, started)> 11
9 <_MainThread(MainThread, started)> 12
Main interrupted, stopping aux thread
aux thread done

Oliver Schoenborn
May 2009
'''

__author__="schoenb"
__date__ ="$31-May-2009 9:11:41 PM$"

from Queue import Queue
import time
import threading

from pubsub import pub


resultStep = 1000000 # how many counts for thread "result" to be available


def threadObserver(transfers, threadObj, count):
    '''Listener that listens for data from testTopic. This function
    doesn't know where the data comes from (or in what thread it was
    generated), but threadObj is the thread in which this
    threadObserver is called and should indicate the main thread.'''

    print transfers, threadObj, count / resultStep

pub.subscribe(threadObserver, 'testTopic')


def onIdle():
    '''This should be registered with the 'gui' to be called when the
    gui is idle, so we get a chance to transfer data from the aux thread
    without blocking the gui. I.e. this function must spend as little
    time as possible so the 'gui' remains responsive.'''
    thread.transferData()


class ParaFunction(threading.Thread):
    '''
    Represent a function running in a parallel thread. The thread
    just increments a counter and puts the counter value on a
    synchronized queue every resultStep counts. The content of the
    queue can be published by calling transferData().
    '''

    def __init__(self):
        threading.Thread.__init__(self)
        # True while the thread should keep counting; stop() sets it to
        # False. Set here rather than in run() so that a stop() issued
        # before run() starts is not overwritten.
        self.running = True
        self.count = 0          # our workload: keep counting!
        self.queue = Queue()    # to transfer data to main thread
        self.transfer = 0       # count how many transfers occurred

    def run(self):
        print 'aux thread started'
        while self.running:
            self.count += 1
            if self.count % resultStep == 0:
                self.queue.put(self.count)

        print 'aux thread done'

    def stop(self):
        self.running = False

    def transferData(self):
        '''Send data from aux thread to main thread. The data was put in
        self.queue by the aux thread, and this queue is a Queue.Queue,
        which is a synchronized queue for inter-thread communication.
        Note: This method must be called from main thread.'''
        self.transfer += 1
        while not self.queue.empty():
            pub.sendMessage('testTopic',
                            transfers = self.transfer,
                            threadObj = threading.currentThread(),
                            count = self.queue.get())


thread = ParaFunction()


def main():
    idleFns = [] # list of functions to call when 'gui' idle
    idleFns.append( onIdle )

    try:
        thread.start()

        print 'starting event loop'
        eventLoop = True
        while eventLoop:
            time.sleep(1) # pretend that main thread does other stuff
            for idleFn in idleFns:
                idleFn()

    except KeyboardInterrupt:
        print 'Main interrupted, stopping aux thread'
        thread.stop()

    except Exception, exc:
        print exc
        print 'Exception, stopping aux thread'
        thread.stop()


main()

oliver

Jun 1, 2009, 1:18:45 AM
to pypubsub_dev
With QtSignals you would replace the code that puts data on the queue
with code that generates a signal, the transferData() would no longer
be needed and the onIdle could be registered as a signal handler
instead of registered as a GUI idle callback.

GeertVc

Jun 1, 2009, 1:38:41 AM
to pypubsub_dev, Geert Vancompernolle
Hi Oliver,

Finally, I found some time to give you feedback on your previous
email. I read your article with great attention, and my situation is
in fact case 2 in your article. I have a main thread that runs the
complete GUI, and I launch batch files (for the time being, at least)
in a second thread.
Those batch files generate output, and it's that output I want to
"send back" to my main thread to update one or more of the fields of
the GUI. For this, I'm currently using PyPubSub 1.0.

So, I'm registering/subscribing in the main thread to notifications
that come from the PyPubSub module. In the sub thread, I'm sending
information/notifications to the PyPubSub module, which distributes
them. Well, you know how it works a trillion times better than I
do...

However, while the sub thread can have data that is sent via PyPubSub
to the main thread to update the GUI, the main thread can also have
data to update the GUI (the main and sub thread can even have data for
the same widgets).

And as far as Qt is concerned, you should not update the GUI from
different threads. Hence, PyPubSub 1.0 gives me trouble, since both
the main and sub thread update the same widget(s), possibly at the
same time. That's indeed asking for trouble...

I also know that, when you use the QtSignal() functionality, you can
emit (submit) signals from one module to another. In the book "Rapid
GUI Programming with PyQt", there's a chapter about multithreading.

I've attached 2 pages of that book, which nicely explain what happens
when one thread sends information to another using the PyQt signal
mechanism. "Automagically", the data sent from one thread to the
other is decoupled and delivered to the main thread, which runs the
GUI event loop.

The following snippet of the document is very important:

"Behind the scenes, when cross-thread signals are emitted, instead of
calling the relevant method directly as is done for signals emitted
and received in the same thread, PyQt puts an event onto the receiving
thread’s event queue with any data that was passed. When the
receiver’s event loop gets around to reading the event, it responds to
it by calling the relevant method with any data that was passed."

This way, the sub thread cannot "directly" call methods in the main
thread; it has to "attach" its message to the main event loop.
That also avoids the problem I'm currently having, namely two threads
that want to "claim" the same widgets for updating at the same time.
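
The mechanism quoted from the PyQt book can be imitated in a few lines
of Python 3 to see why it avoids cross-thread widget updates. This is a
toy, not PyQt's implementation: EventLoop, Signal, post() and
processEvents() are hypothetical names. In real PyQt, whether a slot is
called directly or via a queued event depends on the receiving object's
thread affinity and the connection type (automatic by default). Here, a
same-thread emit calls the slot at once, while a cross-thread emit only
posts an event for the receiver's loop to process.

```python
import queue
import threading

class EventLoop:
    """Hypothetical per-thread event queue, like the one a Qt event loop owns."""
    def __init__(self):
        self.events = queue.Queue()
        self.thread = threading.current_thread()   # thread that runs this loop

    def post(self, func, args):
        self.events.put((func, args))

    def processEvents(self):
        while True:
            try:
                func, args = self.events.get_nowait()
            except queue.Empty:
                return
            func(*args)

class Signal:
    """Toy signal: same-thread emit calls the slot directly; cross-thread
    emit posts an event to the receiver's loop instead."""
    def __init__(self):
        self._slots = []

    def connect(self, slot, loop):
        self._slots.append((slot, loop))

    def emit(self, *args):
        for slot, loop in self._slots:
            if threading.current_thread() is loop.thread:
                slot(*args)               # same thread: direct call
            else:
                loop.post(slot, args)     # other thread: queue it

mainLoop = EventLoop()          # created in the main ("GUI") thread
updated = []

def updateWidget(text):         # slot that must run in the GUI thread
    updated.append((text, threading.current_thread() is mainLoop.thread))

outputReady = Signal()
outputReady.connect(updateWidget, mainLoop)

worker = threading.Thread(target=outputReady.emit, args=('batch line 1',))
worker.start()
worker.join()                   # the slot has not run yet: updated == []
outputReady.emit('from main')   # same thread: runs immediately
mainLoop.processEvents()        # the GUI loop "gets around to" the queued event
print(updated)   # [('from main', True), ('batch line 1', True)]
```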

So, put very simply: my idea was to replace your "sendMessage" with
PyQt's signal mechanism, to "automagically" decouple the sub thread's
call from the main thread as far as updating the GUI widgets is
concerned.

Now back to your article:

Your case 2 requires a kind of "polling" on a queue. I haven't done
this yet in Python (I am quite new to both Python and PyQt), but I can
imagine that this is not the most resource-efficient method. Besides
this, you also have to introduce a queue (which I also haven't done
yet in PyQt).

Since you're very experienced in Python, how exactly would you do
this? Your article shows pseudocode, but it would be nice if a
"real-life" example could be given.

Anyhow, I think that if you were willing to introduce PyQt (or
wxPython) into your PyPubSub module (version 3), you could resolve
this easily, without the hassle described just above.

Again, maybe I see it far too simply, but introducing Qt signals where
you currently do the callbacks would solve the multithreading problem
in an elegant way, since this is all handled by the PyQt package. I
don't know, however, whether such a mechanism also exists in wxPython.

Hope the above makes sense...

Best rgds,

--Geert