Pika with FastAPI

Паша Дубовик

Mar 7, 2023, 1:44:53 PM
to rabbitmq-users
Hello to the pika community.
I have a fairly simple question, but I can't find a solution for it. I want to use pika to publish messages to a queue. The messages come from clients through a POST endpoint of a FastAPI application.
Example:
@app.post("/")
def send_messages(messages: str):
    # publish to AMQP here
    ...

Initially, I used BlockingConnection to create the connection and channel. But from time to time I got errors, from which I concluded that the problem is multithreading: FastAPI apparently uses threads under the hood, and pika connections are not thread-safe. So my question is: is it possible to use pika to publish messages to the queue in this context without creating a separate connection for each request? I would like to create the connection once and reuse it in the send_messages function. This task seems almost trivial to me, but I did not find suitable examples on the Internet.

Luke Bakken

Mar 7, 2023, 2:07:00 PM
to rabbitmq-users
Hello,

Seems like there are quite a few examples online - https://www.google.com/search?q=%22pika%22+%22fastapi

If you'd like to make your source code available via a git repository (hosted on GitHub, for instance), I could assist.

FastAPI appears to be asyncio-based, so this library could be an option as well - https://github.com/gmr/aiorabbit

Again, the most effective way to help us help you is to share a complete, runnable code sample.

Thanks,
Luke

Паша Дубовик

Mar 7, 2023, 2:22:21 PM
to rabbitmq-users
The problem is that all the FastAPI examples are based on aio-pika, and I'm trying to understand whether it is possible to use pika and BlockingConnection.
Okay, I'll ask differently. How can I create a connection in one thread and publish from other threads? I don't understand from the documentation how to use the add_callback_threadsafe method in this case.
For example, in the connection.py file:
def get_rabbit_connection():
     connection = pika.BlockingConnection(
         pika.ConnectionParameters(
             heartbeat=0,
             host=settings.RABBITMQ_HOST,
             port=settings.RABBITMQ_PORT,
             credentials=PlainCredentials(settings.RABBITMQ_DEFAULT_USER,
                                          settings.RABBITMQ_DEFAULT_PASS)
         )
     )
     return connection


rabbitmq_client = get_rabbit_connection()
rabbitmq_channel = rabbitmq_client.channel()
rabbitmq_channel.queue_declare(queue="my_queue")


And the call in the main.py module:
import threading
from connection import rabbit_channel

def main(ch, msgs):
         ch.basic_publish(
         exchange="",
         routing_key="my_queue",
         body=msgs
         properties=BasicProperties(
             delivery_mode=2
         )
     )
if __name__=="__main__":
      thread = threading.Thread(target=main, args=(ch, msgs))
      thread.start()

On Tuesday, March 7, 2023 at 22:07:00 UTC+3, Luke Bakken wrote:

Luke Bakken

Mar 7, 2023, 4:35:11 PM
to rabbitmq-users
Hello,

Sorry to belabor the point, but the google search to which I linked returns this Stack Overflow post as the first hit:

https://stackoverflow.com/a/70919959/1466825

It shows a complete example of using Pika, the AsyncioConnection class, and FastAPI. How is that not sufficient?

Yes, you can use BlockingConnection, but you should use AsyncioConnection if you're using FastAPI.

As for your other question, I took your code and added it to the following repository:


Attempting to run it produces this error:

$ python main.py
  File "/home/lbakken/development/lukebakken/rabbitmq-users-pika-fSrIb6df-yc/main.py", line 8
    body=msgs
         ^^^^
SyntaxError: invalid syntax. Perhaps you forgot a comma?


Yes, I can fix the errors in the code you provided, but it would have been nice not to have to take this extra step. I will also show how to publish from the main thread.

Thanks,
Luke

Luke Bakken

Mar 7, 2023, 5:18:12 PM
to rabbitmq-users

Паша Дубовик

Mar 8, 2023, 5:34:31 AM
to rabbitm...@googlegroups.com
Thanks a lot. This solves my problem!
I also want to note that pika's documentation is, in my opinion, not complete enough. Or maybe I just can't read :)

I still have a question: how can I find the queue size from another thread using add_callback_threadsafe?

On Wed, Mar 8, 2023 at 00:35, Luke Bakken <luker...@gmail.com> wrote:
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/ae69dd68-e58f-4cd4-a795-04da7d43cb5en%40googlegroups.com.

Luke Bakken

Mar 8, 2023, 9:52:24 AM
to rabbitmq-users
"I also want to note that pika's documentation is, in my opinion, not complete enough. Or maybe I just can't read :)"

If you can point to specific documents that can be updated, or make suggestions, or even better, submit pull requests to improve the documentation, it would be appreciated.

In this case, it seems like I should add the code I wrote for you to the examples/ directory. I'll do that.

"I still have a question: how can I find the queue size from another thread using add_callback_threadsafe?"

What have you read and tried to figure this out for yourself?

Thanks -
Luke

Паша Дубовик

Mar 8, 2023, 1:36:14 PM
to rabbitm...@googlegroups.com

This is from official pika documentation:

  • Is Pika thread safe?

    Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method add_callback_threadsafe from another thread to schedule a callback within an active pika connection.

But it says nothing about how to use this method. I look at your example and it seems obvious to me, but from the documentation alone I would not have understood how to use it. It is also not clear when to use one adapter or the other. You are trying hard to tell me that with FastAPI I should use AsyncioConnection, but my FastAPI path operations are not asynchronous, so I am not required to use an asynchronous connection.

When I try to find advice on using one adapter or another with pika, I only end up on Stack Overflow, or at your great examples on GitHub, instead of in the documentation. And it is not at all clear to me in which cases I should do as the examples do.

As for my question about the queue size, it seems so trivial to me that it is not clear why pika does not have a dedicated method for getting the queue size. I know I can do it by declaring the queue passively and reading method.message_count from the result.

On Wed, Mar 8, 2023 at 17:52, Luke Bakken <luker...@gmail.com> wrote:

Luke Bakken

Mar 8, 2023, 4:16:38 PM
to rabbitmq-users
Thank you for the feedback! It is very helpful. I'll see if I can find time to improve the documentation.

As for queue size, yes, a passive declare is the correct way to do that. This is how it is done in every AMQP 0-9-1 client.

As for FastAPI... since FastAPI is already using Python's asyncio library (and I/O loop), it makes the most sense to use that connection type in Pika. This applies even if you aren't using any "async" functions. I'll put together an example.

Thanks,
Luke
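
For reference, the passive declare mentioned above can be sketched roughly like this. It is only a minimal sketch: the helper name get_message_count is made up for illustration, and channel is assumed to be a pika BlockingChannel used on the thread that owns its connection.

```python
def get_message_count(channel, queue_name):
    """Return the number of messages ready in `queue_name`.

    Assumption: `channel` is a pika BlockingChannel, and this function is
    called on the thread that created the channel's connection.
    """
    # passive=True only checks the queue; it never creates or modifies it.
    # If the queue does not exist, the broker closes the channel with an error.
    declare_ok = channel.queue_declare(queue=queue_name, passive=True)
    return declare_ok.method.message_count
```

With a real connection this would be called as get_message_count(channel, "my_queue"). Note that the count only covers messages that are ready for delivery, not ones already delivered to a consumer and awaiting acknowledgement.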

Паша Дубовик

Mar 9, 2023, 4:19:42 AM
to rabbitm...@googlegroups.com
"As for queue size, yes, a passive declare is the correct way to do that. This is how it is done in every AMQP 0-9-1 client."
Yes, but how can I do that in a thread-safe way?


On Thu, Mar 9, 2023 at 00:16, Luke Bakken <luker...@gmail.com> wrote:

Luke Bakken

Mar 9, 2023, 7:29:06 PM
to rabbitmq-users
https://docs.python.org/3/library/queue.html

See my latest update:


All of this could be encapsulated into a thread-safe class - https://github.com/pika/pika/issues/1426

Note that the programming model I'm demonstrating isn't unique to Pika or even Python. ANY program or library that has an I/O loop will execute that loop on a dedicated thread, and provide ways to schedule work to be executed on the same thread. Pika has the add_callback_threadsafe method.

Thanks,
Luke
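
The model described above (one thread owns the connection and runs its I/O loop, other threads only schedule work on it) can be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the code from the linked update: the helper names run_io_loop, publish_threadsafe and message_count_threadsafe are made up for illustration, and connection and channel are assumed to be a pika BlockingConnection and a channel opened on it. The queue.Queue is what carries the passive-declare result back to the calling thread.

```python
import functools
import queue


def run_io_loop(connection, stop_event):
    """Run on the thread that created `connection`."""
    while not stop_event.is_set():
        # Services network I/O; callbacks scheduled with
        # add_callback_threadsafe are executed inside this call.
        connection.process_data_events(time_limit=0.1)


def publish_threadsafe(connection, channel, routing_key, body):
    """Callable from any thread: the publish itself runs on the owner thread."""
    callback = functools.partial(
        channel.basic_publish,
        exchange="",
        routing_key=routing_key,
        body=body,
    )
    connection.add_callback_threadsafe(callback)


def message_count_threadsafe(connection, channel, queue_name, timeout=5.0):
    """Callable from any thread: returns the queue depth via a passive declare."""
    result = queue.Queue(maxsize=1)

    def passive_declare():
        # Executed on the owner thread, so touching the channel is safe here.
        declare_ok = channel.queue_declare(queue=queue_name, passive=True)
        result.put(declare_ok.method.message_count)

    connection.add_callback_threadsafe(passive_declare)
    # Block the calling thread until the owner thread has answered.
    return result.get(timeout=timeout)
```

The thread that created the connection would run run_io_loop(connection, stop_event) with a threading.Event, while request handlers on other threads call publish_threadsafe(...) or message_count_threadsafe(...). If the queue does not exist, the passive declare raises on the connection thread and the caller gets queue.Empty after the timeout, so real code would want explicit error handling.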

Luke Bakken

Mar 9, 2023, 9:40:37 PM
to rabbitmq-users
And I just want to reiterate that if you're using FastAPI, this is the correct way to use it with Pika:

Паша Дубовик

Mar 11, 2023, 1:43:03 AM
to rabbitm...@googlegroups.com
Thank you very much for your help!
And I'll be glad if my question contributes to a thread-safe client appearing in pika!

On Fri, Mar 10, 2023 at 05:40, Luke Bakken <luker...@gmail.com> wrote: