Dynamically create consumers as the number of messages in the queues increases

28 views
Skip to first unread message

Firoz Mohamed

unread,
Mar 6, 2018, 12:54:32 AM3/6/18
to stompest
Hi,

I want to dynamically increase the number of consumers as the number of messages in the queues increases. What approach should I consider to achieve this? Any help would be appreciated.

I am using activemq, with stompest.

Thanks.


Sample code from stompest that I am using:

class Consumer(object):
    """Subscribe to a STOMP queue and print the counter found in each frame.

    The names ``Stomp``, ``StompConfig``, ``StompSpec``,
    ``SubscriptionListener``, ``defer`` and ``json`` are expected to be
    imported by the surrounding module.
    """

    # Destination this consumer subscribes to.
    QUEUE = '/queue/testIn'
    # Handed to SubscriptionListener as ``errorDestination``
    # (presumably where frames that fail in ``consume`` end up -- confirm
    # against the stompest documentation).
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        """Keep the given config, or default to ``tcp://localhost:61613``.

        :param config: connection configuration; when ``None`` a
            ``StompConfig('tcp://localhost:61613')`` is created.
        """
        self.config = StompConfig('tcp://localhost:61613') if config is None else config

    @defer.inlineCallbacks
    def run(self):
        """Connect to the broker and subscribe ``consume`` to ``QUEUE``.

        Runs under ``defer.inlineCallbacks``; the single ``yield`` waits on
        the result of ``client.connect()`` before the subscription is made.
        """
        client = Stomp(self.config)
        yield client.connect()

        subscription_headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on
            # at the same time
            'activemq.prefetchSize': '100',
        }
        listener = SubscriptionListener(
            self.consume,
            errorDestination=self.ERROR_QUEUE,
        )
        client.subscribe(self.QUEUE, subscription_headers, listener=listener)

    def consume(self, client, frame):
        """Decode the frame body as JSON and print its ``count`` entry.

        NOTE: you can return a Deferred here.

        :param client: the client passed in by the listener (unused here).
        :param frame: received frame; ``frame.body`` is decoded and parsed
            as JSON, and the result must contain a ``'count'`` key.
        """
        body_text = frame.body.decode()
        payload = json.loads(body_text)
        print('Received frame with count %d' % payload['count'])

if __name__ == '__main__':
    # Script entry point: turn on DEBUG logging, kick off one consumer,
    # then hand control to the reactor.
    logging.basicConfig(level=logging.DEBUG)
    consumer = Consumer()
    consumer.run()
    reactor.run()
Reply all
Reply to author
Forward
0 new messages