from base.consumer import Consumer
exchange = Exchange(config.rabbit_mq.exchange, type="direct") redis_conn = Redis( host=config.redis.host, port=int(config.redis.port), db=int(config.redis.db) ) queue_names = [ "{queue_prefix}".format(queue_prefix=config.rabbit_mq.queues_prefix) + str(x) for x in range(1, 31) ]
queues = [ Queue(queue_name, exchange, routing_key=queue_name) for queue_name in queue_names ]
worker = BaseWorker( exchange=exchange, consumer=Consumer, queues=queues, logger=logger, config=config, elastic_search_conn=None, redis_conn=redis_conn, )
worker.run_consumers()
class BaseWorker(object): def __init__( self, exchange=None, consumer=None, queues=None, logger=None, config=None, elastic_search_conn=None, redis_conn=None, ): self.queues = queues self.logger = logger self.config = config self.consumer = consumer self.exchange = exchange self.elastic_search_conn = elastic_search_conn self.redis_conn = redis_conn
def run_consumers(self): with Connection(self.config.rabbit_mq.url) as conn: worker = self.consumer( conn, self.exchange, self.queues, self.logger, self.config, self.elastic_search_conn, self.redis_conn, ) worker.run()