Why wouldn't Pika be publishing?

Dominick Pastore

Jul 20, 2020, 11:15:40 AM
to rabbitmq-users
I have a C program that uses rabbitmq-c and I'm trying to write a Python script to test it with Pika. The goal is to send a few messages, receive responses from the C program, and compare them against the expected responses. The problem is, Pika seems to be doing everything except actually publishing the messages.

It's not the prettiest code I've ever written, but here is the class I'm using to do this (currently littered with print()s for debugging):

class RabbitMQSession:
    """Manages sending and receiving RabbitMQ messages for
    test_monitor_rabbitmq()"""

    def __init__(self, rabbitmq_config, binding_keys, messages):
        """Execute a RabbitMQ session

        rabbitmq_config - A dict with the following keys:
          - 'server'
          - 'port'
          - 'username'
          - 'password'
          - 'exchange'
          - 'vhost'
        binding_keys - A list of binding keys to bind to
        messages - A list of messages (routing_key, body) to send
        """

        self.config = rabbitmq_config
        self.binding_keys = binding_keys
        self.bound_count = 0
        self.to_send = messages
        self.received = []

        # Initiate connection
        credentials = pika.PlainCredentials(rabbitmq_config['username'],
                                            rabbitmq_config['password'])
        parameters = pika.ConnectionParameters(
            host=rabbitmq_config['server'], port=rabbitmq_config['port'],
            virtual_host=rabbitmq_config['vhost'], credentials=credentials)
        print("@@@ initializing connection")
        self.connection = pika.SelectConnection(
            parameters, on_open_callback=self._on_open)

        # Do producing and consuming
        try:
            print("@@@ starting")
            self.connection.ioloop.start()
            print("@@@ normal exit")
        except KeyboardInterrupt:
            # This probably won't normally happen, but if tests are run manually
            # Close gracefully
            print("@@@ closing after interrupt")
            self.connection.close()
            # Let Pika handle any remaining communications
            print("@@@ waiting for cleanup messages")
            self.connection.ioloop.start()

        print("@@@ rabbitmq done")

    def _on_open(self, connection):
        print("@@@ conn open")
        connection.add_on_close_callback(self._on_connection_close)
        connection.channel(on_open_callback=self._on_channel)

    def _on_connection_close(self, connection, exception):
        print("@@@ connection is closing")
        self.connection.ioloop.stop()

    def _on_channel(self, channel):
        print("@@@ channel open")
        self.channel = channel
        channel.add_on_close_callback(self._on_channel_close)
        # Declare queue
        channel.queue_declare(
            '', exclusive=True, callback=self._on_queue)

    def _on_channel_close(self, channel, exception):
        print("@@@ channel is closing:", repr(exception))
        #self.connection.close()

    def _on_queue(self, declare_ok):
        self.queue = declare_ok.method.queue
        # Bind to the binding keys for outgoing messages
        print("@@@ queue declared")
        print("@@@ config", self.config)
        for binding_key in self.binding_keys:
            print("@@@ rk", binding_key)
            self.channel.queue_bind(
                exchange=self.config['exchange'], queue=self.queue,
                routing_key=binding_key, callback=self._on_bind)

    def _on_bind(self, bind_ok):
        self.bound_count += 1
        print("@@@ queue {} bound".format(self.bound_count))

        if self.bound_count == len(self.binding_keys):
            # Start consuming
            self.channel.basic_consume(
                queue=self.queue, auto_ack=True,
                on_message_callback=self._consume,
                callback=self._produce)

    def _consume(self, channel, method, properties, body):
        print("@@@ consumed message")
        ev_json = json.loads(body)
        self.received.append((method.routing_key, ev_json))

    def _produce(self, consume_ok):
        print("@@@ started consuming")

        # Publish messages
        properties = pika.BasicProperties(content_type='application/json',
                                          type='smedl-fmt2.0')
        for rk, body in self.to_send:
            print("@@@ publishing message:", rk)
            print(body)
            print("@@@ end of message")
            self.channel.basic_publish(
                exchange=self.config['exchange'], routing_key=rk,
                body=body, properties=properties)
            time.sleep(0.5)

        # Wait a moment for responses
        print("@@@ waiting for last responses")
        time.sleep(2)
        self.connection.close()
        print("@@@ closed")

And here is a sample of the output (where it is given one binding key to subscribe to and one message to send):

@@@ initializing connection
@@@ starting
@@@ conn open
@@@ channel open
@@@ queue declared
@@@ config {'server': 'localhost', 'port': 5672, 'username': 'guest', 'password': 'guest', 'exchange': 'smedl.Adder', 'vhost': '/'}
@@@ rk _Adder_sum.#
@@@ queue 1 bound
@@@ started consuming
@@@ publishing message: conn.measurement
{"event": "measurement", "params": [2.0], "aux": {"arbitrary": "json data"}}
@@@ end of message
@@@ waiting for last responses
@@@ closed
@@@ channel is closing: ChannelClosedByClient: (200) 'Normal shutdown'
@@@ connection is closing
@@@ normal exit
@@@ rabbitmq done

It definitely executes the basic_publish() call, but there are no indications that a message is actually sent, either in the C program (which I'm still testing, but generally has been well-behaved), amqp-consume, or in the web management console for the RabbitMQ server.

Am I doing something wrong here?

(Also, as an unrelated question, when the channel close callback is called, is there any way to tell whether the connection closed with it or not, i.e. whether it's safe to call connection.close()?)

Thanks,
Dominick
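
On the side question about the channel close callback: one possible approach, modeled on the pattern used in Pika's asynchronous consumer example, is to check the connection's state properties before calling close(). The sketch below assumes a Pika 1.x connection object exposing is_closing and is_closed; the helper name close_connection_if_open is hypothetical and not part of Pika.

```python
def close_connection_if_open(connection):
    """Close `connection` unless it is already closed or in the middle of closing.

    `connection` is assumed to be a Pika 1.x connection (for example a
    SelectConnection) that exposes the `is_closing` and `is_closed` properties.
    Returns True if close() was actually called, False otherwise.
    """
    if connection.is_closing or connection.is_closed:
        # The connection is going away on its own; calling close() again
        # is unnecessary here.
        return False
    connection.close()
    return True
```

In the class above, _on_channel_close could call close_connection_if_open(self.connection) in place of the commented-out self.connection.close().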

Gavin M. Roy

Jul 20, 2020, 11:38:52 AM
to rabbitm...@googlegroups.com
I imagine your use of time.sleep() is the issue, blocking the IOLoop from actually working.
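
To illustrate the general point about blocking a single-threaded event loop, here is a small sketch that uses Python's standard asyncio loop as a stand-in. It is not Pika's IOLoop, and the function names are made up for the illustration, but the principle is the same: while a callback sits in time.sleep(), the loop cannot run anything else, including work that was already due.

```python
import asyncio
import time


def demo_blocking_callback():
    """Show that a sleeping callback delays other work on the same loop."""
    events = []
    loop = asyncio.new_event_loop()

    def blocking_callback():
        events.append("callback start")
        time.sleep(0.2)  # The loop is stuck here; no other callback can run.
        events.append("callback end")

    def io_work():
        # Stands in for the I/O handling the loop would normally be doing.
        events.append("io handled")
        loop.stop()

    loop.call_soon(blocking_callback)
    loop.call_later(0.05, io_work)  # Due after 0.05 s, but must wait its turn.
    loop.run_forever()
    loop.close()
    return events


if __name__ == "__main__":
    print(demo_blocking_callback())
```

Even though io_work is scheduled for 0.05 seconds, it only runs once the blocking callback has returned control to the loop.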


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/6a2cc285-bb3b-4648-827e-bdde167b55d1o%40googlegroups.com.

Dominick Pastore

Jul 20, 2020, 11:52:46 AM
to rabbitmq-users
Hmm. Ok. Then I'm not really sure what to do instead.

There are two sleeps in the code. The 0.5s one is perhaps not strictly necessary, but it seemed nice to have to help ensure a more readable message ordering (that way, if monitoring the session externally, there won't be multiple requests bunched up before the first responses come). But I can probably remove it.

The other sleep, the 2s one, seems more important. Without it, I'm not sure how to keep the connection from closing right away before receiving the responses (bearing in mind, I can't make any assumptions about how many responses there will be or even that there will be any at all). Is there a better way to do that?

Dominick Pastore

Jul 20, 2020, 1:51:43 PM
to rabbitmq-users
Ah, I just found the ioloop.call_later() function in the asynchronous publisher example. I used the asynchronous consumer example for inspiration when I wrote this, and that call doesn't seem to appear anywhere else in the documentation. Pika's API documentation has quite a few gaps in it.
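
For anyone finding this thread later, here is a minimal sketch of how the two time.sleep() calls might be replaced with ioloop.call_later(), assuming Pika 1.x. The class name ScheduledPublisher and its parameters are hypothetical, introduced only for this illustration. It takes an already-open connection and channel, publishes one message per timer tick, and schedules the connection close after a final delay, so every step returns to the IOLoop right away.

```python
class ScheduledPublisher:
    """Publish (routing_key, body) pairs one at a time without blocking.

    `connection` is assumed to be an open pika.SelectConnection and `channel`
    an open channel on it; `properties` may be a pika.BasicProperties or None.
    """

    def __init__(self, connection, channel, exchange, messages,
                 properties=None, interval=0.5, linger=2.0):
        self.connection = connection
        self.channel = channel
        self.exchange = exchange
        self.pending = list(messages)
        self.properties = properties
        self.interval = interval  # Seconds between publishes
        self.linger = linger      # Seconds to wait for responses before closing

    def start(self):
        """Publish the first message (if any) and schedule the rest."""
        self._publish_next()

    def _publish_next(self):
        if not self.pending:
            # Nothing left to send. Keep the loop running for `linger`
            # seconds so responses can still be consumed, then close.
            self.connection.ioloop.call_later(self.linger,
                                              self.connection.close)
            return
        routing_key, body = self.pending.pop(0)
        self.channel.basic_publish(
            exchange=self.exchange, routing_key=routing_key,
            body=body, properties=self.properties)
        # Return to the IOLoop now and come back after `interval` seconds.
        self.connection.ioloop.call_later(self.interval, self._publish_next)
```

In the class from the first post, _produce could build one of these from self.connection, self.channel, self.config['exchange'] and self.to_send, call start(), and then return immediately.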

Luke Bakken

Jul 20, 2020, 6:37:00 PM
to rabbitmq-users
Hi Dominick,

We would gladly review a pull request to address documentation issues -

https://github.com/pika/pika/

Pika is maintained by RabbitMQ core engineering team members when they have time to do so, which isn't very often lately.

Thanks,
Luke

Dominick Pastore

Jul 22, 2020, 3:24:46 PM
to rabbitmq-users
I'd be willing to help, but my understanding of the code is pretty shallow. I don't really think I know it well enough to make worthwhile contributions.