debug drain excessive queue

Ryan Fagan

Nov 5, 2020, 11:01:47 AM
to thespian.py
First off, thanks for Thespian. It works really well for my needs.

I have an application on a single machine that processes some data every 10 seconds. It has a number of actors that are spawned from a timer actor on a 10-second interval, and each of those actors spawns some child actors to perform various calculations. We made some changes in our latest build and started to sporadically get the following messages in our thespian.log:

What is the best way to debug why this is happening? It tends to happen multiple times per day for relatively short periods of time, but during those periods the processing slows down tremendously. I have timers in various parts of the code running within the actors, and those timers indicate the code itself is still running very quickly. The lost time appears to be within Thespian, which seems to correlate with some sort of overload, given the messages in the log.

Any direction or insight you have would be greatly appreciated.

Ryan

Messages:

2020-11-05 08:38:44.570793 p120979 Warn Exited tx-only mode after draining excessive queue (950)
2020-11-05 08:38:44.696270 p120979 Warn Entering tx-only mode to drain excessive queue (950 > 950, drain-to 780 in Expires_in_0:04:59.999897)
2020-11-05 08:38:44.816118 p120979 I    Exiting tx-only mode because no transport work available.
2020-11-05 08:38:44.816181 p120979 Warn Exited tx-only mode after draining excessive queue (950)
2020-11-05 08:38:44.978768 p120979 Warn Entering tx-only mode to drain excessive queue (950 > 950, drain-to 780 in Expires_in_0:04:59.999847)
2020-11-05 08:38:45.100305 p120979 I    Exiting tx-only mode because no transport work available.
2020-11-05 08:38:45.100383 p120979 Warn Exited tx-only mode after draining excessive queue (950)
2020-11-05 08:38:45.238311 p120979 Warn Entering tx-only mode to drain excessive queue (950 > 950, drain-to 780 in Expires_in_0:04:59.999877)

Kevin Quick

Nov 5, 2020, 1:24:08 PM
to Ryan Fagan, thespian.py
Thank you for the appreciation, it's nice to know Thespian has been useful.

The messages you are seeing indicate that the outbound message threshold is being reached. This threshold is designed to provide some backpressure flow control within the system, although it's a somewhat optimistic approach.

To give more background on this: each Actor object is the main thread of execution for a process (when using one of the multiproc___ bases, which you are). The core algorithm for a Thespian Actor looks roughly like this:

    Actor starts
    Open local inbound socket, set non-blocking mode
    while select(all_open_sockets):
        if socket_data_available and len(outbound_queue) < threshold:
            read_data
            if read_is_complete_message:
                self.receiveMessage()  # <-- your code gets called here
        if socket_outbound_can_do_work:
            non-blocking send data
            if all_data_sent:
                close_outbound_socket

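    # and separately, whenever your code calls self.send():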
    self.send():
        place send message on outgoing queue
        open outbound socket to target, set non-blocking mode

The general concept here is that the receiveMessage() call into your code should not spend an overly long time before returning to that core while loop, where the sockets get serviced.  The backpressure flow-control idea (via `len(outbound_queue) < threshold`) is that if your actor has a large number of outbound messages queued, that is the result of incoming requests to do work, so the incoming work is paused and the actor is put into tx-only mode to flush the output; once the queue drops below a lower threshold, receives are resumed.  (The above pseudo-algorithm is a simplification: there are actually an upper and a lower threshold marking the points where the actor becomes tx-only and full tx-rx, respectively, which provides some hysteresis for the transitions between these modes.)
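
For intuition, here is a schematic sketch of that hysteresis behavior. This is not Thespian's actual implementation; the class and threshold values are purely illustrative (the 950/780 numbers simply mirror the values shown in your log):

    from collections import deque

    # Illustrative thresholds only; Thespian's real values live in its transport layer.
    MAX_QUEUED = 950    # enter tx-only mode at or above this many queued sends
    DRAIN_TO   = 780    # resume receiving once the queue drains back to this level

    class BackpressureSketch:
        def __init__(self):
            self.outbound = deque()
            self.tx_only = False

        def queue_send(self, msg):
            self.outbound.append(msg)
            if len(self.outbound) >= MAX_QUEUED:
                self.tx_only = True    # stop accepting new inbound work

        def sent_one(self):
            if self.outbound:
                self.outbound.popleft()
            if self.tx_only and len(self.outbound) <= DRAIN_TO:
                self.tx_only = False   # hysteresis: resume receives at the lower mark

        def may_receive(self):
            # gate used by the receive side of the scheduling loop
            return not self.tx_only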

The messages you are seeing are emitted when the threshold test above (`len(outbound_queue) < threshold`) fails.  There can be a couple of reasons why this would occur, including:
   1 - your receiveMessage() is calling send() enough times to reach the threshold
   2 - item 1 is caused by either *receiving* a large number of messages, or a multiplier effect where an incoming message results in a large number of outgoing messages
   3 - your receiveMessage() is busy-waiting/blocking and not returning to the surrounding scheduling loop to allow outbound non-blocking processing to run
   4 - your actor is receiving a large number of requests, and the amount of time to process those requests in receiveMessage() is dominating the actor's runtime, which doesn't provide much "idle" time to process outbound requests in a non-blocking manner.

In the log messages you provided, the "p120979" indicates that this is the Actor running as process number 120979.  You can examine that process to see which Actor it is (e.g. `ps -leafyw | grep 120979`), which will help you determine which Actor is being stressed.

My recommendations would be to:

  A.  Review the implementation of that Actor to check for issues 1 or 2.  If that Actor is generating very large numbers of sends, you may want to re-evaluate your design; if you decide this is appropriate for your design needs, then the threshold can be raised. Unfortunately, the threshold is hard-coded right now, so you would have to make a custom Thespian installation to effect this change (`MAX_QUEUED_TRANSMITS` on line 38 of thespian/system/transport/asyncTransportBase.py); let me know if this is what you end up doing so I know whether this needs to become a user-configurable threshold in the future.

  B.  Review the implementation of that Actor to check for issue 3.  You mentioned "every 10 seconds": I don't know how you've implemented those timers, but I would advise using self.wakeupAfter() --> WakeupMessage instead of time.sleep() in your receiveMessage (see the first sketch below).

  C.  If reviewing the Actor implementation doesn't reveal any concerns, then you might have issue 4.  You can alleviate issue 4 either by using a troupe (https://thespianpy.com/doc/using.html#hH-19cf92bf-a6ae-492c-844c-b0831c108a3a) or by trying to sub-divide the Actor's work into smaller components, with sub-actors performing those components (see the second sketch below).
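
To illustrate item B, here is a minimal sketch of the wakeupAfter()/WakeupMessage pattern; the 10-second period and the do_periodic_work() helper are illustrative placeholders, not part of your application:

    from datetime import timedelta
    from thespian.actors import Actor, WakeupMessage

    class PeriodicActor(Actor):
        def receiveMessage(self, message, sender):
            if message == 'start':
                # Kick off the cycle; no blocking time.sleep() anywhere.
                self.wakeupAfter(timedelta(seconds=10))
            elif isinstance(message, WakeupMessage):
                self.do_periodic_work()                  # hypothetical helper
                self.wakeupAfter(timedelta(seconds=10))  # re-arm for the next interval

        def do_periodic_work(self):
            pass  # placeholder for the real per-interval processing

And to illustrate item C, a rough sketch of a troupe-decorated worker; the sizing parameters are only examples, and the work done per message is a stand-in:

    from thespian.actors import Actor
    from thespian.troupe import troupe

    @troupe(max_count=10, idle_count=2)   # illustrative sizing
    class Worker(Actor):
        def receiveMessage(self, message, sender):
            # Any troupe member may handle a given message, so the work here
            # should not depend on state accumulated by a particular member.
            self.send(sender, ('done', message))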

Let me know if this helps!

-Kevin





--
-KQ

Ryan Fagan

Nov 6, 2020, 2:12:17 PM
to thespian.py
Thanks so much for your quick and detailed response. It gives me a number of avenues to pursue. By my count, I should not be approaching the 950 limit, but I am adding more debugging to ensure that is the case. I will report back here with my results.

Ryan

Ryan Fagan

Nov 10, 2020, 8:44:20 PM
to thespian.py
I do have a timer Actor that uses wakeupAfter to wake up every 10 seconds; it gets rescheduled to the next round 10-second mark (:10, :20, etc.) after processing for the current instance is complete. The work in the wakeup message is usually done in 0.8 seconds, and the Actors all complete in about 3-4 seconds. I did find the thespian shell and have been able to check the status of various Actors within the system, confirm my counts, etc. If I time the status command with the 10-second wakeup, I can see some of the details of the instance run. The Pending Messages count was about 500. That command lists my logging messages (about 16 are mine; I have the threshold set to 30/WARN), but the rest are sqlalchemy LogRecord messages at levels 10 and 20 (DEBUG/INFO). Do logging messages count as messages in the throttling algorithm? I also don't understand why the sqlalchemy LogRecords (DEBUG/INFO) would be there if I am using log level WARN; I don't see them in my logs at all.

ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: sqlalchemy.engine.base.Engine, 20, /home/xxx/miniconda3/envs/xxx/lib/python3.7/site-packages/sqlalchemy/engine/base.py, 1211, "()">
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: sqlalchemy.engine.base.Engine, 20, /home/xxx/miniconda3/envs/xxx/lib/python3.7/site-packages/sqlalchemy/engine/base.py, 759, "COMMIT">
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: sqlalchemy.pool.impl.QueuePool, 10, /home/xxx/miniconda3/envs/xxx/lib/python3.7/site-packages/sqlalchemy/pool/base.py, 672, "Connection <ibm_db_dbi.Connection object at 0x7ffef07c9f50> being returned to pool">
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: sqlalchemy.pool.impl.QueuePool, 10, /home/xxx/miniconda3/envs/xxx/lib/python3.7/site-packages/sqlalchemy/pool/base.py, 862, "Connection <ibm_db_dbi.Connection object at 0x7ffef07c9f50> rollback-on-return">
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: root, 30, /home/xxx/xxx/actors/timer_actor.py, 101, "starting processing, id: 00142 for: 2020-11-11 01:28:50">
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:45303) [#17]:  <actor_messages.xxxx.XxxMsg object at 0x7ffeef70c910>
ActorAddr-(T|:38533) [#1] --> ActorAddr-(T|:38525) [#18]:  <LogRecord: root, 30, /home/xxx/xxx/actors/timer_actor.py, 126, "scheduled next timer instance - next_timestamp: 2020-11-11 01:29:00, wake_up_time: 9.092339">

By my count I should be sending 193 messages per wakeup via self.send(<actor>, <message>): wakeup -> Timer (1 message), Timer -> Actor 1 (16 instances, one message each), Actor 1 -> Actor 2 (one message per instance, 16 messages), Actor 2 -> Actors 3-8 (5 Actors per instance, 16*5 or 80 messages), and Actors 3-8 -> Actor 2 (16*5 or 80 messages), for a total of 193 per interval. The Actors are reused across wakeups; each Actor maintains a list of sub-Actors.
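
For reference, the arithmetic above works out as follows:

    # Per-wakeup send counts from the description above
    wakeup_to_timer   = 1
    timer_to_actor1   = 16       # 16 instances, one message each
    actor1_to_actor2  = 16
    actor2_to_workers = 16 * 5   # 5 worker Actors per instance
    workers_to_actor2 = 16 * 5
    print(wakeup_to_timer + timer_to_actor1 + actor1_to_actor2
          + actor2_to_workers + workers_to_actor2)   # 193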

Ryan

Kevin Quick

Nov 11, 2020, 6:01:32 PM
to Ryan Fagan, thespian.py
Hi Ryan,

Excellent research; thank you for doing that and for the resulting information!

Log messages do indeed accrue against the message limit threshold, and your example shows that there probably needs to be an adjustment somewhere.

First, a little background though:

  * When using a multiproc actor base, each Actor is a separate system process.  If each actor were logging normally to a file, there would be several issues, the primary concerns being: (1) not all actors run on the same host system, and (2) multiple processes writing to the same logfile without coordination would corrupt the logfile.

  * In order to solve concern #2, the logging infrastructure for each Actor is updated so that instead of directly performing the logging, all log messages are forwarded from the Actor to the MultiProcAdmin that is in charge of that Actor System.  The MultiProcAdmin forwards the logs to the Logger actor, which performs all the actual logging activity.

  * Python's logging infrastructure has multiple elements, including loggers, handlers, filters, and formatters.  The main interface is the "logger", which provides the API you normally use and coordinates the other elements behind the scenes.  The actual act of writing to a file, to a screen, etc. is provided by one or more "handlers", and each handler can have a separate logging level.  In Thespian, each Actor's "logger" forwards the message to the Logger actor which acts as the "handler".  The logging configuration established at startup time is primarily used by the Logger actor to handle the requests, including determining what level of messages are logged.

  * If this is a multi-host Actor environment (aka. a Convention as described in the documentation) then the idea is that important stuff should be logged in a central location (the Convention Leader) and so concern #1 is addressed by the local MultiProcAdmin forwarding all messages of level WARN and above to the Convention Leader where they are also logged.

For your situation, concern #1 doesn't necessarily apply, but I bring it up as part of the overall logging design guidelines.  The main consideration for you is concern #2, and clearly I hadn't considered the effect of copious low-level message logging when this was set up.
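
As a generic illustration of the logger-versus-handler level distinction (plain Python standard-library logging, nothing Thespian-specific):

    import logging

    logger = logging.getLogger('example')
    logger.setLevel(logging.DEBUG)         # the logger forwards DEBUG and above...

    handler = logging.StreamHandler()
    handler.setLevel(logging.WARNING)      # ...but this handler only emits WARNING and above
    logger.addHandler(handler)

    logger.debug('filtered out by the handler level')
    logger.warning('written to stderr')

In Thespian's case the "handler" side effectively lives in the Logger actor, so records that are ultimately filtered there have already crossed the actor transport as messages; that is why those sqlalchemy DEBUG/INFO records can count against the queue even though they never appear in your logfile.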

My initial thought for resolving this is to enhance the internal Actor startup process to establish a filter level for each Actor's forwarding; this wouldn't work well with dynamic level adjustment, but that's not currently possible anyhow.  I can probably investigate modifying Thespian to add this per-Actor log-level filter to the Actor startup internals, but I don't know that I will have time to accomplish it before early next week (i.e. spending some time on it this weekend, which I was already planning in order to address an unrelated logging issue).  Please let me know if that works with your timeframe, if you have any concerns or questions about this approach, or if you have any other suggestions for resolving it.  My workaround recommendation in the meantime is to manually increase the message threshold in a local copy of Thespian.

Regards,
  Kevin

P.S. I'd like to invite you to create an issue at https://github.com/kquick/Thespian now that we've identified the problem; if you create the issue then you'll get the notifications for updates, as well as credit for identifying and researching the problem.




--
-KQ

Ryan Fagan

Nov 12, 2020, 8:07:49 AM
to thespian.py
Thanks again for the quick and detailed response. That timeframe certainly works for me. I also plan to reduce our use of SQLAlchemy to help cut down the number of messages.

Ryan

Kevin Quick

Nov 25, 2020, 1:52:00 PM
to Ryan Fagan, thespian.py
The logging system is complex, so between that and some other unrelated interruptions, this took a little longer than I had projected.  I do have it working to my satisfaction now, and I'll be cleaning it up and making a release shortly.  The change effectively sets an Actor's initial logging level to the level specified in the logDefs configuration passed to the ActorSystem initialization code (actually the lowest level specified in the logDefs; be aware that if unspecified, the level defaults to 0, which means "everything").
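
For reference, a minimal sketch of passing a logDefs configuration when starting the system; the handler, filename, and levels here are illustrative only, and the dictionary follows the standard logging.config.dictConfig schema:

    import logging
    from thespian.actors import ActorSystem

    logcfg = {
        'version': 1,
        'formatters': {'normal': {'format': '%(levelname)-8s %(message)s'}},
        'handlers': {'h': {'class': 'logging.FileHandler',
                           'filename': 'app.log',        # illustrative path
                           'formatter': 'normal',
                           'level': logging.WARNING}},
        'loggers': {'': {'handlers': ['h'], 'level': logging.WARNING}},
    }

    # With this change, each Actor's initial logging level is derived from the
    # lowest level found in the supplied configuration.
    asys = ActorSystem('multiprocTCPBase', logDefs=logcfg)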

However, I also discovered that if your Actor explicitly sets the logging level, this will also control what is passed to the logging actor, which can help with your situation.  For example, calling `logging.getLogger().setLevel(logging.INFO)` in the actor that uses SQLAlchemy will set the logging baseline to INFO for that actor and suppress the sending of DEBUG log messages from the SQLAlchemy library.
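
A minimal sketch of that workaround in context; the actor name is a placeholder and the database work is elided:

    import logging
    from thespian.actors import Actor

    class DatabaseActor(Actor):   # hypothetical actor that uses SQLAlchemy
        def receiveMessage(self, message, sender):
            # Raise this process's logging baseline to INFO so that SQLAlchemy's
            # DEBUG records are not forwarded to the Logger actor as messages.
            logging.getLogger().setLevel(logging.INFO)
            ...  # perform the database work and reply to the sender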

There are some additional details and clarifications that I'll be writing up in the documentation changes accompanying the pending release, and I'll follow up with a reference to those changes, but I wanted to provide this update and the above information now to avoid blocking your progress as much as possible.

Regards,
  Kevin

Kevin Quick

Nov 30, 2020, 2:01:48 AM
to Ryan Fagan, thespian.py
Thespian 3.10.2 is now released and contains the fix above.  For more documentation on the logging, please see https://thespianpy.com/doc/using.html#hH-ce55494c-dd7a-4258-a1e8-b090c3bbb1e6.

Regards,
  Kevin
--
-KQ