--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/ea8e44ac-f01d-4958-a24e-6f493305930e%40googlegroups.com.
def publish(self, data, close_connection=True, head_data=False): """ Send tasks to workers :param data: list :param close_connection: boolean :return: None """ for message in data: self.channel.basic_publish( exchange='', routing_key=self.queue, body=message, properties=pika.BasicProperties( delivery_mode=2 )) if head_data: logger.info(" [x] Sent %r" % message[0:env.HEAD_DATA_BITS]) else: logger.info(" [x] Sent %r" % message) if close_connection: self.close()class SimulationWorker(Worker): """ Class for distributing tasks among simulation workers """
def __init__(self): super().__init__(env.RABBITMQ_SIMULATOR_QUEUE_NAME)
def consume(self): """ Send tasks to workers :param data: list :return: None """ logger.info(' [*] Waiting for messages. To exit press CTRL+C') self.channel.basic_qos(prefetch_count=1) # send task to available worker self.channel.basic_consume(queue=self.queue, on_message_callback=self.simulate) self.channel.start_consuming()
def simulate(self, ch, method, properties, body): """ Callback function called for each task :param ch: :param method: :param properties: :param body: :return: None """ logger.info(" [*] Running simulation %r" % body) s = Simulator() data = json.loads(body) cycle = data["cycle"] phase = data["phase"] s.simulate(cycle, phase) # very expensive function call logger.info(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag)Connection is never closed by the consumer, tasks are all different. Basically each task message is really simple, a dictionary of (cycle, phase) which are integers and I can see through the logs that they are all different. What happens sometimes is that I might kill some or all workers to fix the code and rerun them. Messages are persistent in RabbitMQ.Maybe if I can monitor the tasks in the queue, that might help.