// Single ActorSystem named "EvaluateOnceFlushable". Its guardian starts with an
// empty key -> child map and is supervised with SupervisorStrategy.resume for
// any Throwable.
private val system = {
  val supervisedGuardian =
    Actor.supervise(guardian(Map.empty)).onFailure[Throwable](SupervisorStrategy.resume)
  ActorSystem(supervisedGuardian, "EvaluateOnceFlushable")
}
// Spawn a child actor that starts with no cached state (child(None)), supervised
// with SupervisorStrategy.resume for any Throwable, and named after the key.
val c = ctx.spawn(
Actor.supervise(child(None)).onFailure[Throwable](SupervisorStrategy.resume),
withK.key.toString
)
// Forward msg to the newly spawned child.
c ! msg
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
import akka.typed.{ ActorRef, ActorSystem, SupervisorStrategy }
import akka.typed.scaladsl.Actor
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import akka.typed.scaladsl.AskPattern._
import akka.util.Timeout
import org.scalatest.concurrent.{ IntegrationPatience, ScalaFutures }

import scala.concurrent.Future
import scala.concurrent.duration._

/**
  * Exercises a guardian actor that lazily spawns one child per key. Each child
  * evaluates a value once, caches the resulting future and can be flushed.
  *
  * @author kostas.kougios
  *         Date: 01/09/17
  */
class ActorTest extends FunSuite with ScalaFutures with IntegrationPatience {

  /** Timeout picked up implicitly by the ask (`?`) calls below. */
  private implicit val timeout: Timeout = Timeout(10.seconds)

  test("1st msg lost?") {
    for (i <- 1 to 1000) {
      println(i)
      val s = system
      implicit val scheduler = s.scheduler
      try {
        val r1 = (s ? ((ref: ActorRef[String]) => Get(5, ref))).futureValue
        s ! Flush(5)
        val r2 = (s ? ((ref: ActorRef[String]) => Get(5, ref))).futureValue
        r1 should be("[5]")
        r2 should be("[5]")
      } finally {
        // Always shut the system down, even when an ask times out, and wait for the
        // shutdown to finish so that same-named systems do not pile up across iterations.
        s.terminate().futureValue
      }
    }
  }

  /** Protocol understood by both the guardian and its children. */
  sealed trait Message

  /** Mixed into messages that are routed to the child owning `key`. */
  trait Key {
    def key: Int
  }

  /** Asks for the value of `key`; the answer is sent to `replyTo`. */
  final case class Get(key: Int, replyTo: ActorRef[String]) extends Message with Key

  /** Drops the cached value of `key`. */
  final case class Flush(key: Int) extends Message with Key

  /** Drops the cached values of every known key. */
  case object FlushAll extends Message

  /**
    * Creates a NEW actor system on every call (this is a `def`, not a `val`).
    * The guardian starts with no children and is resumed on any `Throwable`.
    */
  def system: ActorSystem[Message] = ActorSystem(
    Actor.supervise(guardian(Map.empty)).onFailure[Throwable](SupervisorStrategy.resume),
    "EvaluateOnceFlushable"
  )

  /**
    * Guardian behaviour: routes keyed messages to the child registered for that
    * key, spawning and registering the child on first use.
    *
    * @param m children spawned so far, indexed by key
    */
  private def guardian(m: Map[Int, ActorRef[Message]]): Actor.Immutable[Message] =
    Actor.immutable[Message] { (ctx, msg) =>
      msg match {
        case withK: Key =>
          m.get(withK.key) match {
            case Some(c) =>
              println("child exists")
              c ! msg
              Actor.same
            case None =>
              println("spawning child")
              val c = ctx.spawn(
                Actor.supervise(child(None)).onFailure[Throwable](SupervisorStrategy.resume),
                withK.key.toString
              )
              c ! msg
              // Become a guardian that also knows about the freshly spawned child.
              guardian(m + (withK.key -> c))
          }
        case FlushAll =>
          for ((k, a) <- m) a ! Flush(k)
          Actor.same
      }
    }

  /**
    * Child behaviour: computes the value for a key at most once between flushes.
    *
    * @param state the future started by the first `Get` since the last flush, if any
    */
  private def child(state: Option[Future[String]]): Actor.Immutable[Message] =
    Actor.immutable[Message] { (ctx, msg) =>
      implicit val ec = ctx.executionContext
      msg match {
        case Get(key, replyTo) =>
          state match {
            case Some(cached) =>
              // `foreach` runs only on success, like the deprecated `onSuccess`.
              cached.foreach(r => replyTo ! r)
              Actor.same
            case None =>
              val computed = Future {
                // do some heavy calculation
                s"[$key]"
              }
              computed.foreach(r => replyTo ! r)
              // NOTE(review): a failed future would be cached as well and nobody would
              // ever get a reply; the protocol has no failure message to send back.
              child(Some(computed))
          }
        case Flush(_) | FlushAll =>
          child(None)
      }
    }
}
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/UiZjDP43Cqw/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.