class RabbitMQSession:
    """Manages sending and receiving RabbitMQ messages for
    test_monitor_rabbitmq()

    Constructing an instance runs the whole session: it connects, declares
    an exclusive server-named queue, binds it to every binding key, starts
    consuming, publishes the given messages, waits for responses, and
    closes the connection. When the constructor returns, the messages that
    were consumed are available in ``self.received`` as a list of
    ``(routing_key, decoded_json_body)`` tuples.

    All work after the connection is created happens in callbacks driven by
    the connection's ioloop, so no callback may block: delays are scheduled
    on the ioloop instead of sleeping.
    """

    # Seconds to wait after each publish before the next step
    PUBLISH_INTERVAL = 0.5
    # Additional seconds to keep consuming after the last publish interval
    RESPONSE_WAIT = 2

    def __init__(self, rabbitmq_config, binding_keys, messages):
        """Execute a RabbitMQ session

        rabbitmq_config - A dict with the following keys:
          - 'server'
          - 'port'
          - 'username'
          - 'password'
          - 'exchange'
          - 'vhost'
        binding_keys - A list of binding keys to bind to
        messages - A list of messages (routing_key, body) to send
        """
        self.config = rabbitmq_config
        self.binding_keys = binding_keys
        self.bound_count = 0
        self.to_send = messages
        self.received = []
        # Iterator over the messages still to be published; set by _produce
        self._pending = iter(())

        # Initiate connection
        credentials = pika.PlainCredentials(rabbitmq_config['username'],
                                            rabbitmq_config['password'])
        parameters = pika.ConnectionParameters(
            host=rabbitmq_config['server'], port=rabbitmq_config['port'],
            virtual_host=rabbitmq_config['vhost'], credentials=credentials)
        print("@@@ initializing connection")
        self.connection = pika.SelectConnection(
            parameters, on_open_callback=self._on_open)

        # Do producing and consuming
        try:
            print("@@@ starting")
            self.connection.ioloop.start()
            print("@@@ normal exit")
        except KeyboardInterrupt:
            # This probably won't normally happen, but if tests are run
            # manually, close gracefully
            print("@@@ closing after interrupt")
            self._close_connection()
            # Let Pika handle any remaining communications
            print("@@@ waiting for cleanup messages")
            if not self.connection.is_closed:
                self.connection.ioloop.start()
        print("@@@ rabbitmq done")

    def _close_connection(self):
        """Request connection closure unless it is already closed or in the
        process of closing (closing twice would raise)."""
        if not (self.connection.is_closed or self.connection.is_closing):
            self.connection.close()

    def _on_open(self, connection):
        """Connection opened: register the close handler, open a channel."""
        print("@@@ conn open")
        connection.add_on_close_callback(self._on_connection_close)
        connection.channel(on_open_callback=self._on_channel)

    def _on_connection_close(self, connection, exception):
        """Connection closed: stop the ioloop so __init__ can return."""
        print("@@@ connection is closing")
        self.connection.ioloop.stop()

    def _on_channel(self, channel):
        """Channel opened: remember it and declare the receive queue."""
        print("@@@ channel open")
        self.channel = channel
        channel.add_on_close_callback(self._on_channel_close)
        # Declare queue (empty name; the resulting name is read in _on_queue)
        channel.queue_declare(
            '', exclusive=True, callback=self._on_queue)

    def _on_channel_close(self, channel, exception):
        """Channel closed: make sure the connection is shut down too.

        Without this, a channel closed while the connection stays open
        would leave the ioloop running forever. During a normal shutdown
        the connection is already closing, so this is a no-op.
        """
        print("@@@ channel is closing:", repr(exception))
        self._close_connection()

    def _on_queue(self, declare_ok):
        """Queue declared: bind it to every binding key.

        If there are no binding keys there will be no bind confirmations,
        so consuming is started right away instead.
        """
        self.queue = declare_ok.method.queue
        # Bind to the binding keys for outgoing messages
        print("@@@ queue declared")
        # NOTE(review): this prints the whole config, including 'password'
        print("@@@ config", self.config)
        if not self.binding_keys:
            self._start_consuming()
            return
        for binding_key in self.binding_keys:
            print("@@@ rk", binding_key)
            self.channel.queue_bind(
                exchange=self.config['exchange'], queue=self.queue,
                routing_key=binding_key, callback=self._on_bind)

    def _on_bind(self, bind_ok):
        """One binding confirmed: start consuming once all are confirmed."""
        self.bound_count += 1
        print("@@@ queue {} bound".format(self.bound_count))
        if self.bound_count == len(self.binding_keys):
            self._start_consuming()

    def _start_consuming(self):
        """Start consuming from the queue; producing begins once the
        consumer is confirmed (see the ``callback`` argument)."""
        self.channel.basic_consume(
            queue=self.queue, auto_ack=True,
            on_message_callback=self._consume,
            callback=self._produce)

    def _consume(self, channel, method, properties, body):
        """Message received: record (routing_key, JSON-decoded body)."""
        print("@@@ consumed message")
        ev_json = json.loads(body)
        self.received.append((method.routing_key, ev_json))

    def _produce(self, consume_ok):
        """Consumer confirmed: begin publishing the messages to send.

        Messages are published one per PUBLISH_INTERVAL using ioloop
        timers rather than time.sleep(), because sleeping inside a callback
        blocks the ioloop and therefore all sending and receiving.
        """
        print("@@@ started consuming")
        self._properties = pika.BasicProperties(
            content_type='application/json', type='smedl-fmt2.0')
        self._pending = iter(self.to_send)
        self._publish_next()

    def _publish_next(self):
        """Publish the next pending message and schedule the following
        step; once none remain, schedule the final close."""
        if not self.channel.is_open:
            # Channel went away while a timer was pending; shutdown is
            # already under way (see _on_channel_close)
            return
        try:
            rk, body = next(self._pending)
        except StopIteration:
            # Wait a moment for responses
            print("@@@ waiting for last responses")
            self.connection.ioloop.call_later(
                self.RESPONSE_WAIT, self._finish)
            return
        print("@@@ publishing message:", rk)
        print(body)
        print("@@@ end of message")
        self.channel.basic_publish(
            exchange=self.config['exchange'], routing_key=rk,
            body=body, properties=self._properties)
        self.connection.ioloop.call_later(
            self.PUBLISH_INTERVAL, self._publish_next)

    def _finish(self):
        """Response wait elapsed: close the connection, which in turn stops
        the ioloop via _on_connection_close."""
        self._close_connection()
        print("@@@ closed")