class Consumer(object):
#
def __init__(self, func, queue, next_queues):
self._func = func
self._queue = queue
self._next_queues = next_queues
#
credentials = pika.PlainCredentials('guest', 'guest')
self._connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.94.19.192',
credentials=credentials,
heartbeat=600))
#
self._channel = self._connection.channel()
#
self._channel.queue_declare(queue=self._queue)
#
self._channel.basic_qos(prefetch_count=1)
self._channel.basic_consume(self.callback, queue=self._queue)
logger.info("Waiting for task for {queue}".format(queue=self._queue))
self._channel.start_consuming()
#
def callback(self, ch, method, props, body):
logger.info(u"Worker " + unicode(self._queue) + u" receive mes" + unicode(body))
params = self._func(body)
if self._next_queues and params:
for next_queue in self._next_queues:
for param in params:
task_mes['from'] = self._queue
task_mes['param'] = param
self._channel.basic_publish(exchange='',
routing_key=next_queue,
properties=pika.BasicProperties(delivery_mode=2),
body=dumps(task_mes))
u" publish mes" + unicode(dumps(task_mes)))
#
ch.basic_ack(delivery_tag=method.delivery_tag)
def run(func, queue, next_queues=None):
Consumer(func=func, queue=queue, next_queues=next_queues)
I gei error pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")