Account Options

  1. Sign in
The old Google Groups will be going away soon, but your browser is incompatible with the new version.
Google Groups Home
« Groups Home
Using messages with different priorities over event stream in ActorSystem
There are currently too many topics in this group that display first. To make this topic appear first, remove this option from another topic.
There was an error processing your request. Please try again.
flag
  5 messages - Collapse all  -  Translate all to Translated (View all originals)
The group you are posting to is a Usenet group. Messages posted to this group will make your email address visible to anyone on the Internet.
Your reply message has not been sent.
Your post was successful
 
From:
To:
Cc:
Followup To:
Add Cc | Add Followup-to | Edit Subject
Subject:
Validation:
For verification purposes please type the characters you see in the picture below or the numbers you hear by clicking the accessibility icon. Listen and type the numbers you hear
 
Eugene Dzhurinsky  
View profile  
 More options Aug 30 2012, 4:14 pm
From: Eugene Dzhurinsky <jdeve...@gmail.com>
Date: Thu, 30 Aug 2012 23:14:08 +0300
Local: Thurs, Aug 30 2012 4:14 pm
Subject: Using messages with different priorities over event stream in ActorSystem

Hi all!

I need to publish messages of different types to  event stream, and those
messages should have different priorities for example, if 10 messages of type
A have been posted, and one message of type B is posted after all, and
priority of B is higher than the priority of A - message B should be picked up
by next actor even if there are 10 messages of type A in queue.

