Prioritizing messages in mailbox

66 views
Skip to first unread message

Gilles

unread,
Mar 9, 2011, 9:36:55 AM3/9/11
to Akka User List
Dear hakkars,

Do you know whether there is a way to prioritize the mailboxes of
actors, such that certain types of messages gets treated as soon as
they arrive?
Could a custom comparator of some kind be supplied to the prioritized
message queues?

Thanks for your help!

Gilles.

√iktor Klang

unread,
Mar 9, 2011, 9:43:17 AM3/9/11
to akka...@googlegroups.com
Dear Gilles,

you can implement this yourself by specifying that your ExecutorBasedDispatcher should use a ThreadPoolConfig with a queueFactory that returns a PriorityBlockingQueue

Happy hAkking!

Cheers,


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Viktor Klang,
Code Connoisseur
Work:   Scalable Solutions
Code:   github.com/viktorklang
Follow: twitter.com/viktorklang
Read:   klangism.tumblr.com

√iktor Klang

unread,
Mar 9, 2011, 9:56:05 AM3/9/11
to akka...@googlegroups.com
Sorry, my bad, thought about the priority of scheduling actors in the dispatcher, not the priority of messages.

Let me ponder this and get back to you.

Cheers,

√iktor Klang

unread,
Mar 9, 2011, 10:27:18 AM3/9/11
to akka...@googlegroups.com
Hey Gilles,

You'll have to implement some plumbing yourself, here's a quick draft, see if you can bend it to your needs:

class UnboundedPriorityMessageQueue(blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
      PriorityBlockingQueue[MessageInvocation](11, cmp) with MessageQueue {
    final def enqueue(handle: MessageInvocation) {
      this add handle
    }

    final def dequeue(): MessageInvocation = {
      if (blockDequeue) this.take()
      else this.poll()
    }
  }

  class BoundedPriorityMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
        PriorityBlockingQueue[MessageInvocation](capacity, cmp) with MessageQueue{
    final def enqueue(handle: MessageInvocation) {
      if (pushTimeOut.toMillis > 0) {
        if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
          throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
      } else this put handle
    }

    final def dequeue(): MessageInvocation =
      if (blockDequeue) this.take()
      else this.poll()
  }

  class PriorityExecutorBasedEventDrivenDispatcher(
    name: String,
    comparator: Comparator[MessageInvocation],
    throughput: Int = Dispatchers.THROUGHPUT,
    throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
    mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
    config: ThreadPoolConfig = ThreadPoolConfig()
    ) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) {

    override def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
      case UnboundedMailbox(blocking) => new UnboundedPriorityMessageQueue(blocking, comparator) with ExecutableMailbox {
        def dispatcher = PriorityExecutorBasedEventDrivenDispatcher.this
      }

      case BoundedMailbox(blocking, capacity, pushTimeOut) =>
        new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) with ExecutableMailbox {
          def dispatcher = PriorityExecutorBasedEventDrivenDispatcher.this
        }
    }
  }


usage:

object PriorityMessageDispatcher {
  val comparator = new Comparator[MessageInvocation] ...
  val instance = new PriorityExecutorBasedEventDrivenDispatcher("PriorityMessageDispatcher",comparator)
}

val a = actorOf(...)
a.dispatcher = PriorityMessageDispatcher.instance
a.start

Does that help?

Cheers,

Jason Zaugg

unread,
Mar 14, 2011, 5:57:17 AM3/14/11
to akka...@googlegroups.com
On Wed, Mar 9, 2011 at 4:27 PM, √iktor Klang <viktor...@gmail.com> wrote:
> You'll have to implement some plumbing yourself, here's a quick draft, see
> if you can bend it to your needs:
[snip]

> val a = actorOf(...)
> a.dispatcher = PriorityMessageDispatcher.instance
> a.start

+1 for something like this rolled into the toolkit.

My usecase for this is using a set of actors to run a monte carlo
simulation, sending results to an aggregator. Each sample is modelled
as a message.

The aggregator can stop the simulation early (after mean has converged
within a tolerence), and would send a Stop message to the workers.
I'm just using an AtomicBoolean at the moment.

Incidentally, the aggregator actor would benefit from dispatcher level
prioritization, so it doesn't fall behind.

Actually, I'm currently using scalaz.concurrent.Actor for this, but a
few more features like this and I might be tempted to jump ship ;)

-jason

√iktor Klang

unread,
Mar 14, 2011, 6:20:50 AM3/14/11
to akka...@googlegroups.com

Already in master, see PriorityExecutorBasedEventDrivenDispatcher, you can specify your own Comparator for the priority.

You can also do: new -ebedd-dispatcher-type- with PriorityMailbox and then in the class body define: val comparator ...

Enjoy

On Mar 14, 2011 10:57 AM, "Jason Zaugg" <jza...@gmail.com> wrote:

On Wed, Mar 9, 2011 at 4:27 PM, √iktor Klang <viktor...@gmail.com> wrote:

> You'll have to implem...

[snip]

> val a = actorOf(...)
> a.dispatcher = PriorityMessageDispatcher.instance
> a.start

+1 for something like this rolled into the toolkit.

My usecase for this is using a set of actors to run a monte carlo
simulation, sending results to an aggregator. Each sample is modelled
as a message.

The aggregator can stop the simulation early (after mean has converged
within a tolerence), and would send a Stop message to the workers.
I'm just using an AtomicBoolean at the moment.

Incidentally, the aggregator actor would benefit from dispatcher level
prioritization, so it doesn't fall behind.

Actually, I'm currently using scalaz.concurrent.Actor for this, but a
few more features like this and I might be tempted to jump ship ;)

-jason


--

You received this message because you are subscribed to the Google Groups "Akka User List" group.

To...

Reply all
Reply to author
Forward
0 new messages