Optimal Number of Queues Per Consumer when subclassing the kombu.mixins.ConsumerMixin class


Thuita Wachira

Sep 19, 2019, 1:23:01 PM
to celery-users
Hi guys,
I'm looking for documentation or articles on what to consider when passing many queues to a subclass of kombu.mixins.ConsumerMixin.
Does performance degrade when you pass too many queues to a ConsumerMixin subclass? How does the consumer ensure fairness when consuming from the queues?
See the example below: I am passing 30 queues to a single instance of a ConsumerMixin subclass. Is this OK? How does it affect throughput?
I'm mainly interested in the performance implications, if any, and in whether any fairness algorithms are at play and, if so, how to tune them.

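from kombu import Exchange, Queue
from redis import Redis

from base.consumer import Consumer

# config, logger, and BaseWorker (shown below) are defined elsewhere in the project.

exchange = Exchange(config.rabbit_mq.exchange, type="direct")
redis_conn = Redis(
    host=config.redis.host, port=int(config.redis.port), db=int(config.redis.db)
)

# Queue names are the configured prefix followed by 1..30.
queue_names = [
    "{queue_prefix}{x}".format(queue_prefix=config.rabbit_mq.queues_prefix, x=x)
    for x in range(1, 31)
]

# Each queue is bound to the direct exchange with its own name as the routing key.
queues = [
    Queue(queue_name, exchange, routing_key=queue_name)
    for queue_name in queue_names
]

# A single worker receives all 30 queues.
worker = BaseWorker(
    exchange=exchange,
    consumer=Consumer,
    queues=queues,
    logger=logger,
    config=config,
    elastic_search_conn=None,
    redis_conn=redis_conn,
)

worker.run_consumers()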



from kombu import Connection


class BaseWorker(object):
    """Holds shared dependencies and runs one consumer over all queues."""

    def __init__(
        self,
        exchange=None,
        consumer=None,
        queues=None,
        logger=None,
        config=None,
        elastic_search_conn=None,
        redis_conn=None,
    ):
        self.queues = queues
        self.logger = logger
        self.config = config
        self.consumer = consumer  # the ConsumerMixin subclass, not an instance
        self.exchange = exchange
        self.elastic_search_conn = elastic_search_conn
        self.redis_conn = redis_conn

    def run_consumers(self):
        # One connection and one consumer instance serve every queue in self.queues.
        with Connection(self.config.rabbit_mq.url) as conn:
            worker = self.consumer(
                conn,
                self.exchange,
                self.queues,
                self.logger,
                self.config,
                self.elastic_search_conn,
                self.redis_conn,
            )
            # Blocks here, consuming messages until the consumer stops.
            worker.run()
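
One way to experiment with the queues-per-consumer ratio asked about above would be to split the queue names across several workers and compare throughput. The sketch below is only an illustration and not part of kombu: `partition_queue_names` is a hypothetical helper, and `task_queue_` is a placeholder for `config.rabbit_mq.queues_prefix`. It deals the names out round-robin so each worker gets roughly the same number of queues.

```python
def partition_queue_names(queue_names, num_workers):
    """Deal queue names out round-robin into num_workers groups."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    groups = [[] for _ in range(num_workers)]
    for index, name in enumerate(queue_names):
        # Name 0 goes to group 0, name 1 to group 1, and so on, wrapping around.
        groups[index % num_workers].append(name)
    return groups


# Same naming scheme as the post: a prefix followed by 1..30.
# "task_queue_" is a placeholder prefix (assumption).
queue_names = ["task_queue_" + str(x) for x in range(1, 31)]

groups = partition_queue_names(queue_names, num_workers=3)
for worker_id, names in enumerate(groups):
    # Print how many queues each hypothetical worker would consume from.
    print(worker_id, len(names))
```

Each group could then be turned into `Queue` objects and handed to its own `BaseWorker`, which would make it possible to measure throughput at, say, 30, 15, or 10 queues per consumer.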


Valentin Interesniy

Dec 19, 2019, 7:09:45 PM
to celery-users


On Thursday, September 19, 2019 at 20:23:01 UTC+3, Thuita Wachira wrote: