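I need to publish messages of different types to the event stream, and those
messages should have different priorities. For example, if 10 messages of type
A have been posted and one message of type B is posted after them, and the
priority of B is higher than the priority of A, then message B should be picked
up by the next actor even if there are 10 messages of type A in the queue.
I have read about prioritized messages
(http://doc.akka.io/docs/akka/2.0.3/scala/dispatchers.html#Mailboxes) and
created my simple implementation of that mailbox: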
class PrioritizedMailbox(settings: Settings, cfg: Config)
  extends UnboundedPriorityMailbox(
    PriorityGenerator {
      case ServerPermanentlyDead => println("Priority:0"); 0
      case ServerDead => println("Priority:1"); 1
      case _ => println("Default priority"); 10
    }
  )
Then I configured it in application.conf:
akka {
  actor {
    prio-dispatcher {
      type = "Dispatcher"
      mailbox-type = "animatron.router.actor.mailbox.PrioritizedMailbox"
    }
  }
}
and wired it into my actor:
private val myActor = actors.actorOf(
  Props[MyEventHandler[T]].
    withRouter(RoundRobinRouter(HIVE)).
    withDispatcher("akka.actor.prio-dispatcher").
    withCreator(
      new Creator[Actor] {
        def create() = new MyEventHandler(storage)
      }), name = "eventHandler")
I use ActorSystem.eventStream.publish to send messages, and my actor is
subscribed to the event stream (I can see in the logs that messages are
processed, but in FIFO order).
However, it looks like this is not enough, because I never see output such as
"Default priority" in the logs or console. Am I missing something here? Does
the described approach work with event streams, or only when a message is sent
to an actor directly? And how do I get prioritized messages with eventStream?
> I have read about prioritized messages
> (http://doc.akka.io/docs/akka/2.0.3/scala/dispatchers.html#Mailboxes) and
> created my simple implementation of that mailbox:
It's only the mailbox that is prioritized, which means that if you process
messages really fast, then they will never stay in the mailbox long enough
to be prioritized relative to other entries in the mailbox.
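To make the point above concrete, here is a minimal sketch that uses a plain java.util.concurrent.PriorityBlockingQueue as a stand-in for the priority mailbox. It involves no Akka code; the object name, the message names "A" and "B", and the priority values are assumptions made for illustration only.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object MailboxOrdering {
  // Hypothetical priorities: a lower number is handled first.
  def priority(msg: String): Int = msg match {
    case "B" => 0
    case _   => 10
  }

  private val byPriority = new Comparator[String] {
    def compare(x: String, y: String): Int =
      Integer.compare(priority(x), priority(y))
  }

  // Fast consumer: every message is taken before the next one arrives,
  // so the queue never holds more than one entry and nothing is reordered.
  def drainImmediately(msgs: Seq[String]): Seq[String] = {
    val q = new PriorityBlockingQueue[String](11, byPriority)
    msgs.map { m => q.put(m); q.take() }
  }

  // Slow consumer: all messages pile up first, so the queue can sort them.
  def drainAfterBacklog(msgs: Seq[String]): Seq[String] = {
    val q = new PriorityBlockingQueue[String](11, byPriority)
    msgs.foreach(m => q.put(m))
    msgs.map(_ => q.take())
  }
}
```

For the input Seq("A", "A", "B"), drainImmediately hands the messages out in arrival order, while drainAfterBacklog hands out "B" first. The priority only matters when there is a backlog.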
Cheers,
√
-- Viktor Klang
Akka Tech Lead
Typesafe <http://www.typesafe.com/> - The software stack for applications
that scale
Yes, but for some reason the proposed solution doesn't work. I created the
mailbox with priority generation and added some debug messages to it, but it
looks like it is never invoked.
import akka.actor.{ Actor, ActorSystem, Props }
import akka.dispatch.{ PriorityGenerator, UnboundedPriorityMailbox }
import com.typesafe.config.Config

trait Foo
case object X extends Foo
case object Y extends Foo
case object Z extends Foo

class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config)
  extends UnboundedPriorityMailbox(
    PriorityGenerator {
      // A lower value means a higher priority
      case X ⇒ 0
      case Y ⇒ 1
      case Z ⇒ 2
      case _ ⇒ 10
    })

// The rest runs inside a TestKit test, which provides testActor and expectMsg
val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString(
  """ prio-dispatcher {
    type = "Dispatcher"
    mailbox-type = "%s"
  }""".format(classOf[PrioritizedMailbox].getName)))

val latch = new java.util.concurrent.CountDownLatch(1)

val a = s.actorOf(Props(new akka.actor.Actor {
  // Just wait here so that the messages are queued up inside the mailbox
  latch.await()
  def receive = {
    case any ⇒ /*println("Processing: " + any);*/ sender ! any
  }
}).withDispatcher("prio-dispatcher"))

implicit val sender = testActor

a ! "pig"
a ! Y
a ! Z
a ! Y
a ! X
a ! Z
a ! X
a ! "dog"

// Release the actor only after everything has been enqueued
latch.countDown()

val expectedSequence = Seq(X, X, Y, Y, Z, Z, "pig", "dog")
expectedSequence foreach { x => expectMsg(x) }

s.shutdown()
This test passes with flying colors, so there is most likely something wrong
in your config or router. Always try to remove as many moving parts as
possible when you are debugging, i.e. why are you using a router in your
example instead of just a plain actor?
Cheers,
√
On Thu, Aug 30, 2012 at 10:42 PM, Eugene Dzhurinsky <jdeve...@gmail.com> wrote:
> On Thu, Aug 30, 2012 at 10:28:33PM +0200, √iktor Ҡlang wrote:
> > Hi Eugene,
> Yes, but for some reason the proposed solution doesn't work. I created the
> mailbox with priority generation, and added some debug messages to it, but
> looks like it is not invoked at all.
On Thu, Aug 30, 2012 at 11:27:03PM +0200, √iktor Ҡlang wrote:
> This test passes with flying colors, so either you have something wrong
> in your config/router, always try to remove as many moving parts as
> possible when you are debugging, i.e why are you using a router in your
> example instead of just a plain actor?
You were right. I created a test like yours, but with EventStream, and realized
that my mailbox was not invoked because all the actors were picking up messages
as soon as they appeared, so no reordering was required. Then I reduced the
number of actors in the RoundRobinRouter to 1 and got prioritized messages.
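The effect of the routee count can be sketched without Akka. This assumes that each routee has its own priority queue and that messages are handed out round-robin before any of them is processed. The object name, the message names, and the routee counts are hypothetical and are not taken from the original code.

```scala
import java.util.Comparator
import java.util.concurrent.PriorityBlockingQueue

object RouteeBacklog {
  // Hypothetical priorities: a lower number is handled first.
  def priority(msg: String): Int = if (msg == "B") 0 else 10

  private val byPriority = new Comparator[String] {
    def compare(x: String, y: String): Int =
      Integer.compare(priority(x), priority(y))
  }

  // Distributes msgs round-robin over `routees` priority queues and returns,
  // per queue, the order in which that queue would hand its messages out.
  def deliver(msgs: Seq[String], routees: Int): Seq[Seq[String]] = {
    val boxes = Vector.fill(routees)(
      new PriorityBlockingQueue[String](11, byPriority))
    msgs.zipWithIndex.foreach { case (m, i) => boxes(i % routees).put(m) }
    boxes.map(q => Seq.fill(q.size())(q.poll()))
  }
}
```

With ten "A" messages followed by one "B", a single routee sees all eleven messages in one queue and hands out "B" first. With eleven routees, every queue holds exactly one message, so there is nothing left to reorder.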