Reducing SelectPoller polling timeout


Eric Tse

May 11, 2018, 11:30:06 AM
to pika-...@googlegroups.com
Hello,

We have an application that is writing messages out to RabbitMQ by invoking Channel.basic_publish on a channel created off of a SelectConnection.

We're noticing delays (~2-4 seconds) between calling basic_publish and when the message is actually written to the socket. After some digging, we found that in using a SelectConnection, Pika chooses a SelectPoller for polling. From SelectPoller.poll():

                if (self._fd_events[READ] or self._fd_events[WRITE] or
                      self._fd_events[ERROR]):
                    read, write, error = select.select(
                        self._fd_events[READ],
                        self._fd_events[WRITE],
                        self._fd_events[ERROR],
                        self._get_next_deadline())


Based on what we see in the implementation of _get_next_deadline(), this seems to return _MAX_POLL_TIMEOUT (i.e. 5 seconds) when no timeouts are in use (which matches what we're seeing, as we invoke basic_publish directly rather than through a timeout + callback).
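Our mental model of that logic is roughly the following sketch (a simplification for discussion, not Pika's actual code; only the _MAX_POLL_TIMEOUT name and value come from the 0.11.2 source, the rest is our reconstruction):

```python
import time

_MAX_POLL_TIMEOUT = 5.0  # value observed in Pika 0.11.2


def get_next_deadline(timeout_deadlines, now=None):
    """Return how long poll/select may block: the time remaining until
    the earliest pending timeout, clamped to _MAX_POLL_TIMEOUT. With no
    pending timeouts, the full _MAX_POLL_TIMEOUT is used - which is
    the ~5 second delay we are seeing."""
    now = time.time() if now is None else now
    if not timeout_deadlines:
        return _MAX_POLL_TIMEOUT
    return max(0.0, min(min(timeout_deadlines) - now, _MAX_POLL_TIMEOUT))
```

So with no timeouts registered, the poller may sleep a full 5 seconds before noticing anything, which matches the delay we measured.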

Things we've tried with some success:

Wrapping our call to basic_publish inside of a timeout callback with a low timeout:

connection.add_timeout(0.25, lambda: self.channel.basic_publish(...))

Although this primes the return value of self._get_next_deadline(), the first call to basic_publish still seems to be delayed (I suspect we are already blocked in select() with a timeout of _MAX_POLL_TIMEOUT). Subsequent publishes sped up as expected. This approach seems more in line with the example code, which also uses add_timeout to schedule publishing at a regular interval (our use case is publishing in response to irregular events, like button presses or even consumption of another message).

We've also tried tweaking _MAX_POLL_TIMEOUT:

SelectPoller._MAX_POLL_TIMEOUT = 0.25

As far as results go, this sped up all of our publishes, including the first. However, this doesn't appear to be the "pythonic" way of doing things based on my understanding (the variable is "protected"). We're also not sure whether lowering the default poll timeout has any consequences: all we've seen is CPU usage jumping when it is set to 0, but nothing notable above 0.


Is the expected usage of SelectConnection to publish via add_timeout rather than by calling basic_publish directly? What is the recommended way to handle this delay in SelectPoller - should we be using add_timeout, overriding _MAX_POLL_TIMEOUT, defining a custom IOLoop/Poller, or perhaps something else entirely?

Other info:
OS: Windows 10
Python: 3.5.4
Pika: 0.11.2

Thanks


--
Eric

lba...@pivotal.io

May 11, 2018, 1:38:29 PM
to Pika
Hi Eric,

I suspect you have found the root cause of the issue reported yesterday: https://github.com/pika/pika/issues/1044

I had planned on researching that issue today, and your analysis is extremely helpful. We'll see about getting a fix for this into 0.12.

If you have a suggested fix, a pull request would be appreciated :-)

Thanks!
Luke

Vitaly Krug

May 11, 2018, 3:30:55 PM
to Pika
Hi Luke, et al.,

The default timeout in the select poller is just a fallback in case no I/O event of interest occurs. The poller is configured (or should be configured) to always wait on poll events from an "interrupt" file descriptor, which is used by add_callback_threadsafe() to wake up the poller (unblock its poll/select call).

So the value of the default timeout shouldn't matter at all - the implementation doesn't (shouldn't) rely on it for I/O.
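The wake-up mechanism can be sketched generically (a toy demonstration, not Pika's implementation; a socketpair stands in for the interrupt descriptor, since select() on Windows only accepts sockets anyway):

```python
import select
import socket
import threading
import time

# A connected socket pair stands in for the poller's "interrupt"
# descriptor; sockets rather than a pipe, because select() on Windows
# only accepts sockets.
wake_recv, wake_send = socket.socketpair()


def wake_later():
    time.sleep(0.2)
    wake_send.send(b"\x00")  # unblock the poller's select() early


threading.Thread(target=wake_later).start()

start = time.monotonic()
# Even with a long 5-second timeout, select() returns as soon as the
# wake-up byte arrives on the interrupt descriptor.
readable, _, _ = select.select([wake_recv], [], [], 5.0)
elapsed = time.monotonic() - start
wake_recv.recv(1)  # drain the wake-up byte
```

Here elapsed lands well under a second despite the 5-second timeout, which is why the default timeout value should be irrelevant for latency.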

Vitaly Krug

May 11, 2018, 3:35:16 PM
to Pika
Eric, which version of pika are you using in this scenario?

Eric Tse

May 11, 2018, 3:57:38 PM
to Pika
I'm using Pika 0.11.2 

lba...@pivotal.io

May 11, 2018, 5:18:53 PM
to Pika
Hi Vitaly,

I will follow up on the issue as well, but I believe this line is the culprit:


This will always sleep when the passed-in sets of descriptors are all empty lists. It doesn't seem like we want this.

Luke

lba...@pivotal.io

May 11, 2018, 6:00:24 PM
to Pika
Hi Eric,

If you're able to diagnose further, could you see if you are entering the else: clause here:


If the passed-in sets of file descriptors are all empty lists, you'll hit the time.sleep call there.

Thanks -
Luke


Eric Tse

May 14, 2018, 2:54:48 PM
to Pika
Hi Luke & Vitaly,

Sorry I was out for the weekend. Luke, to answer your question about the time.sleep call, we aren't entering that else case.

I've looked over your discussion on https://github.com/pika/pika/issues/1044, and I think the problem might be similar, but perhaps not entirely the same. For one, on Pika 0.11.2 I'm not seeing the references to thread_id (or anything relating to thread id) in https://github.com/pika/pika/blob/0.11.2/pika/adapters/select_connection.py.

I've cooked up a few test scenarios based on the asynchronous_consumer/publisher_example.py at https://github.com/pika/pika/tree/0.11.2/examples (see attached *.py).

The only change I've made to the pika 0.11.2 source is as follows (to test Luke's hypothesis from earlier):

                if (self._fd_events[READ] or self._fd_events[WRITE] or
                      self._fd_events[ERROR]):
                    LOGGER.debug("Waiting on Select. Next Deadline - {0}".format(self._get_next_deadline()))
                    read, write, error = select.select(
                        self._fd_events[READ],
                        self._fd_events[WRITE],
                        self._fd_events[ERROR],
                        self._get_next_deadline())
                else:
                    # NOTE When called without any FDs, select fails on
                    # Windows with error 10022, 'An invalid argument was
                    # supplied'.
                    LOGGER.debug("Sleeping. Next Deadline - {0}".format(self._get_next_deadline()))
                    time.sleep(self._get_next_deadline())
                    read, write, error = [], [], []


Scenarios I've tested:

  • Base case - no changes to the attached files (logs in pika-test.txt)
    • With the "scheduling" implementation of ExamplePublisher, we can see that within a fraction of a second of publishing the message, the message was already received by the consumer. This seems like the expected behavior to me:
      • INFO       2018-05-14 13:24:25,403 asynchronous_publisher_example publish_message                      288 : Published message # 1
      • INFO       2018-05-14 13:24:25,408 asynchronous_consumer_example  on_message                           267 : Received message # 1 from example-publisher: b'"\xd9\x85\xd9\x81\xd8\xaa\xd8\xa7\xd8\xad \xd9\x82\xd9\x8a\xd9\x85\xd8\xa9 \xe9\x94\xae \xe5\x80\xbc \xe3\x82\xad\xe3\x83\xbc \xe5\x80\xa4"'
  • Uncomment "Scenario 1" block from main.py, comment out calls to schedule_next_message() from ExamplePublisher. This is more in line with my original use case, where our producers/consumers run on a different thread than the one invoking basic_publish (see pika-test_scenario1.txt):
    • When invoking basic_publish from a different thread AND not using add_timeout, we seem to be incurring the delay originally reported (note the ~5s difference between message #1 publish log and receipt log - contrasting with the near instant response time using the base case example above)
      • INFO       2018-05-14 13:31:58,470 __main__                       main                                 32  : Starting message publishing - Scenario 1...
      • DEBUG      2018-05-14 13:31:58,555 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • DEBUG      2018-05-14 13:31:58,570 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • INFO       2018-05-14 13:31:59,471 asynchronous_publisher_example publish_message                      288 : Published message # 1
      • INFO       2018-05-14 13:32:00,471 asynchronous_publisher_example publish_message                      288 : Published message # 2
      • INFO       2018-05-14 13:32:01,472 asynchronous_publisher_example publish_message                      288 : Published message # 3
      • INFO       2018-05-14 13:32:02,472 asynchronous_publisher_example publish_message                      288 : Published message # 4
      • INFO       2018-05-14 13:32:03,473 asynchronous_publisher_example publish_message                      288 : Published message # 5
      • DEBUG      2018-05-14 13:32:03,565 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • DEBUG      2018-05-14 13:32:03,565 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • INFO       2018-05-14 13:32:03,565 asynchronous_consumer_example  on_message                           267 : Received message # 1 from example-publisher: b'"\xd9\x85\xd9\x81\xd8\xaa\xd8\xa7\xd8\xad \xd9\x82\xd9\x8a\xd9\x85\xd8\xa9 \xe9\x94\xae \xe5\x80\xbc \xe3\x82\xad\xe3\x83\xbc \xe5\x80\xa4"'
  • Uncomment "Scenario 2" block from main.py, comment out calls to schedule_next_message() from ExamplePublisher. This is trying to emulate a "scheduling" logic to see what happens if we use add_timeout instead of calling basic_publish directly, but still invoking on a separate thread (see pika-test_scenario2.txt)
    • The results here show that we only queue two messages (instead of the 10 messages expected - maybe because of collisions in timeout values?), BUT the messages themselves do not suffer the same delay as the Scenario 1 case:
      • INFO       2018-05-14 13:35:33,520 __main__                       main                                 40  : Starting message publishing - Scenario 2...
      • DEBUG      2018-05-14 13:35:33,520 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 623018837105524063; deadline=1 at 1526319334.520757
      • DEBUG      2018-05-14 13:35:33,520 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 623018837105524063; deadline=1 at 1526319334.520757
      • DEBUG      2018-05-14 13:35:33,520 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 623018837105524063; deadline=1 at 1526319334.520757
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,521 pika.adapters.select_connection add_timeout                          311 : add_timeout: added timeout 877984588470500703; deadline=1 at 1526319334.5216563
      • DEBUG      2018-05-14 13:35:33,715 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 0.805495023727417
      • DEBUG      2018-05-14 13:35:33,715 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • INFO       2018-05-14 13:35:34,521 asynchronous_publisher_example publish_message                      288 : Published message # 1
      • DEBUG      2018-05-14 13:35:34,522 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 0
      • INFO       2018-05-14 13:35:34,522 asynchronous_publisher_example publish_message                      288 : Published message # 2
      • DEBUG      2018-05-14 13:35:34,523 pika.adapters.select_connection poll                                 692 : Waiting on Select. Next Deadline - 5
      • INFO       2018-05-14 13:35:34,524 asynchronous_consumer_example  on_message                           267 : Received message # 1 from example-publisher: b'"\xd9\x85\xd9\x81\xd8\xaa\xd8\xa7\xd8\xad \xd9\x82\xd9\x8a\xd9\x85\xd8\xa9 \xe9\x94\xae \xe5\x80\xbc \xe3\x82\xad\xe3\x83\xbc \xe5\x80\xa4"'

Just based on this limited testing, it would appear to me that this could be an issue with invoking basic_publish off-thread.

My reasoning: in both the base and Scenario 2 test cases, we see near-zero delay between the publish and receipt logs when using add_timeout, regardless of which thread invokes it - presumably because the callback passed to add_timeout ends up running on the same thread as the IOLoop. In both cases the add_timeout callback is essentially just basic_publish, so I'm theorizing the issue is about which thread basic_publish is invoked from, rather than the presence or absence of add_timeout. Perhaps select.select() doesn't pick up changes made off-thread to the file descriptors in _PollerBase._fd_events, so our publishes stay queued up until the next call to select.select()?
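As a quick sanity check of that theory (a generic experiment, not Pika's internals): select() snapshots the fd lists at the moment it is called, so a descriptor appended from another thread while select() is blocked is invisible until the next call:

```python
import select
import socket
import threading
import time

idle_recv, idle_send = socket.socketpair()    # never becomes readable
ready_recv, ready_send = socket.socketpair()
ready_send.send(b"x")                         # ready_recv is readable already

watched = [idle_recv]


def mutate_watch_list():
    # Another thread adds an already-readable descriptor while the
    # main thread is blocked in select().
    time.sleep(0.1)
    watched.append(ready_recv)


threading.Thread(target=mutate_watch_list).start()

start = time.monotonic()
# select() copies the fd list at call time, so the appended descriptor
# has no effect on this call; we block for the full 0.5 s timeout.
readable, _, _ = select.select(watched, [], [], 0.5)
elapsed = time.monotonic() - start
```

The call times out with nothing readable even though a ready descriptor was added mid-wait, consistent with the delayed-publish behavior above.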

Unfortunately, my understanding of Python is fairly limited, so please let me know if I'm just completely off-base.

Thanks
asynchronous_consumer_example.py
asynchronous_publisher_example.py
main.py
pika-test.txt
pika-test_scenario1.txt
pika-test_scenario2.txt

Vitaly Krug

May 14, 2018, 4:59:58 PM
to Pika
I was just about to ask about threads and noticed that you offered up this info yourself. Therein lies the problem. A pika connection instance, including any of its channels, is not thread-safe. Once you create a connection, you may interact with it and its channels only from the same thread - the thread that's running the connection's ioloop (a.k.a. "event loop" in some asynchronous frameworks). Most frameworks (Tornado, Asyncio, etc.) offer a mechanism for threads to call to request a callback that will be dispatched from the context of the ioloop; select_connection's IOLoop added such a mechanism via IOLoop.add_callback_threadsafe() very recently, starting with Pika 0.12.0 release candidate. Your threads may use this mechanism to delegate calls to the connection or its channels from the context of the running ioloop.
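To illustrate the pattern (a toy sketch only, not Pika's actual IOLoop code; MiniIOLoop and its methods here are made-up stand-ins): the loop watches an interrupt socket, and any thread may enqueue a callback and then wake the loop, so the callback runs on the loop's own thread:

```python
import queue
import select
import socket
import threading


class MiniIOLoop:
    """Toy illustration of thread-safe callback delegation - the same
    general pattern as add_callback_threadsafe(), not Pika's code."""

    def __init__(self):
        self._callbacks = queue.Queue()
        # Interrupt channel: select() watches one end; other threads
        # write to the other end to wake the loop.
        self._wake_recv, self._wake_send = socket.socketpair()

    def add_callback_threadsafe(self, callback):
        # Safe to call from any thread: enqueue, then wake the poller.
        self._callbacks.put(callback)
        self._wake_send.send(b"\x00")

    def run_once(self, timeout=5.0):
        # Block until woken (or the timeout elapses), then dispatch all
        # pending callbacks on this thread - the loop's thread.
        readable, _, _ = select.select([self._wake_recv], [], [], timeout)
        if readable:
            self._wake_recv.recv(1)
        while True:
            try:
                self._callbacks.get_nowait()()
            except queue.Empty:
                break


results = []
loop = MiniIOLoop()
# A foreign thread delegates work (e.g. a publish) to the loop thread.
threading.Thread(
    target=loop.add_callback_threadsafe,
    args=(lambda: results.append("ran on loop thread"),),
).start()
loop.run_once()
```

With real Pika (0.12.0 release candidate and later, as described above), the analogous call from another thread would go through the select_connection IOLoop's add_callback_threadsafe().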

Vitaly Krug

May 14, 2018, 5:05:02 PM
to Pika
Eric, see discussion about threading in Pika's README: https://github.com/pika/pika. Look through all occurrences of the word "thread" in the README.