akka.typed: is it possible for a message to be lost when it is sent just after the creation of an actor?

Kostas kougios

Aug 31, 2017, 8:18:38 AM
to Akka User List
I am using the ask pattern to send a message to the actor system:

system ? (ref => Get(key, ref))

where

private val system = ActorSystem(
  Actor.supervise(guardian(Map.empty)).onFailure[Throwable](SupervisorStrategy.resume),
  "EvaluateOnceFlushable"
)

Within the guardian actor, I create a child actor and forward the message to it:

val c = ctx.spawn(
  Actor.supervise(child(None)).onFailure[Throwable](SupervisorStrategy.resume),
  withK.key.toString
)
c ! msg


Occasionally my ask times out because the child actor doesn't seem to run: I log when it receives the message, but occasionally nothing is logged and the ask call times out.
Is it possible that the message sent with c ! msg is never received by the child?


Patrik Nordwall

Aug 31, 2017, 1:56:48 PM
to akka...@googlegroups.com
That would be a bug. Please create an issue. I would guess that the problem is in system ? and not in the child. Did you add logging in the guardian also?

/Patrik
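
A minimal sketch of the guardian logging suggested above, assuming the akka.typed 2.5.x scaladsl API used elsewhere in this thread. The GuardianLogging object, the logLine helper and the cut-down Get protocol are illustrative names, not code from the original poster. Logging on both sides of the forward shows whether a timed-out ask ever reached the guardian, and whether the child saw the forwarded message.

```scala
import akka.typed.{ ActorRef, Behavior }
import akka.typed.scaladsl.Actor

object GuardianLogging {
  // Hypothetical, simplified protocol modeled on the Get message in this thread.
  final case class Get(key: Int, replyTo: ActorRef[String])

  // Illustrative helper so every log line has the same shape.
  def logLine(who: String, event: String): String = s"[$who] $event"

  // Child: logs each message it receives, then replies to the asker.
  val child: Behavior[Get] = Actor.immutable[Get] { (_, msg) =>
    println(logLine("child", s"received $msg"))
    msg.replyTo ! s"[${msg.key}]"
    Actor.same
  }

  // Guardian: logs on receipt and again after forwarding to the child.
  // One child per key is kept in the map, as in the original code.
  def guardian(children: Map[Int, ActorRef[Get]]): Behavior[Get] =
    Actor.immutable[Get] { (ctx, msg) =>
      println(logLine("guardian", s"received $msg"))
      children.get(msg.key) match {
        case Some(c) =>
          c ! msg
          println(logLine("guardian", s"forwarded to existing child $c"))
          Actor.same
        case None =>
          val c = ctx.spawn(child, msg.key.toString)
          c ! msg
          println(logLine("guardian", s"spawned $c and forwarded"))
          guardian(children + (msg.key -> c))
      }
    }
}
```

If a timed-out ask produces no "[guardian] received" line, the message never reached the guardian, which would point at system ? rather than at the child.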
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Konstantinos Kougios

Sep 19, 2017, 8:09:58 AM
to akka...@googlegroups.com
I created the following test, but it passes. So far I am not sure whether this is an Akka bug or something else.


import akka.typed.{ ActorRef, ActorSystem, SupervisorStrategy }
import akka.typed.scaladsl.Actor
import org.scalatest.FunSuite
import org.scalatest.Matchers._
import akka.typed.scaladsl.AskPattern._
import akka.util.Timeout
import org.scalatest.concurrent.{ IntegrationPatience, ScalaFutures }

import scala.concurrent.Future
import scala.concurrent.duration._

/**
  * @author kostas.kougios
  *         Date: 01/09/17
  */
class ActorTest extends FunSuite with ScalaFutures with IntegrationPatience
{
  private implicit val timeout = Timeout(10.seconds)

  test("1st msg lost?") {
    for (i <- 1 to 1000) {
      println(i)
      val s = system
      implicit val scheduler = s.scheduler
      val r1 = (s ? ((ref: ActorRef[String]) => Get(5, ref))).futureValue
      s ! Flush(5)
      val r2 = (s ? ((ref: ActorRef[String]) => Get(5, ref))).futureValue
      s.terminate()
      r1 should be("[5]")
      r2 should be("[5]")
    }
  }

  sealed trait Message
  trait Key {
    def key: Int
  }

  case class Get(key: Int, replyTo: ActorRef[String]) extends Message with Key
  case class Flush(key: Int) extends Message with Key
  case object FlushAll extends Message

  def system = ActorSystem(
    Actor.supervise(guardian(Map.empty)).onFailure[Throwable](SupervisorStrategy.resume),
    "EvaluateOnceFlushable"
  )

  private def guardian(m: Map[Int, ActorRef[Message]]): Actor.Immutable[Message] = Actor.immutable[Message] {
    (ctx, msg) =>
      msg match {
        case withK: Key =>
          m.get(withK.key) match {
            case Some(c) =>
              println("child exists")
              c ! msg
              Actor.same
            case None =>
              println("spawning child")
              val c = ctx.spawn(
                Actor.supervise(child(None)).onFailure[Throwable](SupervisorStrategy.resume),
                withK.key.toString
              )
              c ! msg
              guardian(m + (withK.key -> c))
          }
        case FlushAll =>
          for ((k, a) <- m) a ! Flush(k)
          Actor.same
      }
  }

  private def child(state: Option[Future[String]]): Actor.Immutable[Message] = Actor.immutable[Message] {
    (ctx, msg) =>
      implicit val ec = ctx.executionContext
      msg match {
        case Get(key, replyTo) =>
          state match {
            case Some(f) =>
              f.onSuccess {
                case r =>
                  replyTo ! r
              }
              Actor.same
            case None =>
              val f = Future {
                // do some heavy calculation
                s"[$key]"
              }
              f.onSuccess {
                case r =>
                  replyTo ! r
              }
              child(Some(f))
          }
        case Flush(_) | FlushAll =>
          child(None)
      }
  }

}
