Best practices and whether should I have long lived publishers or not

41 views
Skip to first unread message

Aleksandre Bregadze

unread,
Apr 23, 2024, 1:34:23 PMApr 23
to Pika
Hi guys,

I have been working with Pika for some time, used it in multiple projects, but still unsure if I am doing things right.

I am almost always using python, thus Pika for publishing and never for consuming, therefore in short I need to send messages consistently and hopefully with as little errors as possible on the way. For this, is it better to use short lived connections or long lived?

Before hands, I was using the short lived approach:
Connect -> send -> close -> repeat for the each message
For some weird reasons I was getting a lot of warnings
"2024-01-23 14:23:52.088532+00:00 [warning] <0.1048873.0> client unexpectedly closed TCP connection"
Also I was getting missed heartbeats, but I have no clue why, connections were short lived.

Now I decided to make long lived connection and I think I broke stuff in production environment -_-...
As a result I decided to rewrite the whole RMQ utilities, I followed the example of yours at https://github.com/pika/pika/blob/main/examples/long_running_publisher.py
and have built the following.

I would really appreciate if you could identify the problems in my code and help me to design the well working version.

1. I initiate a single instance of RMQ per application or worker through django settings:
```
from project.utils.rmq_utils import RMQPublisher

# One connection per app for now
RMQPublisherInstance = RMQPublisher()
RMQPublisherInstance.start()
```



2. Here is the publisher
https://bitbucket.org/shecemenko/workspace/snippets/7qqBEj
I decided to create snippet for it so that you could view all conveniently

Thank you sooo much!

Luke Bakken

unread,
May 2, 2024, 11:41:14 AMMay 2
to Pika
Yes, in every case, you should use long-lived connections.

Your publisher code seems fine. I can't explain why you're having issues with Django. The only way I can assist you is if you provide a simple Django project, using your Pika code, that exhibits the issue.

Share the project via a git repository that I can clone.

Aleksandre Bregadze

unread,
May 2, 2024, 12:15:37 PMMay 2
to Pika
Thank you for your message, I have been testing this code past 9 days in prod and so far I have noticed 0 problems. Which is wonderful I guess. But what I did notice is that the long lived threaded publisher did not work with Celery Beat.
Let me elaborate:
1 - I used this class in one of the celery tasks, then called apply() and celery worker worked well. No problems at all, messages were published.
2 - Then instead of apply(), I used Celery Beats scheduler, I think its called PeriodicTask. Here I noticed that self.connection.add_callback_threadsafe was not being executed.

I believe problem is coming from threading, tried to find a work around, but nothing made sense. Thus I added a short lived publisher as well, just for celery beat. Are there any specific configurations when we want to use long lived published with Celery Beat?

Nishant Varma

unread,
Jun 11, 2024, 6:15:36 AMJun 11
to Pika
Won't connection/channel get closed after initializtion (__init__)? What does handle_rmq_connection_error do to recover? I have something similar: https://bpa.st/W6ZA. The main difference is: I am trying to ensure_channel before publish.

Luke Bakken

unread,
Jun 11, 2024, 9:10:31 AMJun 11
to Pika
Hi everyone,

It's not very productive to talk about code that you can't actually see and run.

Aleks - as I said before, if you would like assistance with your Celery Beat code, provide a simple project that I can review.

Based on the name, "PeriodicTask", it seems like Celery may not keep your long-lived connection around, and that you should open a new connection every time that task executes. That's not a big deal unless your task runs multiple times per second.

Thanks,
Luke

On Tuesday, June 11, 2024 at 3:15:36 AM UTC-7 nishan...@gmail.com wrote:
Won't connection/channel get closed after initializtion (__init__)? What does handle_rmq_connection_error do to recover? I have something similar: https://bpa.st/W6ZA. The main difference is: I am trying to ensure_channel before publish.

On Thursday 2 May 2024 at 21:45:37 UTC+5:30 al...@itsy.dev wrote:
Thank you for your message, I have been testing this code past 9 days in prod and so far I have noticed 0 problems. Which is wonderful I guess. But what I did notice is that the long lived threaded publisher did not work with Celery Beat.
Let me elaborate:
1 - I used this class in one of the celery tasks, then called apply() and celery worker worked well. No problems at all, messages were published.
2 - Then instead of apply(), I used Celery Beats scheduler, I think its called PeriodicTask. Here I noticed that self.connection.add_callback_threadsafe was not being executed.

I believe problem is coming from threading, tried to find a work around, but nothing made sense. Thus I added a short lived publisher as well, just for celery beat. Are there any specific configurations when we want to use long lived published with Celery Beat?

Reply all
Reply to author
Forward
0 new messages