Heartbeat at time of publishing.

161 views
Skip to first unread message

knoxvox

unread,
Apr 26, 2022, 2:54:51 PM4/26/22
to Pika
Hello,

I am new to Rabbit-MQ and pika and also I apologize if this has been asked before. I will first describe my application and then will mention about the problem I am facing. Its an API built using Fast-API and whenever a user hits the endpoint, it simply publishes the message to an exchange.

The way I do is like this I have publisher class defined like below
Screenshot from 2022-04-27 00-20-06.png

so I use this publisher class to create object at the start of my application and then pass this class to each endpoint where the object will call publish function. The code looks like this (I have only shared the sample code here)

Screenshot from 2022-04-27 00-13-33.png

Whenever I do this after sometime I will get this error in my calls

[Errno 10054] An existing connection was forcibly closed by the remote host.

So I checked the server logs (in appdata/roaming/rabbit-mq/logs) and it shows that the connection is closed timeout 60s which I assume is because of heartbeat.

One way to resolve is to connect for every request and publish as shown below
Screenshot from 2022-04-27 00-21-07.png

which I think is not a optimal solution. What should I do in this case ?
  1. Should I increase the heartbeat or disable it completely (which is not recommended I guess) ?
  2. Should I connect everytime and publish and close ?
Note: I have another application which consumes the message published this API and there I dont see any problem of heartbeat.
- pika version = 1.1.0
- python version = 3.6.8


Thanks.

knoxvox

unread,
Apr 26, 2022, 2:57:26 PM4/26/22
to Pika
I forgot to mention the pika expection that got raised. It is StreamLostError

Luke Bakken

unread,
Apr 27, 2022, 2:53:53 PM4/27/22
to Pika
Hello,

You should run your Publisher class on its own thread. There is some other code in your application that is blocking Pika's I/O loop.

Since you're using Fast API, I suggest checking out this library - https://github.com/gmr/aiorabbit

It is built with async / await in mind, which Fast API supports - https://fastapi.tiangolo.com/async/

Thanks,
Luke

knoxvox

unread,
May 10, 2022, 2:18:40 PM5/10/22
to Pika
Hello,

Thanks for suggestions. I am thinking to write publisher on its own thread but having some doubts over my implementation

This is my idea : 
That is whenever there is a request to the endpoint, I will put the message to Queue (thread safe queue) and then publisher will take the message from queue and then publishes the message. The example code looks like this

Screenshot from 2022-05-10 23-44-15.png

Here I have created publisher connection in a different thread but my doubt is that `queue.get` is blocking call. So will this also not block the Pika I/O loop ?
Is there any other way to communicate between threads which is non-blocking or else I have to use this library : https://github.com/gmr/aiorabbit

Thanks.

Luke Bakken

unread,
May 19, 2022, 11:57:13 AM5/19/22
to Pika
Hello,

In the future please attach your code samples as files or use a git repository. Screenshots prevent me from copying your code to modify it.

Yes, queue.get() will block Pika. Even if you use another library, queue.get() will always block the current thread. This is just how blocking calls interact with threading.

When you have an item in the queue that should be published, you can use connection.add_callback_threadsafe to schedule work to be done. In your case, this would be to publish a message.

https://github.com/pika/pika/blob/master/examples/basic_consumer_threaded.py#L36-L37

I'm assuming in your code that Publisher uses BlockingConnection underneath. Your code would then look something like this. Note that this is NOT TESTED and probably has all kinds of syntax errors. The main points are:
  • When your application starts, it should start your Publisher and save it in global state that your REST callbacks can access. This usually depends on the framework you're using.
  • The Publisher class calls process_data_events to "run forever". It will break out every 5 seconds to see if it should still run. If stop is called then on the next iteration the Publisher and associated thread will exit.
  • By using add_callback_threadsafe the publish will happen on the correct thread that is running Pika's I/O loop. NOTE that if there is an error with publishing your REST application will not know it. You will have to deal with error conditions in your Publisher.
Let me know if you have any questions.
Luke

import functools
import threading

class Publisher:
    def __init__(self, config):
        self.is_running = True
        self.connection = BlockingConnection(...)
        self.channel = self.connection.channel()
    def run():
        while is_running:
             self.connection.process_data_events(time_limit=5)
    def stop():
        is_running = False
    def do_publish(payload):
        encoded_payload = encode(payload)
        self.channel.basic_publish(exchange=,routing_key=,body=encoded_payload)
    def publish(payload):
        cb = functools.partial(do_publish, payload)
        self.connection.add_callback_threadsafe(cb)

def start_publisher(app_state, config):
    publisher = Publisher(config)
    app_state['pika_publisher']  = publisher
    publisher.run()

def rest_api_application_startup():
    publisher_thread = threading.Thread(target=start_publisher, args=(app_state, queue})
    publisher_thread.start()

@app.get("/hello")
def say_hello(payload, app_state):
    publisher = app_state['pika_publisher']
    publisher.publish(payload)
    return {"successful"}

knoxvox

unread,
May 23, 2022, 1:07:17 PM5/23/22
to Pika
Hello,

Thanks for the explanation and code snippets.

Reply all
Reply to author
Forward
0 new messages