is it a good way to initialize Actor?

74 views
Skip to first unread message

Harit Himanshu

unread,
May 27, 2015, 5:48:18 PM5/27/15
to akka...@googlegroups.com
I need to run a static method on a Java class (provided as a jar dependency).  
I also wanted to be able to write test on that. I created a wrapper class in Scala that will call this method. It looks like  

object CurrentLogProcessor {
}

class CurrentLogProcessor {
  def run: Unit = LogReaderDisruptor.main(Array())
}


case object LogProcessRequest

object LProcessor {
  def props(currentLogProcessor: CurrentLogProcessor) = Props(new LProcessor(currentLogProcessor))
}

class LProcessor(currentLogProcessor: CurrentLogProcessor) extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case LogProcessRequest => log.debug("starting log processing")
      currentLogProcessor run
  }
}

and in my App I call this actor as  

  val logProcessor = system.actorOf(LProcessor.props(new CurrentLogProcessor), "logProcessor")
  logProcessor ! LogProcessRequest


This compiles and works fine. Also, I am planning to mock(CurrentLogProcessor) so that I may be able to test it.  

Does this approach looks good? I am new to akka so wanted to make sure I am not making mistakes here

Thank you

Konrad Malawski

unread,
May 29, 2015, 12:01:24 PM5/29/15
to Akka User List
When testing Actors one should rather test the behaviours (the messages sent to and from an Actor) instead of mocking.

Please note that since mockito (great tool) holds values in thread locals and may not work as you'd expect it to with multithreaded code. Unless that has changed I'd recommend against mixing mockito with Actor tests.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,
Konrad 'ktoso' Malawski

Harit Himanshu

unread,
May 29, 2015, 12:24:14 PM5/29/15
to akka...@googlegroups.com
Thanks Konrad

As mentioned

 def run: Unit = LogReaderDisruptor.main(Array())

is a method that is supposed to run forever, plus it required some setup(that is available on client's machine or test environment), it was not trivial to set it up for Unit Test, which is why I mocked that part out

How would you recommend such a method otherwise?

You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/_qV5b2zpx74/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.

Konrad Malawski

unread,
May 29, 2015, 12:38:42 PM5/29/15
to Akka User List
 def run: Unit = LogReaderDisruptor.main(Array())
is a method that is supposed to run forever, plus it required some setup (that is available on client's machine or test environment),
By that do you mean that the main() is never returning? If so, then you're blocking the Actor and wasting an entire thread.
Spawn this off on a dedicated dispatcher instead (see dispatchers and futures docs).
 
it was not trivial to set it up for Unit Test, which is why I mocked that part out
The overall technique is very good, I'm just reserved on using mocking tools with concurrent code AFAIR this yields suprising results sometimes.

How would you recommend such a method otherwise?
Extract an interface (trait) and in tests provide a NoopImplementation instance of it.

-- Konrad 

Harit Himanshu

unread,
May 29, 2015, 12:47:35 PM5/29/15
to akka...@googlegroups.com
By that do you mean that the main() is never returning? If so, then you're blocking the Actor and wasting an entire thread.
Spawn this off on a dedicated dispatcher instead (see dispatchers and futures docs).

Yes, this is suppose to run always, and even if it fails, it needs to restart, which is why I was thinking to assign 1 actor to do this job forever. (Pardon my understanding since I am very new to Akka and don't know how to do this properly).
I will read up for how to spawn it on a dedicated dispatcher, but my question is if it would fail, how would the supervisor know and restart it?  
Any ideas?

How would you recommend such a method otherwise?
Extract an interface (trait) and in tests provide a NoopImplementation instance of it.

- Regarding Mocking, I would consider your advice and try to stay away from it, I need to learn how to use your advice (I am new to this entire ecosystem)

--

Harit Himanshu

unread,
May 30, 2015, 12:46:29 AM5/30/15
to akka...@googlegroups.com, scala...@googlegroups.com
Considering the following Scenario  

CoachSupervisor
            |
MarathonRunnerActor

Does that mean following?

  • MarathonRunnerActor will start a new future by giving a custom dispatcher of type PinnedDispatcher?
  • If so, then in case of any failure how would CoachSupervisor be notified? He is monitoring MarathonRunnerActor and not future (future is not Actor)
I am confused

Harit Himanshu

unread,
May 30, 2015, 6:06:44 PM5/30/15
to akka...@googlegroups.com, scala...@googlegroups.com
I tried to model this problem and build out a structure where a long-running work is given to dispatcher (I believe it should be a separate dispatcher than to use a global one). But in this case, the supervisor never gets the failure message and never applies strategy on it.  Here is my code  

package com.learner.ahka.runforever

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RaceEvent extends App {
  val config = ConfigFactory.parseString( """
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
                                          """)
  val system = ActorSystem.create("race", config)
  val coach = system.actorOf(Coach.props(), "coach")
  coach ! GetSetGo
}


package com.learner.ahka.runforever

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive

import scala.concurrent.duration._

case object GetSetGo

object Coach {
  def props(): Props = Props[Coach];
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 5 seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
  }
}


package com.learner.ahka.runforever

import akka.actor.{Actor, ActorLogging, Props}
import akka.event.LoggingReceive
import scala.concurrent.ExecutionContext.Implicits.global

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  override def receive: Receive = LoggingReceive {
    case GoForIt => {
      race.start onFailure {
        case e: RuntimeException => throw e
      }
    }
  }
}


package com.learner.ahka.runforever

import scala.concurrent.Future

case object GoForIt

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    for (i <- 1 to 3) {
      println("I am a Marathon Runner!")
      Thread.sleep(1000)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

Can someone help me explain what am I doing wrong here?

The logs that I see are  

[DEBUG] [05/30/2015 15:02:01.364] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 15:02:01.365] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 15:02:01.370] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-325855940]
[DEBUG] [05/30/2015 15:02:01.371] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@171fdafb)
[DEBUG] [05/30/2015 15:02:01.375] [race-akka.actor.default-dispatcher-4] [akka://race/user] now supervising Actor[akka://race/user/coach#1377762623]
I am a Marathon Runner!
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@71538e2c)
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@1571c2a0)
[DEBUG] [05/30/2015 15:02:01.388] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#1938289506]
[DEBUG] [05/30/2015 15:02:01.389] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message GetSetGo
[DEBUG] [05/30/2015 15:02:01.390] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
I am a Marathon Runner!
I am a Marathon Runner!
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thank you very much

