Hello,
In the future please attach your code samples as files or use a git repository. Screenshots prevent me from copying your code to modify it.
Yes, queue.get() will block Pika. Even if you use another library, queue.get() will always block the current thread. This is just how blocking calls interact with threading.
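To illustrate, here is a small standard-library sketch (not Pika code; wait_for_item and work_queue are names made up for this example). The thread that calls queue.get() does nothing else until an item arrives or the timeout expires, so an I/O loop living on that same thread would stall for the same amount of time.

```python
import queue
import time


def wait_for_item(q, timeout):
    """Block on q.get() and report how long the calling thread was stuck."""
    started = time.monotonic()
    try:
        # Nothing else can run on this thread until get() returns or times out.
        item = q.get(timeout=timeout)
    except queue.Empty:
        item = None
    return item, time.monotonic() - started


work_queue = queue.Queue()
item, blocked_for = wait_for_item(work_queue, timeout=0.2)
print(f"item={item!r}, blocked for {blocked_for:.2f}s")
```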
When you have an item in the queue that should be published, you can use connection.add_callback_threadsafe to schedule work to be done. In your case, this would be to publish a message.
- When your application starts, it should start your Publisher and save it in global state that your REST callbacks can access. How you store that state usually depends on the framework you're using.
- The Publisher class calls process_data_events in a loop to "run forever". The call returns every 5 seconds so the loop can check whether it should still run. If stop is called, the Publisher and its thread will exit on the next iteration.
- By using add_callback_threadsafe, the publish happens on the correct thread, the one running Pika's I/O loop. NOTE that if there is an error with publishing, your REST application will not know about it. You will have to deal with error conditions in your Publisher.
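As a rough sketch of that last point, one option is to wrap the publish call so a failure is logged and recorded on the Publisher's thread instead of being lost. Everything here is an assumption for illustration: safe_publish, the failed list, and BrokenChannel are hypothetical names, and the broad except would probably be narrowed to pika.exceptions.AMQPError in real code.

```python
import logging

logger = logging.getLogger("publisher")


def safe_publish(channel, exchange, routing_key, body, failed):
    """Publish one message; on failure, log it and remember the body.

    `channel` is any object with a basic_publish(exchange, routing_key, body)
    method, such as a Pika channel. `failed` is a hypothetical list that
    collects bodies that could not be sent, so they can be retried or reported.
    """
    try:
        channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
        return True
    except Exception:  # assumption: narrow this to pika.exceptions.AMQPError with Pika
        logger.exception("publish failed for routing key %r", routing_key)
        failed.append(body)
        return False


class BrokenChannel:
    """Stand-in channel that always fails, only to exercise the error path."""

    def basic_publish(self, exchange, routing_key, body):
        raise RuntimeError("connection lost")


failed = []
ok = safe_publish(BrokenChannel(), "", "hello", b"payload", failed)
print(ok, failed)
```

A Publisher's do_publish could call a helper like this, and the REST side could then inspect the failed list (or a counter) if it needs to report problems.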
Let me know if you have any questions.
Luke
import functools
import threading

from pika import BlockingConnection


class Publisher:
    def __init__(self, config):
        self.is_running = True
        self.connection = BlockingConnection(...)
        self.channel = self.connection.channel()

    def run(self):
        # Runs Pika's I/O loop on this thread; returns every 5 seconds to re-check is_running.
        while self.is_running:
            self.connection.process_data_events(time_limit=5)

    def stop(self):
        self.is_running = False

    def do_publish(self, payload):
        # Always runs on the Publisher's thread, never on a REST thread.
        # encode() stands for whatever serialization your application uses.
        encoded_payload = encode(payload)
        self.channel.basic_publish(exchange=..., routing_key=..., body=encoded_payload)

    def publish(self, payload):
        # Safe to call from any thread.
        cb = functools.partial(self.do_publish, payload)
        self.connection.add_callback_threadsafe(cb)


def start_publisher(app_state, config):
    publisher = Publisher(config)
    app_state['pika_publisher'] = publisher
    publisher.run()


def rest_api_application_startup():
    # app_state and config come from your framework / application setup.
    publisher_thread = threading.Thread(target=start_publisher, args=(app_state, config))
    publisher_thread.start()


@app.get("/hello")
def say_hello(payload, app_state):
    publisher = app_state['pika_publisher']
    publisher.publish(payload)
    return {"successful"}