Is there a way not to run process_data_events in an infinite while loop on producer side(
producer.py) in RPC , i don't have a complete knowledge on rabbitmq, is there a another method or a way it could be done. An example code is below.
producer.py
import json
import pika
class RabbitMqService:
queue_name = None
callback_queue = None
channel = None
connection = None
corr_id = None
response = None
def __init__(self, queue_name):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.target_queue = "target_queue"
self.channel.queue_declare(queue=self.target_queue, durable=True, arguments={"x-queue-type": "quorum"})
self.queue_name = queue_name
result = self.channel.queue_declare(queue=self.queue_name, auto_delete=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = self.queue_name
temp = json.dumps(n)
temp_bytes = bytes(temp, 'UTF-8')
self.channel.basic_publish(exchange='', routing_key=self.target_queue,
properties=pika.BasicProperties(reply_to=self.queue_name,
correlation_id=self.queue_name,
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE),
body=temp_bytes)
while self.response is None:
self.connection.process_data_events(time_limit=None)
return self.response.decode("utf-8")
def value(self):
n = [[27, 28, 29, 30], [31, 32, 33]]
for i in n:
print(i,"--",RabbitMqService.call(self, i))
ob = RabbitMqService("test_queue")
ob.value()
consumer.py
import json
import pika
class RabbitMqService:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
self.input_queue = "target_queue"
self.channel.queue_declare(queue=self.input_queue, durable=True, arguments={"x-queue-type": "quorum"})
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(queue=self.input_queue, on_message_callback=self.on_request)
def fib(self,n):
if n < 0:
print("Incorrect input")
elif n == 0:
return 0
elif n == 1 or n == 2:
return 1
else:
return self.fib(n - 1) + self.fib(n - 2)
def on_request(self, ch, method, props, body):
temp = []
n = json.loads(body.decode('utf-8'))
for i in n:
response = self.fib(i)
temp.append(response)
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(temp))
ch.basic_ack(delivery_tag=method.delivery_tag)
def _consumption(self):
self.channel.start_consuming()
ob = RabbitMqService()
ob._consumption()