Firoz Mohamed
Mar 6, 2018, 12:54:32 AM
to stompest
Hi,
I want to dynamically increase the number of consumers as the number of messages in the queues grows. What approach should I consider to achieve this? Any help would be appreciated.
I am using ActiveMQ with stompest.
Thanks.
Sample code from stompest that I am using:
import json
import logging

from twisted.internet import defer, reactor

from stompest.config import StompConfig
from stompest.protocol import StompSpec

# Module path as in the stompest examples of the time. 'async' is a reserved
# word from Python 3.7 on, so a newer interpreter cannot import this path.
from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener


class Consumer(object):
    QUEUE = '/queue/testIn'
    ERROR_QUEUE = '/queue/testConsumerError'

    def __init__(self, config=None):
        if config is None:
            config = StompConfig('tcp://localhost:61613')
        self.config = config

    @defer.inlineCallbacks
    def run(self):
        client = Stomp(self.config)
        yield client.connect()
        headers = {
            # client-individual mode is necessary for concurrent processing
            # (requires ActiveMQ >= 5.2)
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            # the maximal number of messages the broker will let you work on at the same time
            'activemq.prefetchSize': '100',
        }
        client.subscribe(self.QUEUE, headers, listener=SubscriptionListener(self.consume, errorDestination=self.ERROR_QUEUE))

    def consume(self, client, frame):
        """
        NOTE: you can return a Deferred here
        """
        # The frame body is bytes; decode it before parsing the JSON payload.
        data = json.loads(frame.body.decode())
        print('Received frame with count %d' % data['count'])


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    Consumer().run()
    reactor.run()
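
To make the question more concrete, the kind of scaling rule being asked about might look roughly like the sketch below. It is only an illustration, not part of stompest or ActiveMQ: the function name `desired_consumers`, its parameters, and the limits are all hypothetical, and the queue depth is assumed to be obtained elsewhere (for example from the broker's own statistics), which is not shown here. The default of 100 messages per consumer simply mirrors the prefetch size in the sample above.

```python
import math


def desired_consumers(queue_depth, per_consumer=100, min_consumers=1, max_consumers=10):
    """Hypothetical policy: one consumer per `per_consumer` pending messages,
    clamped to the range [min_consumers, max_consumers]."""
    if queue_depth < 0:
        raise ValueError('queue_depth must be non-negative')
    # Round up so that a partially filled batch still gets a consumer.
    needed = math.ceil(queue_depth / per_consumer)
    return max(min_consumers, min(max_consumers, needed))
```

A supervisor could call such a function periodically and start or stop `Consumer` instances until the running count matches the result. How to start and stop them cleanly is exactly the open part of the question.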