Pika consumer breaks when Multiprocessing Queue is created after it.


Stone

Jun 6, 2024, 10:12:21 AM
to Pika
On Ubuntu 20.04, my pika consumer, which is created as a separate thread, breaks whenever I create a multiprocessing Queue after it. The behavior varies, but I get this error fairly often.

[ERROR]: Unexpected frame: <METHOD(['channel_number=1', 'frame_type=1', "method=<Basic.CancelOk(['...'])>"])>

I'm creating this Queue in the main thread that is also creating the PikaThread. I'm not using multiprocessing within the Thread. This issue does not occur when the multiprocessing Queue is created first. I've created a small sample that recreates this issue and can provide it if needed. Here's a small chunk that shows when I do and don't see the error. It seems to be related to multiprocessing.Queue. This may be an error on my end, but it seems like a strange issue. My assumption is that Pika isn't properly preventing other processes from using the socket it's using.

# Start the Pika thread before multiprocessing
def start_broken(self):
    self.mh.start_thread()

    self.q = multiprocessing.Queue(60)
    process = multiprocessing.Process(target=self.foo)
    process.start()

# Start the Pika thread after multiprocessing
def start_working(self):
    self.q = multiprocessing.Queue(60)
    process = multiprocessing.Process(target=self.foo)
    process.start()
    self.mh.start_thread()
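
The socket assumption above can be checked in isolation. This is a minimal sketch with plain sockets rather than Pika, assuming a Unix system where os.fork is available; the function name is made up for illustration. It shows only that a forked child can use a socket its parent opened, not that this is what happens inside Pika.

```python
import os
import socket


def child_can_use_parent_socket():
    """Fork a child and report whether it could write to a socket the parent opened."""
    # The parent opens both ends before forking, like a connection opened early.
    parent_end, peer_end = socket.socketpair()

    pid = os.fork()
    if pid == 0:
        # Child process: it never opened this socket, but the inherited
        # file descriptor is fully usable.
        try:
            parent_end.sendall(b"from child")
        finally:
            os._exit(0)  # leave immediately without running parent cleanup code

    # Parent process: wait for the child, then read what it wrote.
    os.waitpid(pid, 0)
    peer_end.settimeout(5)
    data = peer_end.recv(64)
    parent_end.close()
    peer_end.close()
    return data == b"from child"
```

If multiprocessing is using the fork start method, this would be consistent with the ordering dependence: in start_working the process is forked before the Pika thread opens its connection, so there is no socket for the child to inherit.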

Luke Bakken

Jun 6, 2024, 10:28:29 PM
to Pika
Hello,

Thanks for using Pika and RabbitMQ. Your code doesn't show much, but what I can say is this:

Pika is not thread safe, and neither is it multiprocess safe.

Each of your Process instances should run their own Pika connection and channels.
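
One way to read this advice, as a sketch rather than code from the thread: the function that runs in each Process opens and closes its own connection, so nothing Pika-related crosses a process boundary. The host, the queue name, and the helper names consume and build_consumers are assumptions made up for the example, and a RabbitMQ broker is assumed to be reachable when the processes are actually started.

```python
import multiprocessing


def consume(host, queue_name):
    """Process target: open a connection here, in the process that will use it."""
    import pika  # imported in the worker so the parent never touches pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)

    def on_message(ch, method, properties, body):
        print("received", body)

    channel.basic_consume(
        queue=queue_name, on_message_callback=on_message, auto_ack=True
    )
    try:
        channel.start_consuming()
    finally:
        connection.close()


def build_consumers(host, queue_name, count):
    """Create (but do not start) one Process per consumer."""
    return [
        multiprocessing.Process(target=consume, args=(host, queue_name))
        for _ in range(count)
    ]
```

Calling start() on each returned Process would then give every worker its own connection and channel.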

Thanks,
Luke

Stone

Jun 7, 2024, 8:15:14 AM
to Pika
Hi Luke. I may not have explained myself clearly, so to clarify: I'm not using a pika connection in the Process target function. foo just loops forever and calls print("foo"). There is only one connection, and it is in a thread that the mh class starts.

I've tried to attach a small sample where I was able to reproduce the issue, but Google doesn't like that. It actually seems to be rejecting most of my messages. To add to the idea that this may be a race condition (or at least something strange), the code actually worked a couple of times when all I changed was the log level for Pika from INFO to DEBUG.


def foo(self):  # target function of Process
    while True:
        print("foo")
        time.sleep(10)

class PikaThreadHandler(threading.Thread):  # Thread that gets created by mh
    def __init__(self, queue_a) -> None:
        super(PikaThreadHandler, self).__init__()
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.queue_a = queue_a
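
One detail in the fragment above: __init__ runs in whichever thread constructs the object, so the connection is opened in the main thread even though the class is meant to consume in its own thread. The following is only a sketch of an alternative that keeps the connection inside run(), in line with the note that Pika is not thread safe. It is not a confirmed fix for this issue, and the class name ConsumerThread, its parameters, and the on_message method are hypothetical.

```python
import threading


class ConsumerThread(threading.Thread):
    """Hypothetical variant of PikaThreadHandler that opens its connection in run()."""

    def __init__(self, host, queue_name):
        super().__init__(daemon=True)
        self.host = host
        self.queue_name = queue_name
        self.connection = None  # opened later, inside the thread itself

    def run(self):
        import pika  # imported here so the sketch loads even without pika installed

        # Everything that touches the connection happens in this one thread.
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.host)
        )
        channel = self.connection.channel()
        channel.queue_declare(queue=self.queue_name)
        channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.on_message,
            auto_ack=True,
        )
        channel.start_consuming()

    def on_message(self, channel, method, properties, body):
        print("received", body)
```

With this layout, constructing the thread before or after a multiprocessing.Process is started opens no socket; the connection only exists once start() has been called.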

Luke Bakken

Jun 7, 2024, 9:22:31 AM
to Pika
Good morning!

The easiest way for you to help me assist you is to provide a console application with code I can clone and run. Commit it to a git repository on GitHub or GitLab, or some other service, and let me know the link.

Thank you -
Luke

Stone

Jun 7, 2024, 9:55:51 AM
to Pika
Morning Luke! Here's the code I used to recreate it. Also, I tried to recreate it on Mac and it failed because it couldn't pickle the lock, so it seems like it's unique to Ubuntu.
 https://github.com/StonielCFA/pika_crash_recreate
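
A possible explanation for the Mac difference, offered as an assumption rather than something confirmed in this thread: since Python 3.8 multiprocessing on macOS defaults to the spawn start method, which pickles the Process target and its arguments, while fork copies the parent's memory without pickling. A bound method such as self.foo drags its whole object along, including any thread locks it holds. The helper name is_picklable is made up for this sketch.

```python
import multiprocessing
import pickle
import threading


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, which spawn needs for Process arguments."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


if __name__ == "__main__":
    # The default differs by platform and Python version, so just report it.
    print("default start method:", multiprocessing.get_start_method())
    print("lock picklable:", is_picklable(threading.Lock()))
    print("plain settings picklable:", is_picklable({"queue_size": 60}))
```

Under fork nothing is pickled, so the same object graph starts without complaint, which would explain why the sample runs (and misbehaves) on Ubuntu but fails immediately on Mac.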

Luke Bakken

Jun 8, 2024, 2:45:00 PM
to Pika
OK, thanks. I'll take a look.

Luke Bakken

Jun 9, 2024, 8:57:20 AM
to Pika
If anyone else is following along, I assisted this user with their code because they provided a full, working example:
