How to use process_data_events without an infinite while loop

140 views
Skip to first unread message

james

unread,
Jan 7, 2023, 11:18:39 PM1/7/23
to rabbitmq-users
Is there a way not to run process_data_events in an infinite while loop on producer side(producer.py) in RPC , i don't have a complete knowledge on rabbitmq,  is there a another method or a way it could be done. An example code is below.
producer.py

import json
import pika


class RabbitMqService:
queue_name = None
callback_queue = None
channel = None
connection = None
corr_id = None
response = None

def __init__(self, queue_name):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.target_queue = "target_queue"
self.channel.queue_declare(queue=self.target_queue, durable=True, arguments={"x-queue-type": "quorum"})
self.queue_name = queue_name
result = self.channel.queue_declare(queue=self.queue_name, auto_delete=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = self.queue_name
temp = json.dumps(n)
temp_bytes = bytes(temp, 'UTF-8')
self.channel.basic_publish(exchange='', routing_key=self.target_queue,
properties=pika.BasicProperties(reply_to=self.queue_name,
correlation_id=self.queue_name,
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE),
body=temp_bytes)

while self.response is None:
self.connection.process_data_events(time_limit=None)
return self.response.decode("utf-8")

def value(self):
n = [[27, 28, 29, 30], [31, 32, 33]]
for i in n:
print(i,"--",RabbitMqService.call(self, i))


ob = RabbitMqService("test_queue")
ob.value()

consumer.py
import json
import pika


class RabbitMqService:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.input_queue = "target_queue"
self.channel.queue_declare(queue=self.input_queue, durable=True, arguments={"x-queue-type": "quorum"})
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(queue=self.input_queue, on_message_callback=self.on_request)

def fib(self,n):
if n < 0:
print("Incorrect input")
elif n == 0:
return 0
elif n == 1 or n == 2:
return 1
else:
return self.fib(n - 1) + self.fib(n - 2)

def on_request(self, ch, method, props, body):
temp = []
n = json.loads(body.decode('utf-8'))
for i in n:
response = self.fib(i)
temp.append(response)

ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(temp))
ch.basic_ack(delivery_tag=method.delivery_tag)

def _consumption(self):
self.channel.start_consuming()


ob = RabbitMqService()
ob._consumption()
Reply all
Reply to author
Forward
0 new messages