actors on steroids--must read


William la Forge

Apr 17, 2011, 10:37:43 AM
to AgileWikiDevelopers
http://www.infoq.com/presentations/Actor-Thinking

A really great presentation on the power of just actors. --b

William la Forge

Apr 21, 2011, 12:48:08 AM
to AgileWikiDevelopers
I've been thinking about how to make actors faster. An actor is pretty fast as long as it has mail to process--there's no thread switching. The problem comes when you send a message to an actor which has no mail to process--it has to wait for a thread to go idle. And when an actor runs out of messages, its thread must fetch another actor with pending messages from a common queue--and this is the bottleneck. If you think about it, it is a classic data flow issue. So long as you have a few actors consuming lots of messages, things are fast. But when you have lots of actors which are generally idle when they receive a message, things are slower.

So we should re-architect things. Let's make our threads a bit smarter (we'll call them dispatchers) and pre-assign actors to them. Each dispatcher has its own queue of actors which have pending messages, as well as a table of idle actors. Things will now run faster so long as the number of busy dispatchers equals the number of hardware threads.

We still need to handle the case when a dispatcher goes idle. It will need to get more actors by stealing an actor with pending messages from another dispatcher. Work stealing is generally faster than sharing a common queue, because it happens only when a dispatcher runs dry rather than on every scheduling decision.
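
A minimal sketch of the per-dispatcher queue with stealing, to make the idea concrete. All names here (WorkStealingSketch, PendingActor, Dispatcher, nextActor) are hypothetical and not taken from the AgileWiki code; the sketch only shows where an idle dispatcher looks for work, not the threads or the message processing.

```scala
import java.util.concurrent.ConcurrentLinkedDeque

// Hypothetical names throughout; an illustration, not the AgileWiki code.
object WorkStealingSketch {
  // Stand-in for an actor that has pending messages.
  final case class PendingActor(name: String)

  final class Dispatcher(val id: Int) {
    // Actors assigned to this dispatcher that have mail waiting.
    private val pending = new ConcurrentLinkedDeque[PendingActor]

    def enqueue(a: PendingActor): Unit = pending.addLast(a)

    // The owner takes from the front of its own queue.
    def takeLocal(): Option[PendingActor] = Option(pending.pollFirst())

    // A thief takes from the back, away from the end the owner works on.
    def stealFrom(): Option[PendingActor] = Option(pending.pollLast())

    // Prefer local work; only when that runs dry, try the other dispatchers
    // in turn and stop at the first one that gives up an actor.
    def nextActor(all: Seq[Dispatcher]): Option[PendingActor] =
      takeLocal().orElse {
        all.iterator.filter(_ ne this).map(_.stealFrom()).collectFirst {
          case Some(a) => a
        }
      }
  }
}
```

The common case touches only the dispatcher's own queue; the other queues are visited only when the local one is empty.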

Bill

William la Forge

Apr 21, 2011, 1:32:57 AM
to AgileWikiDevelopers
Let's drill down a bit deeper and look at implementation. :-)

We will have two new kinds of actors--dispatchers and light weight actors. Both promise never to block and to process all messages as they are received. Dispatchers are implemented simply as Scala reactors.

When a light weight actor is created it is assigned to a dispatcher, round robin, by sending the new actor to the dispatcher as a message. So initially all dispatchers have the same number of light weight actors to manage.

When a message is sent to a light weight actor, it is forwarded to that actor's dispatcher. The dispatcher adds the message to the light weight actor's inbox and makes sure the actor is in its pending actor queue.

When a dispatcher becomes idle, it sends a message to the other dispatchers requesting work. On receipt of such a message, a dispatcher sends the next actor in its pending queue to the dispatcher which requested work.

And when a light weight actor completes the processing of a message, a wakeup message is sent to its dispatcher and control is returned to the dispatcher. The dispatcher then processes its next message, and if it is a wakeup message the dispatcher processes the next message from the inbox of the light weight actor it was working on. If that inbox is empty, it pulls the next actor off its pending queue and begins processing that actor's messages--and if the pending queue is empty, it asks the other dispatchers for work and marks itself idle.

When a dispatcher receives an actor, it assigns itself as the dispatcher of that actor. If the actor has pending messages, it also adds the actor to the pending queue and, if the dispatcher had been marked as idle, clears its idle flag and sends itself a wakeup message.

Dispatchers then are marked as idle when created. And they process the following types of messages:
1. Actor assignment. This can be a new light weight actor or one with pending messages.
2. A work request from another dispatcher.
3. A wakeup message.
4. A message destined for one of its light-weight actors.
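
The four message types and the handling rules above can be sketched as a small single-threaded state machine. This is only an illustration under stated assumptions: every name (DispatcherProtocolSketch, LwActor, Assign, WorkRequest, Wakeup, Deliver, runAll) is hypothetical, a plain mutable queue stands in for the reactor's mailbox, and waking an idle dispatcher on delivery is an assumption, since the description does not spell that case out. It also ignores the race where an actor is handed to another dispatcher while messages for it are still in flight.

```scala
import scala.collection.mutable

// Hypothetical names throughout; a single-threaded model, not the real reactors.
object DispatcherProtocolSketch {
  // A light weight actor: an inbox plus a non-blocking handler.
  final class LwActor(val handle: Any => Unit) {
    val inbox = mutable.Queue.empty[Any]
    var dispatcher: Option[Dispatcher] = None
  }

  // The four message types a dispatcher processes.
  sealed trait DispatcherMsg
  final case class Assign(actor: LwActor) extends DispatcherMsg
  final case class WorkRequest(from: Dispatcher) extends DispatcherMsg
  case object Wakeup extends DispatcherMsg
  final case class Deliver(to: LwActor, msg: Any) extends DispatcherMsg

  final class Dispatcher {
    private val mailbox = mutable.Queue.empty[DispatcherMsg]
    private val pending = mutable.Queue.empty[LwActor]
    private var current: Option[LwActor] = None
    var peers: Seq[Dispatcher] = Nil
    var idle = true // dispatchers start out idle

    def send(m: DispatcherMsg): Unit = mailbox.enqueue(m)

    private def wake(): Unit = if (idle) { idle = false; send(Wakeup) }

    // Process one mailbox message; returns false when the mailbox is empty.
    def step(): Boolean =
      if (mailbox.isEmpty) false
      else {
        mailbox.dequeue() match {
          case Assign(a) =>
            a.dispatcher = Some(this)
            if (a.inbox.nonEmpty) { pending.enqueue(a); wake() }
          case WorkRequest(from) =>
            if (pending.nonEmpty) from.send(Assign(pending.dequeue()))
          case Deliver(to, msg) =>
            to.inbox.enqueue(msg)
            if (!current.contains(to) && !pending.contains(to)) pending.enqueue(to)
            wake() // assumption: a delivery also wakes an idle dispatcher
          case Wakeup =>
            // Move on to the next pending actor once the current inbox is drained.
            while (current.forall(_.inbox.isEmpty) && pending.nonEmpty)
              current = Some(pending.dequeue())
            current.filter(_.inbox.nonEmpty) match {
              case Some(a) =>
                a.handle(a.inbox.dequeue())
                send(Wakeup) // control returns to the dispatcher after each message
              case None =>
                current = None
                idle = true
                peers.foreach(_.send(WorkRequest(this)))
            }
        }
        true
      }
  }

  // Drive a set of dispatchers until every mailbox is empty.
  def runAll(ds: Seq[Dispatcher]): Unit =
    while (ds.map(_.step()).contains(true)) {}
}
```

In a real implementation each dispatcher would run on its own thread and the mailbox would be the reactor's; the runAll helper exists only so the protocol can be stepped through deterministically.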

Should run like crazy.

Bill

William la Forge

Apr 21, 2011, 1:55:35 AM
to AgileWikiDevelopers
Time now to drill a bit deeper and look at the data structures...

A dispatcher is a Scala reactor and has
1. A queue of assigned actors with pending messages. And
2. An idle flag.

A light weight actor inherits the LightWeightActor class and has
1. A message queue.
2. A reference to its assigned dispatcher. And
3. A message dispatch reference to a class which contains a dispatch method--initially itself.

Having a message dispatch reference allows a light weight actor to change its behavior easily. This is the BECOME action described by Dale in http://www.infoq.com/presentations/Actor-Thinking
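
A minimal sketch of the message dispatch reference, assuming a hypothetical MessageDispatch trait with a single dispatch method (the post names the LightWeightActor class but not its interface, so the trait, Toggle and receive are all made up for illustration). The reference starts out pointing at the actor itself, and swapping it is the BECOME step.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical names throughout; an illustration, not the AgileWiki code.
object BecomeSketch {
  // Whatever object the reference points at handles the next message.
  trait MessageDispatch { def dispatch(msg: Any): Unit }

  // A toggle that alternates between two behaviors.
  final class Toggle extends MessageDispatch {
    val log = ListBuffer.empty[String]

    // The dispatch reference starts out pointing at the actor itself.
    private var behavior: MessageDispatch = this

    // Alternate behavior: record the message, then become the initial behavior again.
    private val on: MessageDispatch = new MessageDispatch {
      def dispatch(msg: Any): Unit = { log += s"on:$msg"; behavior = Toggle.this }
    }

    // Initial behavior: record the message, then BECOME `on`.
    def dispatch(msg: Any): Unit = { log += s"off:$msg"; behavior = on }

    // Every incoming message goes through the current reference.
    def receive(msg: Any): Unit = behavior.dispatch(msg)
  }
}
```

Changing behavior is a single field assignment, so it costs nothing on the message path beyond one indirect call.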

What we end up with is something which may even be faster (better vertical scaling) than Dale Schumacher's asynchronous objects, while retaining all the power of Scala.

Bill

William la Forge

Apr 21, 2011, 2:52:39 AM
to AgileWikiDevelopers
3 fixes:

1. A dispatcher will sometimes (rarely) receive a message for a light weight actor which it just passed to a new dispatcher. The fix is to just forward the message.
2. To keep 1 (above) simple, the assignment of an actor to a dispatcher should first update the dispatcher reference in the light weight actor before sending the actor to the new dispatcher.
3. There is now no need to send newly created actors to a dispatcher--just do a round robin pick of a dispatcher and assign it to the actor.
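
The three fixes can be sketched together as follows. The names (ReassignSketch, Spawner, handOver, deliver, accepted) are hypothetical, and the sketch leaves out the inboxes and queues to show only the ownership bookkeeping.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical names throughout; an illustration, not the AgileWiki code.
object ReassignSketch {
  final class LwActor(val name: String, initial: Dispatcher) {
    // Read by senders, written on reassignment.
    @volatile var dispatcher: Dispatcher = initial
  }

  final class Dispatcher(val name: String) {
    // Messages this dispatcher accepted for its own actors.
    val accepted = ListBuffer.empty[(String, Any)]

    def deliver(actor: LwActor, msg: Any): Unit = {
      val owner = actor.dispatcher
      if (owner eq this) accepted += ((actor.name, msg))
      else owner.deliver(actor, msg) // fix 1: no longer ours, just forward
    }

    def handOver(actor: LwActor, to: Dispatcher): Unit = {
      actor.dispatcher = to // fix 2: update the reference first
      // ... then send the actor itself to `to` (omitted in this sketch)
    }
  }

  // Fix 3: pick a dispatcher round robin at creation time, no message needed.
  final class Spawner(dispatchers: IndexedSeq[Dispatcher]) {
    private val counter = new AtomicInteger(0)

    def spawn(name: String): LwActor = {
      val i = Math.floorMod(counter.getAndIncrement(), dispatchers.length)
      new LwActor(name, dispatchers(i))
    }
  }
}
```

Because the reference is updated before the actor moves, a message that reaches the old dispatcher can always be forwarded to the actor's current owner.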

And yes, I've added a comment about this thread here: http://www.infoq.com/presentations/Actor-Thinking

Bill

William la Forge

May 8, 2011, 3:44:19 AM
to AgileWikiDevelopers
It is interesting how restrictions yield speed. My basic idea is to support a kind of actor which does not block, with the intent of eliminating thread switches when there are many actors which often have no pending messages.

This is achieved simply by randomly assigning idle actors which have received a message to a dispatch thread. Doing random assignments means the load is kept distributed among the threads. Note that this approach does not work for actors which might block, as then all the actors on the same dispatcher thread are also blocked. But it is much faster than creating a queue of pending actors that feeds a pool of threads, as that involves thread switches.

You will find my first draft of this code below. To start with there are only 2 classes: DispatchThread and NBActorCapability. It is also hard-coded with 4 dispatchers right now--you need to have one dispatcher for each hardware thread supported by your CPU.

Note that thread switching will still occur when the load is light.

Bill

package org.agilewiki
package util
package actors
package nonblocking

import java.util.concurrent.LinkedBlockingQueue
import java.util.Random

class DispatchThread extends Thread {
  private val pendingActors = new LinkedBlockingQueue[NBActorCapability]

  override def run {
    while(true) pendingActors.take.processMessages
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    pendingActors.put(nba)
  }
}

object DispatchThread {
  private val N = 4
  private val dispatchers = new Array[DispatchThread](N)
  private val random = new Random

  var i = 0
  while (i < N) {
    val dt = new DispatchThread
    dispatchers(i) = dt
    dt.start
    i = i + 1
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    dispatchers(random.nextInt(N)).addNBActor(nba)
  }
}

package org.agilewiki
package util
package actors
package nonblocking

import scala.PartialFunction
import java.util.ArrayDeque

trait NBActorCapability {
  private val pendingMessages = new ArrayDeque[Any]

  def !(msg: Any) {
    synchronized {
      pendingMessages.addLast(msg)
      if (pendingMessages.size == 1) DispatchThread.addNBActor(this)
    }
  }

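  private[nonblocking] def processMessages {
    var more = true
    while (more) {
      // Peek under the lock. The message stays queued while it is handled,
      // so a concurrent ! sees size > 1 and does not reschedule this actor.
      val msg = synchronized { pendingMessages.getFirst }
      messageHandler(msg)
      // Stop once the queue drains; the next ! will reschedule the actor.
      more = synchronized {
        pendingMessages.removeFirst
        !pendingMessages.isEmpty
      }
    }
  }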

  protected def messageHandler: PartialFunction[Any, Unit]
}

William la Forge

May 11, 2011, 12:12:07 AM
to AgileWikiDevelopers
I've been concerned about the dispatch logic. A single shared Random is a point of contention between threads (java.util.Random is thread-safe, but every caller updates the same seed), and even with random assignment the load will become unbalanced over time. I've replaced Random with a thread local round robin index, and added load sharing. Below is the revised code for dispatch.


Bill

package org.agilewiki
package util
package actors
package nonblocking

import java.util.concurrent.LinkedBlockingDeque

class DispatchThread extends Thread {
  private val pendingActors = new LinkedBlockingDeque[NBActorCapability]

  override def run {
    while(true) {
      val nba = pendingActors.takeFirst
      if (pendingActors.size > 1) DispatchThread.addNBActor(pendingActors.pollLast)
      nba.processMessages

    }
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    pendingActors.putLast(nba)
  }

  private[nonblocking] def isEmpty = pendingActors.isEmpty

}

object DispatchThread {
  private val N = 4
  private val dispatchers = new Array[DispatchThread](N)
  private val next= new ThreadLocal[Int] {
    override protected def initialValue = 0

  }

  var i = 0
  while (i < N) {
    val dt = new DispatchThread
    dispatchers(i) = dt
    dt.start
    i = i + 1
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    val ct = Thread.currentThread
    if (ct.isInstanceOf[DispatchThread] && ct.asInstanceOf[DispatchThread].isEmpty)
      ct.asInstanceOf[DispatchThread].addNBActor(nba)
    else {
      val n = (next.get + 1) % N
      next.set(n)
      dispatchers(n).addNBActor(nba)
    }
  }
}

William la Forge

May 11, 2011, 12:34:20 AM
to AgileWikiDevelopers
OK, thread local isn't the fastest thing to use. We should avoid it when possible. (See the updated dispatcher below--dispatcher threads now have their own round robin index and thread local is only used when sending messages from a non-dispatcher thread.)


Bill

package org.agilewiki
package util
package actors
package nonblocking

import java.util.concurrent.LinkedBlockingDeque

class DispatchThread extends Thread {
  private val pendingActors = new LinkedBlockingDeque[NBActorCapability]
  private[nonblocking] var next = 0


  override def run {
    while(true) {
      val nba = pendingActors.takeFirst
      if (pendingActors.size > 1) DispatchThread.addNBActor(pendingActors.pollLast)
      nba.processMessages
    }
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    pendingActors.putLast(nba)
  }

  private[nonblocking] def isEmpty = pendingActors.isEmpty
}

object DispatchThread {
  private val N = 4
  private val dispatchers = new Array[DispatchThread](N)
  private val next= new ThreadLocal[Int] {
    override protected def initialValue = 0
  }

  var i = 0
  while (i < N) {
    val dt = new DispatchThread
    dispatchers(i) = dt
    dt.start
    i = i + 1
  }

  private[nonblocking] def addNBActor(nba: NBActorCapability) {
    val ct = Thread.currentThread
    if (ct.isInstanceOf[DispatchThread]) {
      val dt = ct.asInstanceOf[DispatchThread]
      if (dt.isEmpty)
        dt.addNBActor(nba)
      else {
        dt.next = (dt.next + 1) % N
        dispatchers(dt.next).addNBActor(nba)
      }
    } else {
      // Assumed: the post is cut off here; this branch follows the previous version.
      val n = (next.get + 1) % N
      next.set(n)
      dispatchers(n).addNBActor(nba)
    }
  }
}

William la Forge

May 12, 2011, 2:18:32 AM
to AgileWikiDevelopers
Oops! It finally dawned on me that work sharing doesn't balance the load between threads unless carried out to an absurd degree. So it is time to scrap the current implementation and go with work stealing. A tad more complex, but much more efficient.

Nice having this as a back burner project, as it saves a lot of time. E.g., the implementation I'm scrapping wasn't tested yet.

Bill

William la Forge

May 12, 2011, 7:24:18 AM
to AgileWikiDevelopers
Well, I've found a nice solution for sharing work between dispatchers. Before taking the next actor from its queue of pending actors (and possibly waiting for one), a dispatch thread now checks whether that queue is empty, and if so adds itself to the queue of hungry dispatchers. And after taking an actor from its queue of pending actors, it shares any extra work it has with the hungry dispatchers.

By itself, this does not give 100% utilization, as a hungry dispatcher thread will wait until a dispatcher with extra work finishes processing the messages queued to the actor it is working on. But when a message is sent to an idle actor we also now give preference to any hungry dispatchers for processing that actor, so overall utilization should be quite reasonable.

Note that the case for a system under load is that work is still allocated round robin, keeping the dispatchers busy and avoiding thread switching. It is only when the load becomes unbalanced that a thread needs to queue up for work. Our speed gain then comes from having a separate queue of pending actors for each dispatcher thread, rather than having a pool of threads contending for work from a single queue.
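
A rough sketch of the hungry-dispatcher queue described above. The names (WorkSharingSketch, pollNext, shareExtra, schedule) are hypothetical, and pollNext returns None where a real dispatch thread would block waiting on its queue, so that the logic can be followed without starting threads.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingDeque}

// Hypothetical names throughout; an illustration, not the AgileWiki code.
object WorkSharingSketch {
  final case class Actor(name: String)

  final class Dispatcher(val id: Int, hungry: ConcurrentLinkedQueue[Dispatcher]) {
    private val pending = new LinkedBlockingDeque[Actor]

    def add(a: Actor): Unit = pending.putLast(a)
    def size: Int = pending.size

    // One pass of the dispatch loop, minus the message processing itself.
    def pollNext(): Option[Actor] = {
      val next = Option(pending.pollFirst())
      next match {
        case None    => if (!hungry.contains(this)) hungry.add(this) // queue up for work
        case Some(_) => shareExtra()
      }
      next
    }

    // Give one queued actor to each hungry dispatcher while extras remain.
    private def shareExtra(): Unit = {
      var h = hungry.poll() // null when nobody is hungry
      while (h != null) {
        val extra = pending.pollLast() // null when there is nothing left to share
        if (extra == null) { hungry.add(h); h = null } // put the hungry one back
        else { h.add(extra); h = hungry.poll() }
      }
    }
  }

  // A message arrived for an idle actor: prefer a hungry dispatcher,
  // otherwise fall back to the caller's round robin pick.
  def schedule(a: Actor, hungry: ConcurrentLinkedQueue[Dispatcher],
               roundRobinPick: => Dispatcher): Dispatcher = {
    val target = Option(hungry.poll()).getOrElse(roundRobinPick)
    target.add(a)
    target
  }
}
```

Under load the hungry queue is empty, so the extra cost per actor is a single poll that returns null.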

Very much looking forward to some timing tests for all this.

Bill

Raoul Duke

May 12, 2011, 3:27:53 PM
to agilewiki...@googlegroups.com

William la Forge

May 13, 2011, 2:12:21 AM
to agilewiki...@googlegroups.com
A nice article, and actors are the best way to achieve this. (Using threads and locks directly can be very difficult and is very error prone.) The problem is that the implementation of actors is typically too general, as it often supports actors which block for I/O.

The best way is to have different kinds of actors. Actors which do not block can use an implementation which is optimized for them, which is my plan.

What I need to do next is determine how much faster the implementation can be when optimized for non-blocking actors. For unless there is a noticeable improvement, this argument is just so much smoke.

Bill

On Fri, May 13, 2011 at 12:57 AM, Raoul Duke <rao...@gmail.com> wrote:
http://www.1024cores.net/home/scalable-architecture/general-recipe

:-)



William la Forge

May 13, 2011, 2:23:11 AM
to agilewiki...@googlegroups.com
See http://www.1024cores.net/home/scalable-architecture/task-scheduling-strategies

Note that of the strategies listed, I employ work distribution. But I also employ work sharing to balance the load. Work sharing is the inverse of the work requesting strategy. (Work sharing is, I guess, my own innovation.)

The idea behind work sharing is that idle threads queue up for work and busy threads share some of their work with threads on the queue. The advantage to work sharing over work requesting is that idle threads don't need to choose which thread to request work from. The disadvantage is that under load, all threads are checking to see if the queue of idle threads is empty, but that's pretty fast. But note also that idle threads don't always map to idle hardware threads, so it is important that idle threads not do any extra work!

Bill