Re: [wxPython-users] application hangs when large no of message is publish to a Topic in pubsub

41 views
Skip to first unread message

oliver

unread,
Dec 3, 2015, 11:16:28 PM12/3/15
to wxpytho...@googlegroups.com, PyPubSub
On Thu, Dec 3, 2015 at 10:29 AM, Hemadri Saxena <hemadri...@gmail.com> wrote:

> I also want to know how the message is deliverer to listener when multiple publisher published messages ? what happens when they publish message at the same time..

Pubsub itself is single threaded. The listeners are called in same thread as sender. In a single threaded app, only one message can be sent at a time, and next message can be published only when all listeners have processed the previous message. When sender and receiver "live" in different threads, you should queue sent messages via a sync queue specific to each listener, and have the receiving threads check for messages on these queues.

ahh...for me some of my senders and receiver "live" in different threads and few senders are in same thread as receiver.  So for all those senders which are in different thread. I will create a queue and push the messages to queue then do I need pass whole queue itself ?

Well, possibly. You only need the queue on the receiver side. However, one nice aspect of pubsub is that each listener is unaware of any other listeners, and directly gets called when a sender emits a message. It would be nice to keep this. But I think in this case you need one queue per listener, in which case it is simpler to create a listener wrapper that contains a queue. Something like this: 

from threading import queue

class QueueingListener:
    def __init__(self, listener):
         self.__listener = listener
         self.__queue = queue
    def __call__(self, *args, **kwargs):
         self.__queue.push((args, kwargs))
    def proc_queued(self):
         # call the listener for each message queued
         while self.__queue:
               args, kwargs = self.__queue.pop()
               self.__listener(*args, **kwargs)

class SomeObj:
    def listener(self):
        print('hi')

class SomeThread(thread):

    ...

    # start running:
    ...
    obj = SomeObj()
    self.queueing_listener = QueueingListener(obj.listener)
    pub.subscribe(self.queueing_listener)

    # enter some long-running loop (download file, etc)
    while ...: 
        ...do a bit of stuff
        self.queueing_listener.proc_queued()

This will ensure that if pubsub messages are received while the thread is doing a chunk of work, the listener can be called safely when the thread decides it is ok (ie between chunks of work). Note pubsub doesn't store strong references to listeners (so that listeners are autmoatically disposed of when there are no other references left to them), this means you have to store a strong reference to the listener wrapper. 

HTH.         
 
For all those senders which in same thread as that of receiver it should be as it is.

Correct.
Oliver 
Reply all
Reply to author
Forward
0 new messages