Harit Himanshu

unread,
May 31, 2015, 5:45:10 PM5/31/15
to scala...@googlegroups.com, akka...@googlegroups.com
I realized that the problem is with executing Future restarting Actor.  

I made another question in the spirit of isolating problems and is described in full detail here

Thank you
+ Harit Himanshu
To unsubscribe from this group and all its topics, send an email to akka-user+unsubscribe@googlegroups.com.

Konrad Malawski

unread,
May 31, 2015, 5:49:21 PM5/31/15
to Akka User List, scala...@googlegroups.com
Hi Harit,
A few things to explain here.

1) Ok, so that process must be kept alive and restarted, got it. 
Then you can run that Actor on a dedicated PinnedDIspatcher (check the docs on how to pick a dispatcher for an Actor).

2) the onFailure { throw } won't work as you expect it to, since it's not called within the actors thread, but asynchronously. Thus it will not trigger the supervision - since now I know you want to keep that process alive and restart, see 1) and
make the Actor own one thread and then if it fails it will properly throw an exception in the Actor, and cause supervision to kick in.

Hope this helps.
I'd again recommend having a look at our proof-of-concent support offer, we could get really in depth and help you out much more then.

You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Harit Himanshu

unread,
May 31, 2015, 6:00:25 PM5/31/15
to akka...@googlegroups.com, scala...@googlegroups.com
This is so awesome and wonderful to have you talk about that. I guess its all about the right timing. I was thinking exactly about this before my sleep last night.

I hacked again this morning as a separate problem where Runner (the legacy process in my migration application runs forever) runs on a PinnedDispatcher (configured in supervisor, Coach) and ran the application and asserted in logs (will write the tests next) that it works

I hosted this code on Github (as part of learning Scala/Akka) in hope that it may help others to learn such wonderful language and toolkit.

Question
- If I assign one thread(as PinnedDispatcher) and want it to run forever, do you still see any performance issues. I guess this model suits to my requirement until I tear apart the big giant application into smaller components, thoughts?

Thanks again

[DEBUG] [05/31/2015 14:56:13.794] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/31/2015 14:56:13.800] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/31/2015 14:56:13.814] [race-akka.actor.default-dispatcher-4] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#1522932566]
[DEBUG] [05/31/2015 14:56:13.815] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@3b04731d)
[DEBUG] [05/31/2015 14:56:13.826] [race-akka.actor.default-dispatcher-3] [akka://race/user] now supervising Actor[akka://race/user/coach#400280069]
[DEBUG] [05/31/2015 14:56:13.841] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] started (com.learner.ahka.ruforever.Runner@3efffee2)
[DEBUG] [05/31/2015 14:56:13.841] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.ruforever.Coach@3b7b2aa3)
[DEBUG] [05/31/2015 14:56:13.842] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#20433449]
[DEBUG] [05/31/2015 14:56:13.842] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message StartWork
running long job
[DEBUG] [05/31/2015 14:56:13.844] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
[ERROR] [05/31/2015 14:56:14.466] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:25)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

[DEBUG] [05/31/2015 14:56:14.468] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarting
runner restarted, sending message to Run
[DEBUG] [05/31/2015 14:56:14.473] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarted
[DEBUG] [05/31/2015 14:56:14.473] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message RestartRunner
[DEBUG] [05/31/2015 14:56:14.474] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message StartWork
running long job
[DEBUG] [05/31/2015 14:56:14.475] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
runner restarted, sending message to Run
[ERROR] [05/31/2015 14:56:15.081] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:25)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

running long job
[DEBUG] [05/31/2015 14:56:15.082] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarting
[DEBUG] [05/31/2015 14:56:15.082] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] restarted
[DEBUG] [05/31/2015 14:56:15.082] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message RestartRunner
[DEBUG] [05/31/2015 14:56:15.082] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message StartWork
[DEBUG] [05/31/2015 14:56:15.082] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] received handled message Start
[ERROR] [05/31/2015 14:56:15.691] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.ruforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:25)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.ruforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

[DEBUG] [05/31/2015 14:56:15.699] [race-my-pinned-dispatcher-5] [akka://race/user/coach/runner] stopped
Reply all
Reply to author
Forward
0 new messages