Passivate

466 views
Skip to first unread message

Ashley Aitken

unread,
Jul 14, 2014, 11:28:42 AM7/14/14
to akka...@googlegroups.com

A couple of quick questions about passivate and PersistentActors:

Can other actors still send messages to persistent actors that have been passivated?

Will these messages cause the persistent actor to be reactivated?

I am asking about this in single node and clustered context.

I saw elsewhere that Patrik has written this in the cluster/sharding context:

- all messages are sent via the Manager actor, which creates child Aggregate instances on demand
- when receiving a message the Manager extract the Aggregate identifier from the message
- the Manager creates a new child Aggregate actor if it doesn't exist, and then forwards the message to the Aggregate
- the Aggregate can passivate itself by sending a Passivate message to the parent Manager, which then sends PoisonPill to the Aggregate
- in-between receiving Passivate and Terminated the Manager will buffer all incoming messages for the passivating Aggregate
- when receiving Terminated it will flush the buffer for the Aggregate, which can result in activation again
The PoisonPill can be replaced with some other custom stop message if the Aggregate needs to do further interactions with other actors before stopping.

Thanks in advance for any answers.

Cheers,
Ashley.


delasoul

unread,
Jul 15, 2014, 3:19:42 AM7/15/14
to akka...@googlegroups.com
Hello Ashey,

I guess you answered your question(s) yourself:


- in-between receiving Passivate and Terminated the Manager will buffer all incoming messages for the passivating Aggregate
- when receiving Terminated it will flush the buffer for the Aggregate, which can result in activation again.

So, the answer to both questions is yes, but the buffer for passivated actors is limited, if the limit is reached all messages will go to DeadLetters.

hth,

michael

Konrad Malawski

unread,
Jul 15, 2014, 12:03:57 PM7/15/14
to Akka User List
Just like Michael (quoting Patrik) said :-)

Also, please note that Passivate is not a feature of akka-persistence, it's a feature of cluster sharding:

It's true however that it (and the entire cluster sharding) plays very well with persistence :-)


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,
Konrad 'ktoso' Malawski
hAkker @ Typesafe


Ashley Aitken

unread,
Jul 15, 2014, 11:00:44 PM7/15/14
to akka...@googlegroups.com

Thank you Michael and Konrad for your posts.

I was somewhat confused as to whether the manager was something special or it referred to a processors' usual supervisor / parent.

I also thought passivate was a part of akka-persistence but Konrad's post makes it clear it is a part of cluster sharding (no wonder I couldn't find anything about it in the docs for akka-persistence) and the Manager in Patrik's post is a part of the cluster sharding infrastructure.

Cheers,
Ashley.





Ashley Aitken

unread,
Jul 27, 2014, 11:07:37 AM7/27/14
to akka...@googlegroups.com

Hello again Michael et al,

On Tuesday, 15 July 2014 15:19:42 UTC+8, delasoul wrote:
I guess you answered your question(s) yourself:

- in-between receiving Passivate and Terminated the Manager will buffer all incoming messages for the passivating Aggregate
- when receiving Terminated it will flush the buffer for the Aggregate, which can result in activation again.

So, the answer to both questions is yes,

I am not sure the answer to both question is yes?  The second question asked if the actor would be activated again when messages were sent to it and it seems to me that it isn't, as you highlight the messages are buffered by the Manager (ShardRegion, I believe). 

I am not exactly sure what is meant by when receiving Terminated (is that the PoisonPill or similar message)?  Then the buffer the for Aggregate is flushed - does that mean the messages are sent to the Aggregate?  Which can result in activation again - why only can? 

I was thinking more of activation on arrival of each message (if the Aggregate is not already active, of course).  I get the feeling that Akka Persistence or at least Cluster Sharding needs some sort of push-based / event/message driven activation of Actors.  Or can the Cluster Sharing already do this?

Cheers,
Ashley.


ahjohannessen

unread,
Jul 28, 2014, 10:38:44 AM7/28/14
to akka...@googlegroups.com
Ashley, perhaps this might be useful for you.

import scala.concurrent.duration._
import akka.actor._
import benefits.configuration.Settings

/* Message used by `Child` to signal `Parent` it should
* be passivated by way of PoisonPill after a configurable
* idle time.
* */
case object Passivate

trait Parent extends Actor with Stash with ActorLogging {
  import Parent._
  import context._

  private val settings =
    Settings(system).Support.Passivation

  def withPassivation(receive: Receive): Receive =
    receive orElse passivation

  private[support] def passivation: Receive = {
    case Passivate passivate(sender())
    case Terminated(ref) logUnexpected(ref)
    case WaitTick(ref) // no-op
  }

  private def passivate(entry: ActorRef) = {
    log.debug(s"Passivation start: $entry")
    watch(entry)
    entry ! PoisonPill
    val tick = scheduleTick(entry)
    val await = awaiting(entry, tick)
    become(await, discardOld = false)
  }

  private def awaiting(ref: ActorRef, tick: Cancellable): Receive = {
    case Terminated(`ref`) terminated(ref, tick)
    case WaitTick(`ref`) waitTick(ref)
    case _ stash()
  }

  private def terminated(ref: ActorRef, tick: Cancellable) = {
    tick.cancel()
    previousBehavior(ref)
    log.debug(s"Passivation end: $ref")
  }

  private def waitTick(ref: ActorRef) = {
    previousBehavior(ref)
    log.warning(s"Giving up waiting for Terminated($ref)")
  }

  private def previousBehavior(ref: ActorRef) = {
    unwatch(ref)
    unbecome()
    unstashAll()
  }

  private def logUnexpected(ref: ActorRef) =
    log.warning(s"Unexpected terminated for $ref")

  private def scheduleTick(entry: ActorRef) = {
    system.scheduler.scheduleOnce(
      waitTime, self, WaitTick(entry)
    )
  }

  def waitTime: FiniteDuration =
    settings.parentWaitTime
}

object Parent {
  private[support] case class WaitTick(
    entry: ActorRef
  )
}

trait Child extends Actor with ActorLogging {
  import Child._
  import context._

  private val settings =
    Settings(system).Support.Passivation

  override def preStart() = {
    super.preStart()
    self ! StartTimingOut
  }

  def passivator: ActorRef = parent

  def withPassivation(receive: Receive): Receive =
    receive orElse passivation

  private[support] def passivation: Receive = {
    case ReceiveTimeout issuePassivate()
    case StartTimingOut startTimeout()
  }

  private def startTimeout() = {
    log.debug(s"Setting receive timeout to $idleTime")
    setReceiveTimeout(idleTime)
  }

  private def issuePassivate() = {
    log.debug(s"Sending Passivate to $passivator")
    setReceiveTimeout(Duration.Inf)
    passivator ! Passivate
  }

  def idleTime: FiniteDuration =
    settings.childIdleTime
}

object Child {
  private[support] case object StartTimingOut
}
Reply all
Reply to author
Forward
0 new messages