Sending a message from an Akka actor at a fixed interval


Soumya Simanta

Apr 8, 2014, 11:49:16 PM
to akka...@googlegroups.com
NOTE: Cross-posting here for better coverage.

I want to publish messages from an actor at a fixed rate. I extended the example below, in which the current time is published to a Redis channel every 2 seconds. In the SubscribeActor I'm keeping track of some state (in this case, the aggregate). I want to publish the value of this aggregate to another Redis channel at fixed intervals (say, every 30 seconds).

import akka.actor.Props
import java.net.InetSocketAddress
import redis.RedisClient
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import redis.api.pubsub.{PMessage, Message}

import redis.actors.RedisSubscriberActor


object AkkaScheduler extends App {

  implicit val akkaSystem = akka.actor.ActorSystem()

  val redis = RedisClient()

  //publish the current time every 2 seconds on Redis channel "time"
  akkaSystem.scheduler.schedule(2 seconds, 2 seconds)(redis.publish("time", System.currentTimeMillis()))

  //channel and patterns to subscribe to
  val channels = Seq("time")
  val patterns = Seq("pattern.*")
  // create SubscribeActor instance
  akkaSystem.actorOf(Props(classOf[SubscribeActor], "input", channels, patterns).withDispatcher("rediscala.rediscala-client-worker-dispatcher"))

}


class SubscribeActor(name: String, channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
  extends RedisSubscriberActor(new InetSocketAddress("localhost", 6379), channels, patterns) {

  //hold state till it's time to publish it again
  var aggregate: BigInt = 0

  def onMessage(message: Message) {
    println(s" $name message received: $message")
    // the payload is epoch milliseconds, which does not fit in an Int, so parse it as a BigInt directly
    aggregate = aggregate + BigInt(message.data)
    //Publish this aggregate to another Redis channel at a fixed interval (e.g., every 30 seconds)
  }

  def onPMessage(pmessage: PMessage) {
    println(s"pattern message received: $pmessage")
  }
}

Konrad Malawski

Apr 9, 2014, 4:14:37 AM
to Akka User List

Hello Soumya,
You can use the scheduler the same way you’re already doing it from within an Actor too.

// inside an Actor (Subscribe Actor)
context.system.scheduler.schedule(30.seconds, 30.seconds) {
  // your publishing here
}

I hope this helps, happy hakking!


-- 
Cheers,
Konrad Malawski


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Patrik Nordwall

Apr 9, 2014, 6:29:24 AM
to akka...@googlegroups.com
On Wed, Apr 9, 2014 at 10:14 AM, Konrad Malawski <konrad....@project13.pl> wrote:

Hello Soumya,
You can use the scheduler the same way you’re already doing it from within an Actor too.

Inside an actor you should always schedule a message to self instead of running a block of code. The reason is to avoid closing over things in the actor and thereby access things in the actor from another thread.

More information about scheduling messages can be found here: http://doc.akka.io/docs/akka/2.3.1/scala/howto.html#scheduling-periodic-messages
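
A minimal sketch of that advice, assuming the Akka 2.3-era scheduler API described in the link above. `Tick` and `PeriodicPublisher` are hypothetical names used only for illustration, and the actual publish call is left as a comment:

```scala
import akka.actor.{Actor, ActorLogging, Cancellable}
import scala.concurrent.duration._

// Hypothetical message the actor sends to itself on every interval.
case object Tick

// Hypothetical actor that accumulates Ints and flushes them periodically.
class PeriodicPublisher extends Actor with ActorLogging {
  import context.dispatcher // execution context required by the scheduler

  var aggregate: Int = 0

  // The scheduler thread only delivers Tick to the mailbox;
  // it never touches the actor's state directly.
  val tick: Cancellable =
    context.system.scheduler.schedule(30.seconds, 30.seconds, self, Tick)

  // Cancel the timer when the actor stops, otherwise it keeps firing.
  override def postStop(): Unit = tick.cancel()

  def receive: Receive = {
    case Tick =>
      // publish the aggregate here (e.g. to Redis), then reset it
      log.info("publishing aggregate {}", aggregate)
      aggregate = 0
    case n: Int =>
      aggregate += n
  }
}
```

Because the state is only read and reset inside receive, it is only ever touched while the actor is processing a message, which is the point of sending a message to self rather than running a block of code.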

Cheers,
Patrik



--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Soumya Simanta

Apr 9, 2014, 10:52:41 PM
to akka...@googlegroups.com
Patrik and Konrad, 

Thanks a lot for your help. I'm still learning Akka, so please pardon any mistakes that may seem really simple to you all.

Here is what my current code looks like. In the code below I'm sending a message (AggregateValue) to the SubscribeActor from outside the actor. Since SubscribeActor extends RedisSubscriberActor, I need a way to override the receive method, so I'm using the chaining approach described here. However, it's not working and I don't know why. My understanding is that the AggregateValue message will be sent every 10 seconds. However, as the logs below show, the message is only sent once. Please see this line:
[INFO] [04/09/2014 17:01:48.329] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a] sending an output message ....0

Any help will be greatly appreciated. 


import akka.actor.{Props, ActorRef, ActorLogging}
import java.net.InetSocketAddress
import redis.RedisClient
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import redis.api.pubsub.{PMessage, Message}

import redis.actors.RedisSubscriberActor


object AkkaScheduler extends App {

  implicit val akkaSystem = akka.actor.ActorSystem()

  val redis = RedisClient()

  //publish the current time every 2 seconds on Redis channel "time"
  akkaSystem.scheduler.schedule(2 seconds, 2 seconds)(redis.publish("time", System.currentTimeMillis()))


  //channel and patterns to subscribe to
  val channels = Seq("time")
  val patterns = Seq("pattern.*")
  // create SubscribeActor instance
  val act = akkaSystem.actorOf(Props(classOf[SubscribeActor], "input", channels, patterns).withDispatcher("rediscala.rediscala-client-worker-dispatcher"))

  //send an AggregateValue message to the SubscribeActor
  akkaSystem.scheduler.schedule(0 seconds, 10 seconds, act, AggregateValue)

  //message sent to the actor to trigger publishing of the aggregate value
  case object AggregateValue


  class SubscribeActor(name: String, channels: Seq[String] = Nil, patterns: Seq[String] = Nil)
    extends RedisSubscriberActor(new InetSocketAddress("localhost", 6379), channels, patterns) with ActorLogging {


    //hold state till it's time to publish it again
    var aggregate: Int = 0

    var redis = RedisClient()

    //Publish this aggregate to another Redis channel at a fixed interval (e.g., every 30 seconds)
    def sendOutputMessage = {
      log.info("sending an output message ....%d".format(aggregate))
      redis.publish("outputchannel", aggregate )
      //reset aggregate
      aggregate = 0;
    }


    //define a function to chain with super.receive
    def f : PartialFunction[Any, Unit]  = {
      case AggregateValue => {
        sendOutputMessage
      }
    }

    //override the super class receive method  redis.actors.RedisSubscriberActor
    override def receive : Receive = f orElse super.receive

    //
    override def onMessage(message: Message) {
      log.info(s" $name message received: $message, $aggregate")
      aggregate = aggregate + 1
    }

    def onPMessage(pmessage: PMessage) {
      println(s"pattern message received: $pmessage")
    }
  }


}


OUTPUT LOGS 

[INFO] [04/09/2014 17:01:48.320] [default-rediscala.rediscala-client-worker-dispatcher-7] [akka://default/user/RedisClient-$b] Connect to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.320] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a] Connect to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.320] [default-rediscala.rediscala-client-worker-dispatcher-5] [akka://default/user/RedisClient-$a] Connect to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.329] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a] sending an output message ....0
[INFO] [04/09/2014 17:01:48.354] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a] Connected to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.356] [default-rediscala.rediscala-client-worker-dispatcher-7] [akka://default/user/RedisClient-$a] Connected to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.356] [default-rediscala.rediscala-client-worker-dispatcher-5] [akka://default/user/RedisClient-$b] Connected to localhost/127.0.0.1:6379
[INFO] [04/09/2014 17:01:48.597] [default-rediscala.rediscala-client-worker-dispatcher-5] [akka://default/user/$a]  input message received: Message(time,1397077308594), 0
[INFO] [04/09/2014 17:01:50.316] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a]  input message received: Message(time,1397077310315), 1
[INFO] [04/09/2014 17:01:50.595] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a]  input message received: Message(time,1397077310594), 2
[INFO] [04/09/2014 17:01:52.316] [default-rediscala.rediscala-client-worker-dispatcher-7] [akka://default/user/$a]  input message received: Message(time,1397077312315), 3
[INFO] [04/09/2014 17:01:52.596] [default-rediscala.rediscala-client-worker-dispatcher-7] [akka://default/user/$a]  input message received: Message(time,1397077312595), 4
[INFO] [04/09/2014 17:01:54.317] [default-rediscala.rediscala-client-worker-dispatcher-5] [akka://default/user/$a]  input message received: Message(time,1397077314315), 5
[INFO] [04/09/2014 17:01:54.596] [default-rediscala.rediscala-client-worker-dispatcher-5] [akka://default/user/$a]  input message received: Message(time,1397077314595), 6
[INFO] [04/09/2014 17:01:56.316] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a]  input message received: Message(time,1397077316315), 7
[INFO] [04/09/2014 17:01:56.595] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a]  input message received: Message(time,1397077316594), 8
[INFO] [04/09/2014 17:01:58.316] [default-rediscala.rediscala-client-worker-dispatcher-6] [akka://default/user/$a]  input message received: Message(time,1397077318314), 9 

Martynas Mickevičius

Apr 11, 2014, 9:28:34 AM
to akka...@googlegroups.com
Hi Soumya,

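What is probably happening here is that the code in the RedisWorkerIO class, which you inherit from, changes your actor's behavior stack by using context.become(...). Once that happens, your overridden receive is no longer the active behavior, so all the messages the scheduler is sending to your actor are just not handled. That would also fit your logs, where the only "sending an output message" line appears before the "Connected to" line for the same actor.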
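
The effect can be modeled without Akka. The sketch below is plain Scala and is not RedisWorkerIO's actual code: `behavior` stands in for the actor's current behavior, and assigning to it plays the role of context.become(...). All names are hypothetical.

```scala
// Plain-Scala model of behavior replacement; no Akka required.
object BecomeDemo {
  type Receive = PartialFunction[Any, Unit]

  // Records which handler processed which message.
  val handled = scala.collection.mutable.ListBuffer.empty[String]

  // Extra handler chained in by the subclass.
  val extra: Receive = { case "aggregate" => handled += "extra:aggregate" }
  // Behaviors owned by the superclass, before and after connecting.
  val initial: Receive = { case "connect" => handled += "initial:connect" }
  val connected: Receive = { case "data" => handled += "connected:data" }

  // Current behavior; starts as the chained receive.
  var behavior: Receive = extra orElse initial

  // Delivers a message; returns false when the current behavior ignores it.
  def deliver(msg: Any): Boolean =
    if (behavior.isDefinedAt(msg)) { behavior(msg); true } else false
}
```

With this model, BecomeDemo.deliver("aggregate") returns true at first. After BecomeDemo.behavior = BecomeDemo.connected it returns false, because the chained handler is no longer part of the active behavior. Assigning BecomeDemo.extra orElse BecomeDemo.connected instead keeps the message handled, which is why the handler has to be chained into every behavior the superclass can switch to.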

To see unhandled messages in the logs, add the following settings to your config file:

akka {
  loglevel = "DEBUG"
  actor.debug.unhandled = on
}
Martynas Mickevičius
Typesafe Reactive Apps on the JVM

Soumya Simanta

Apr 11, 2014, 3:42:43 PM
to akka...@googlegroups.com
Martynas, 

You are correct. This is exactly what was happening. In fact, I was going to post the solution but you posted it before I could. 

Here is a very simple standalone example that explains the behavior. In the example below, ComplexActor needs to override both methods that return a Receive in the superclass hierarchy (receive and mutate) for it to work.


import akka.actor.{Actor, ActorLogging, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps

case object SimpleActorMessageA
case object SimpleActorMessageB


abstract class SimpleActor extends Actor with ActorLogging {
  def receive: Receive = {
    case SimpleActorMessageA => log.info("super message A")
    case SimpleActorMessageB =>
      log.info("super message B")
      // switch behavior: from now on, mutate handles all messages
      context.become(mutate)
  }

  def mutate: Receive = {
    case SimpleActorMessageA => log.info("mutate message A")
    case SimpleActorMessageB => log.info("mutate message B")

  }
}

case object ComplexActorMessageC

class ComplexActor extends SimpleActor  {


  def pf : PartialFunction[Any, Unit]  = {
    case ComplexActorMessageC => {
      log.info("complex message C")
    }
  }

  override def receive: Receive = pf orElse super.receive
  override def mutate: Receive = pf orElse super.mutate
}

object MainApp extends App {

  implicit val akkaSystem = akka.actor.ActorSystem()
  val act = akkaSystem.actorOf(Props(classOf[ComplexActor]))

  akkaSystem.scheduler.schedule(0 seconds, 1 seconds, act, SimpleActorMessageA)
  akkaSystem.scheduler.schedule(0.5 seconds, 4 seconds, act, ComplexActorMessageC)
  akkaSystem.scheduler.scheduleOnce(5 seconds, act, SimpleActorMessageB)


}



