Below is a minimal example which reproduces this problem:
Server:
import json
from snakemq.link import Link
from snakemq.packeter import Packeter
from snakemq.messaging import Messaging
from snakemq.message import Message
def on_recv(conn, ident, message):
    """Print an incoming message and answer its sender with a fixed reply.

    Args:
        conn: Connection identifier supplied by the callback (unused here).
        ident: Identifier of the peer that sent the message; the reply is
            addressed back to it.
        message: Received message object; its ``data`` attribute holds the
            raw bytes, expected to be JSON text.
    """
    raw = message.data
    print(raw)
    # The decoded value is not used afterwards; the call is kept because it
    # still raises if the bytes are not decodable JSON.
    payload = json.loads(raw.decode())
    reply = Message(b'Hello :)' * 100)
    messaging.send_message(ident, reply)
# Build the snakeMQ stack bottom-up: Link -> Packeter -> Messaging.
link = Link()
packeter = Packeter(link)
# 'worker_test' is the identifier the client script addresses its messages to.
messaging = Messaging('worker_test', '', packeter)
# Listen on port 4000. NOTE(review): the empty host presumably means
# "all interfaces" - confirm against the snakeMQ Link documentation.
link.add_listener(('', 4000))
# Route every received message to on_recv, which prints it and replies.
messaging.on_message_recv.add(on_recv)
# Run the link loop; nothing follows this call in the server script.
link.loop()
==========================================================================
Client:
from snakemq.link import Link
from snakemq.packeter import Packeter
from snakemq.messaging import Messaging
from snakemq.message import Message
import threading
import json
import time
class TestThread(threading.Thread):
    """Thread that owns a snakeMQ Link/Packeter/Messaging stack and runs its loop.

    The thread connects to ``localhost:4000``, sets ``received_flag`` when a
    message arrives, and shuts the link down from inside the loop once
    ``stop_flag`` has been set via :meth:`stop`.

    NOTE(review): in the driver script ``send_message`` is called from the
    main thread while ``link.loop()`` runs in this thread. Whether snakeMQ
    permits cross-thread use of ``Messaging.send_message`` is not visible
    here - confirm against the snakeMQ documentation.
    """

    def __init__(self, ident):
        """Create the messaging stack and register the callbacks.

        Args:
            ident: Identifier handed to ``Messaging`` for this endpoint.
        """
        super(TestThread, self).__init__()
        self.snakemq_ident = ident
        # Set by on_recv once any message has been received.
        self.received_flag = threading.Event()
        # Set by stop(); polled from on_loop_pass inside the link loop.
        self.stop_flag = threading.Event()
        self.link = Link()
        self.packeter = Packeter(self.link)
        self.messaging = Messaging(ident, '', self.packeter)
        self.messaging.on_message_recv.add(self.on_recv)
        self.link.on_loop_pass.add(self.on_loop_pass)
        self.link.add_connector(('localhost', 4000))

    def on_recv(self, conn_id, ident, payload):
        """Print the received message data and flag that a message arrived.

        Args:
            conn_id: Connection identifier supplied by the callback (unused).
            ident: Identifier of the sending peer (unused).
            payload: Received message object; its ``data`` is printed.
        """
        print(payload.data)
        self.received_flag.set()

    def on_loop_pass(self):
        """Stop and clean up the link once a stop has been requested.

        Registered on ``link.on_loop_pass``; does nothing until
        ``stop_flag`` is set.
        """
        if self.stop_flag.is_set():
            self.link.stop()
            self.link.cleanup()

    def start(self):
        """Start the thread, then sleep 0.2 s to give it time to start up.

        The delay is a fixed pause, not a synchronisation with the link loop.
        """
        super(TestThread, self).start()
        time.sleep(0.2)

    def stop(self):
        """Request termination; the link loop acts on it in on_loop_pass."""
        self.stop_flag.set()

    def run(self):
        """Thread body: run the link loop."""
        self.link.loop()

    def send_message(self, ident, message):
        """Serialise *message* as UTF-8 JSON and send it to *ident*.

        Args:
            ident: Identifier of the destination peer.
            message: JSON-serialisable object to send.
        """
        body = json.dumps(message).encode('utf-8')
        # Address the message to the requested peer. The destination was
        # previously hard-coded to 'worker_test' and ``ident`` was ignored.
        self.messaging.send_message(ident, Message(body))
# Endlessly create a worker thread, exchange one message with the server,
# and stop the worker again - first as 't1', then as 't2' - printing the
# iteration counter so the point of failure can be seen.
iteration = 0
while True:
    iteration += 1
    print(iteration)
    for worker_ident, greeting in (('t1', 'Hello from T1!!!'),
                                   ('t2', 'Hello from T2!!!')):
        worker = TestThread(worker_ident)
        worker.start()
        worker.send_message('worker_test', greeting * 100)
        # Block until the server's reply has been received.
        worker.received_flag.wait()
        worker.stop()
==========================================================================
I've found (possibly a coincidence) that crashes sometimes happen while some user interaction is occurring - two recent examples are opening new tabs in the browser and opening an Explorer window. The only common link between these is that both utilise sockets.
Sometimes only a few iterations are required (< 100), sometimes it requires many more before a crash occurs.
There's nothing in the SnakeMQ logs, nothing on the console and nothing in Event viewer. I have absolutely no idea what's causing the lockup.
System is Windows 7 64-bit, Python 3.4 64-bit.
I've tried this on three separate Windows machines (different hardware) and the result is the same on each. No problems running under Ubuntu 14.04 64-bit.
Any clues?
Unfortunately for me this bug is a bit of a show-stopper :(