ChannelClosed after not publishing messages

1,526 views
Skip to first unread message

Yifei Kong

unread,
Aug 8, 2018, 11:19:51 PM8/8/18
to Pika
Hi,

My script queries db and publishes messages to Rabbit MQ in a loop. When I publishes one message per 5s, it works very well, but after a while(say 5 minutes) not publishing messages, and then restart publishing messages, I got ChannelClosed Error.

Is this a expected behavior? I was wondering if there is a heartbeat that I should keep, but I did not found the docs.

Thanks for your attention!

Yifei Kong

unread,
Aug 8, 2018, 11:26:26 PM8/8/18
to Pika
I also noticed:

```
=ERROR REPORT==== 9-Aug-2018::10:43:05 ===
closing AMQP connection <0.19277.0> (10.1.1.1:47358 -> 10.1.1.2:5672):
Missed heartbeats from client, timeout: 60s
```
so maybe this is the same issue as in https://github.com/pika/pika/issues/1104 ?

lba...@pivotal.io

unread,
Aug 9, 2018, 10:01:40 AM8/9/18
to Pika
Hi Yifei,

Yes, you must keep the heartbeat active. If your script blocks Pika while it's not publishing messages, then you will see timeout errors.

You should be using Pika version 0.12.0.

There is quite a bit of example code here that you can use: https://github.com/pika/pika/tree/0.12.0/examples

If you provide your code I could make suggestions.

Thanks,
Luke

Yifei Kong

unread,
Aug 10, 2018, 2:59:41 AM8/10/18
to Pika
My code is here:

```
while True:
    urls = pull_urls()  # this won't take longer than 5s
    if not urls:
        time.sleep(0.5)
        continue
    for url in start_urls:
        msg = json.dumps({'url': url, 'run_id': run_id})
        channel.publish('', NEW_URLS_QUEUE, json.dumps(msg))
```

lba...@pivotal.io

unread,
Aug 10, 2018, 9:37:30 AM8/10/18
to Pika
Hi Yifei,

If that while loop runs in the same thread as Pika, it will block Pika's ioloop. You can't have two loops running in the same thread at the same time - this is a general programming issue, not specific to either Pika or Python.

Since you didn't provide all of your code, I assume that prior to this loop, you start a Pika connection.

I can assist further if you provide all of your code. Otherwise I am just guessing, but I'm pretty sure what's going on.

Thanks,
Luke

Yifei Kong

unread,
Aug 10, 2018, 7:57:16 PM8/10/18
to pika-...@googlegroups.com
Thanks, I think I understand the issue now, I'm using BlockingConnection and maybe block for a very long time. so pika does not get a chance to process heartbeats. Under your inspiration(blocking pika), google took me to this issue https://github.com/pika/pika/issues/196, and I tried to fixed my code accordingly.

My full code is too long, so I took the crucial part of problem Code

```
conn = pika.BlockingConnection()
channel = conn.channel()

while True:
    message = poll_message()  # this is a problem, if it takes too long, pika does not get to send heartbeats
    if not message:
        logging.info('no message to process, sleep for a while')
        time.sleep(1)  # this is a problem, when no message for a while, pika does not get a change to send heartbeats
        continue
    channel.basic_publish(message...)
```

my solution is one of the following:

1. make sure poll_message does not take too long and use conn.sleep
2. just use a heartbeat thread with conn.process_data_events. (but can I call conn.process_data_events in another thread?)
3. increase or just turn off the heartbeat

So which one is the best solution? or are these solutions correct?

Cheers

--
You received this message because you are subscribed to the Google Groups "Pika" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pika-python+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Yifei Kong
May the force be with you.


Vitaly Krug

unread,
Aug 27, 2018, 12:41:11 AM8/27/18
to pika-...@googlegroups.com
Hi Yifei,

A pika connection instance is not thread-safe, so don't call it from another thread.

You can periodically call Conn.process-data-events() or conn.sleep() from your main loop to keep the heartbeat going. 

Pika 0.12 also supports a more advanced technique that enables you to transfer messages from another thread to pika's thread via thread-safe callbacks. This allows you to decouple those I/O loops from each other.

Of course, your app should be prepared for disruptions anyway, and be ready to recover from lost connections. Networks and servers are not immune to service disruptions.

Good luck,
Vitaly



To unsubscribe from this group and stop receiving emails from it, send an email to pika-python...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
Yifei Kong
May the force be with you.


--
You received this message because you are subscribed to the Google Groups "Pika" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pika-python...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages