I've been concerned about the dispatch logic. Random isn't threadsafe and even with random, the load will become unbalanced over time. I've replaced Random with a thread local round robin index, and added load sharing. Below is the revised code for dispatch.
Bill
package org.agilewiki
package util
package actors
package nonblocking
import java.util.concurrent.LinkedBlockingDeque
class DispatchThread extends Thread {
private val pendingActors = new LinkedBlockingDeque[NBActorCapability]
override def run {
while(true) {
val nba = pendingActors.takeFirst
if (pendingActors.size > 1) DispatchThread.addNBActor(pendingActors.pollLast)
nba.processMessages
}
}
private[nonblocking] def addNBActor(nba: NBActorCapability) {
pendingActors.putLast(nba)
}
private[nonblocking] def isEmpty = pendingActors.isEmpty
}
object DispatchThread {
private val N = 4
private val dispatchers = new Array[DispatchThread](N)
private val next= new ThreadLocal[Int] {
override protected def initialValue = 0
}
var i = 0
while (i < N) {
val dt = new DispatchThread
dispatchers(i) = dt
dt.start
i = i + 1
}
private[nonblocking] def addNBActor(nba: NBActorCapability) {
val ct = Thread.currentThread
if (ct.isInstanceOf[DispatchThread] && ct.asInstanceOf[DispatchThread].isEmpty)
ct.asInstanceOf[DispatchThread].addNBActor(nba)
else {
val n = (next.get + 1) % N
next.set(n)
dispatchers(n).addNBActor(nba)