I have read about prioritized messages
(http://doc.akka.io/docs/akka/2.0.3/scala/dispatchers.html#Mailboxes) and
created my simple implementation of that mailbox:

class PrioritizedMailbox(settings: Settings, cfg: Config) extends UnboundedPriorityMailbox(

  PriorityGenerator {
    case ServerPermanentlyDead => println("Priority:0"); 0
    case ServerDead => println("Priority:1"); 1
    case _ => println("Default priority"); 10
  }

)

then I configured it in application.conf

akka {

    actor {

        prio-dispatcher {
            type = "Dispatcher"
            mailbox-type = "animatron.router.actor.mailbox.PrioritizedMailbox"
        }

    }

}

and wired into my actor:

  private val myActor = actors.actorOf(
    Props[MyEventHandler[T]].
      withRouter(RoundRobinRouter(HIVE)).
      withDispatcher("akka.actor.prio-dispatcher").
      withCreator(
      new Creator[Actor] {
        def create() = new MyEventHandler(storage)
      }), name = "eventHandler")

I use ActorSystem.eventStream.publish in order to send messages, and my actor
is subscribed to it (I can see in logs that messages are processed, but in
FIFO order).

However looks like it is not enough, because in logs/console I never seen the
messages like "Default priority". Am I missing something here? Does the
described approach work with event streams or just with direct invocations of
sending a message on actor? And how do I get prioritized messages with
eventStream?

--
Eugene N Dzhurinsky

  application_pgp-signature_part
< 1K Download

 
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
√iktor Ҡlang  
View profile  
 More options Aug 30 2012, 4:28 pm
From: √iktor Ҡlang <viktor.kl...@gmail.com>
Date: Thu, 30 Aug 2012 22:28:33 +0200
Local: Thurs, Aug 30 2012 4:28 pm
Subject: Re: [akka-user] Using messages with different priorities over event stream in ActorSystem

Hi Eugene,

It's already been answered here:
http://stackoverflow.com/questions/12183645/scala-akka-multiple-event...

On Thu, Aug 30, 2012 at 10:14 PM, Eugene Dzhurinsky <jdeve...@gmail.com>wrote:

It's only the mailbox that is prioritized, which means that if you process
messages really fast, then they will never stay in the mailbox long enough
to be prioritized relative to other entries in the mailbox.

Cheers,

> --
> Eugene N Dzhurinsky

--
Viktor Klang

Akka Tech Lead
Typesafe <http://www.typesafe.com/> - The software stack for applications
that scale

Twitter: @viktorklang


 
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Eugene Dzhurinsky  
View profile  
 More options Aug 30 2012, 4:43 pm
From: Eugene Dzhurinsky <jdeve...@gmail.com>
Date: Thu, 30 Aug 2012 23:42:31 +0300
Local: Thurs, Aug 30 2012 4:42 pm
Subject: Re: [akka-user] Using messages with different priorities over event stream in ActorSystem

On Thu, Aug 30, 2012 at 10:28:33PM +0200, √iktor Ҡlang wrote:
>    Hi Eugene,

>    It's already been answered
>    here: http://stackoverflow.com/questions/12183645/scala-akka-multiple-e
>    vent-buses-for-actorsystem-or-having-prioritized-events

Yes, but for some reason the proposed solution doesn't work. I created the
mailbox with priority generation, and added some debug messages to it, but
looks like it is not invoked at all.

--
Eugene N Dzhurinsky

  application_pgp-signature_part
< 1K Download

 
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
√iktor Ҡlang  
View profile  
 More options Aug 30 2012, 5:27 pm
From: √iktor Ҡlang <viktor.kl...@gmail.com>
Date: Thu, 30 Aug 2012 23:27:03 +0200
Local: Thurs, Aug 30 2012 5:27 pm
Subject: Re: [akka-user] Using messages with different priorities over event stream in ActorSystem

Works for me:

  trait Foo
  case object X extends Foo
  case object Y extends Foo
  case object Z extends Foo

  class PrioritizedMailbox(settings: ActorSystem.Settings, cfg: Config)
extends UnboundedPriorityMailbox(
    PriorityGenerator {
      case X ⇒ 0
      case Y ⇒ 1
      case Z ⇒ 2
      case _ ⇒ 10
    })

val s = ActorSystem("prio", com.typesafe.config.ConfigFactory.parseString(
        """ prio-dispatcher {
        type = "Dispatcher"
          mailbox-type = "%s"
        }""".format(classOf[PrioritizedMailbox].getName)))
      val latch = new java.util.concurrent.CountDownLatch(1)
      val a = s.actorOf(Props(new akka.actor.Actor {
        latch.await // Just wait here so that the messages are queued up
inside the mailbox
        def receive = {
          case any ⇒ /*println("Processing: " + any);*/ sender ! any
        }
      }).withDispatcher("prio-dispatcher"))
      implicit val sender = testActor
      a ! "pig"
      a ! Y
      a ! Z
      a ! Y
      a ! X
      a ! Z
      a ! X
      a ! "dog"

      latch.countDown()

      val expectedSequence = Seq(X, X, Y, Y, Z, Z, "pig", "dog")
      expectedSequence foreach { x => expectMsg(x) }
      s.shutdown()

This test passes with flying colors, so either you have something wrong in
your config/router, always try to remove as many moving parts as possible
when you are debugging, i.e why are you using a router in your example
instead of just a plain actor?

Cheers,

On Thu, Aug 30, 2012 at 10:42 PM, Eugene Dzhurinsky <jdeve...@gmail.com>wrote:

> On Thu, Aug 30, 2012 at 10:28:33PM +0200, √iktor Ҡlang wrote:
> >    Hi Eugene,

> >    It's already been answered
> >    here:
> http://stackoverflow.com/questions/12183645/scala-akka-multiple-e
> >    vent-buses-for-actorsystem-or-having-prioritized-events

> Yes, but for some reason the proposed solution doesn't work. I created the
> mailbox with priority generation, and added some debug messages to it, but
> looks like it is not invoked at all.

> --
> Eugene N Dzhurinsky

--
Viktor Klang

Akka Tech Lead
Typesafe <http://www.typesafe.com/> - The software stack for applications
that scale

Twitter: @viktorklang


 
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Eugene Dzhurinsky  
View profile  
 More options Aug 31 2012, 5:33 am
From: Eugene Dzhurinsky <jdeve...@gmail.com>
Date: Fri, 31 Aug 2012 12:33:37 +0300
Local: Fri, Aug 31 2012 5:33 am
Subject: Re: [akka-user] Using messages with different priorities over event stream in ActorSystem

On Thu, Aug 30, 2012 at 11:27:03PM +0200, √iktor Ҡlang wrote:
>    This test passes with flying colors, so either you have something wrong
>    in your config/router, always try to remove as many moving parts as
>    possible when you are debugging, i.e why are you using a router in your
>    example instead of just a plain actor?

You were right, I created test like yours - but with EventStream, and realized
that my mailbox is not invoked, because all actors are picking up messages as
soon as they appear, so no reordering required. Then I updated number of
actors in RoundRobinRouter to 1 - and got messages prioritized.

Thanks for explanations!
--
Eugene N Dzhurinsky

  application_pgp-signature_part
< 1K Download

 
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
End of messages
« Back to Discussions « Newer topic     Older topic »