Stackable actor traits

Evan Chan

Dec 19, 2012, 3:08:53 AM
to akka...@googlegroups.com
Dear Hakkers,

I don't know if this has been covered before, but I thought this was too cool of a pattern not to share.   Let's say I want to mix in behavior to all my actors, such as logging, metrics collection, etc.  I can start with a base trait that enables "stackable" actor traits:

import akka.actor.Actor

trait ActorStack { this: Actor =>
  /** Actor classes should implement this partialFunction for standard actor message handling */
  def wrappedReceive: Receive

  /** Stackable traits should override and call super.receiveWrapper() for stacking functionality */
  @inline
  def receiveWrapper(x: Any): Unit = { wrappedReceive(x) }

  def receive: Receive = {
    case x: Any => receiveWrapper(x)
  }
}

Now, let's say that I want to add a trait for direct logging to SLF4J (which has many benefits, but I don't want to get into that here).    Presto:

import org.slf4j.LoggerFactory

trait Slf4jLogging extends Actor with ActorStack {
  val logger = LoggerFactory.getLogger(getClass)
  private[this] val myPath = self.path.toString

  logger.info("Starting actor " + getClass.getName)

  override def receiveWrapper(x: Any): Unit = {
    // Because each actor receive invocation could happen in a different thread, and MDC is thread-based,
    // we kind of have to set the MDC anew for each receive invocation.  :(
    org.slf4j.MDC.put("akkaSource", myPath)
    super.receiveWrapper(x)
  }
}

The reason I decided to use a receiveWrapper() (instead of stacking Receives with andThen / orElse) is to enable code that wraps the entire receive call, such as metrics collection:

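import java.util.concurrent.TimeUnit
// Assumption: Metrics here is com.yammer.metrics.Metrics (Yammer Metrics 2.x)

trait ActorMetrics extends Actor with ActorStack {
  val metricReceiveTimer = Metrics.newTimer(getClass, "message-handler",
                                           TimeUnit.MILLISECONDS, TimeUnit.SECONDS)

  override def receiveWrapper(x: Any): Unit = {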
    val context = metricReceiveTimer.time()
    try {
      super.receiveWrapper(x)
    } finally {
      context.stop()
    }
  }
}


Now, you can compose all these traits together easily:

abstract class MyInstrumentedActor extends Actor with Slf4jLogging with ActorMetrics
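
One thing worth knowing when composing is the order in which the wrappers run. Scala linearizes traits right to left, so the rightmost trait's receiveWrapper is the outermost one. Here is a minimal, Akka-free sketch of that ordering. The names Stack, Logging, Metrics, Handler and trace are hypothetical stand-ins for illustration only, not part of the code above:

```scala
import scala.collection.mutable.ListBuffer

object StackingOrderSketch {
  // Records the order in which each layer runs (illustration only).
  val trace = ListBuffer.empty[String]

  // Hypothetical stand-in for ActorStack, without the Actor self-type.
  trait Stack {
    def wrappedReceive(x: Any): Unit
    def receiveWrapper(x: Any): Unit = wrappedReceive(x)
  }

  // Stand-in for Slf4jLogging: does something before delegating.
  trait Logging extends Stack {
    override def receiveWrapper(x: Any): Unit = {
      trace += "logging"
      super.receiveWrapper(x)
    }
  }

  // Stand-in for ActorMetrics: brackets the whole call.
  trait Metrics extends Stack {
    override def receiveWrapper(x: Any): Unit = {
      trace += "metrics:start"
      try super.receiveWrapper(x)
      finally trace += "metrics:stop"
    }
  }

  // Same mixin order as MyInstrumentedActor: Logging first, Metrics last.
  class Handler extends Logging with Metrics {
    def wrappedReceive(x: Any): Unit = trace += s"handled $x"
  }
}
```

Calling receiveWrapper(1) on a Handler records metrics:start, logging, handled 1, metrics:stop. In other words, with this mixin order the timer would also measure the time spent in the logging layer; swap the traits if that is not what you want.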

Pretty neat, huh?

-Evan

pagoda_5b

Dec 19, 2012, 7:00:40 AM
to akka...@googlegroups.com
+1 for a LetItCrash blog entry



Patrik Nordwall

Dec 19, 2012, 7:53:11 AM
to akka...@googlegroups.com
Thank you Evan for sharing. Looks very useful.

How does it work together with become?
Does it work with unhandled messages?

Cheers,
Patrik


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user?hl=en.
 
 



--

Patrik Nordwall
Typesafe The software stack for applications that scale
Twitter: @patriknw


Evan Chan

Dec 19, 2012, 1:06:13 PM
to akka...@googlegroups.com
Patrik,

You've caught a use case with unhandled(), thanks.   The best way I can think of is to modify receiveWrapper to do

wrappedReceive(x) orElse unhandled(x)
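
Read literally, that line is shorthand rather than compilable Scala: wrappedReceive(x) already returns Unit, and Unit has no orElse. One way to express the same intent is PartialFunction.applyOrElse, which falls through to a default when the function is not defined for the message. A minimal sketch outside of Akka, assuming a hypothetical unhandled function that stands in for Actor.unhandled (the names seen and ApplyOrElseSketch are also made up for illustration):

```scala
import scala.collection.mutable.ListBuffer

object ApplyOrElseSketch {
  type Receive = PartialFunction[Any, Unit]

  // Records what happened to each message (illustration only).
  val seen = ListBuffer.empty[String]

  // The actor's own handler: only defined for Int messages.
  val wrappedReceive: Receive = {
    case n: Int => seen += s"handled $n"
  }

  // Hypothetical stand-in for Actor.unhandled(message).
  def unhandled(x: Any): Unit = seen += s"unhandled $x"

  // Applies wrappedReceive if it is defined at x, otherwise calls unhandled.
  def receiveWrapper(x: Any): Unit = wrappedReceive.applyOrElse(x, unhandled)
}
```

Inside a real actor the body of receiveWrapper would be the same one-liner, with unhandled resolving to the method inherited from Actor.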

As for become, I'm not familiar enough with become() to comment, as we don't currently use it, but suggestions are welcome  :)

-Evan
--
Evan Chan
Senior Software Engineer | 
e...@ooyala.com | (650) 996-4600
www.ooyala.com | blog | @ooyala

Patrik Nordwall

Dec 19, 2012, 1:50:31 PM
to akka...@googlegroups.com
On Wed, Dec 19, 2012 at 7:06 PM, Evan Chan <e...@ooyala.com> wrote:
Patrik,

You've caught a use case with unhandled(), thanks.   The best way I can think of is to modify receiveWrapper to do

wrappedReceive(x) orElse unhandled(x)

good
 

As for become, I'm not familiar enough with become() to comment, as we don't currently use it, but suggestions are welcome  :)

become/unbecome replaces the behaviour, i.e. the receive partial function. When used together with these stackable traits you would like it to replace the wrappedReceive. I have not thought much about how that would be possible, but I know we had a lengthy discussion about the difficulties around that in a previous (generic) attempt to support something similar.
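
To make the requirement concrete, here is a rough, Akka-free model of the idea: every behaviour that gets installed is passed through the same wrapper first, so the cross-cutting code keeps running after a become. This is only a sketch under assumptions. WrappedBehaviours, wrap and trace are hypothetical names, and the stack here always keeps the old behaviour on become, which is not necessarily how a given Akka version defaults:

```scala
import scala.collection.mutable.ListBuffer

object BecomeSketch {
  type Receive = PartialFunction[Any, Unit]

  // Records which layer ran for each message (illustration only).
  val trace = ListBuffer.empty[String]

  // Hypothetical model of an actor's behaviour stack.
  class WrappedBehaviours(initial: Receive) {
    // Stand-in for unhandled(): just records the message.
    private def unhandled(m: Any): Unit = trace += s"unhandled $m"

    // The cross-cutting wrapper, applied to every behaviour that is installed.
    private def wrap(inner: Receive): Receive = {
      case x =>
        trace += "wrapper"
        inner.applyOrElse(x, unhandled)
    }

    private var stack: List[Receive] = List(wrap(initial))

    // Push a new behaviour; it is wrapped just like the initial one.
    def become(next: Receive): Unit = stack = wrap(next) :: stack

    // Pop back to the previous behaviour, never removing the last one.
    def unbecome(): Unit = if (stack.tail.nonEmpty) stack = stack.tail

    def receive(x: Any): Unit = stack.head(x)
  }
}
```

The point of the model is only that wrapping has to happen at the moment the behaviour is swapped; the partial function handed to become must already be the wrapped one.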

Cheers,
Patrik

Raymond Roestenburg

Dec 19, 2012, 1:53:52 PM
to akka...@googlegroups.com
Hi Evan, 

I've blogged about something similar (but not exactly the same) in part here, maybe you will enjoy it:

http://letitcrash.com/post/30585282971/discovering-message-flows-in-actor-systems-with-the



--
Raymond Roestenburg

Martin Krasser

Dec 19, 2012, 1:57:19 PM
to akka...@googlegroups.com

On 19.12.12 19:06, Evan Chan wrote:
Patrik,

You've caught a use case with unhandled(), thanks.   The best way I can think of is to modify receiveWrapper to do

wrappedReceive(x) orElse unhandled(x)

As for become, I'm not familiar enough with become() to comment, as we don't currently use it, but suggestions are welcome  :)

I approached this as described here. The trait related to the described functionality is Behavior.scala (which also deals with unhandled messages). It was possible to inherit the behavior stack from the Actor trait in Akka 2.0, but no longer with Akka 2.1. Hence, Behavior.scala is somewhat redundant with what is implemented in ActorCell. Any suggestions on how to avoid this redundancy are welcome.

Evan Chan

Dec 19, 2012, 1:58:18 PM
to akka...@googlegroups.com
Raymond,

Thanks.  I should have given credit to you for inspiring part of this, my apologies.   :)

I also read your recent blog post on exponential backoff for retries.  Really good stuff.

By the way, for the Spider pattern: what are your thoughts on crawling to find the message flows vs. publishing traces to a central collector, e.g. Google Dapper or Twitter Zipkin?

-Evan

Raymond Roestenburg

Dec 19, 2012, 2:33:20 PM
to akka...@googlegroups.com
On Wed, Dec 19, 2012 at 7:58 PM, Evan Chan <e...@ooyala.com> wrote:
Raymond,

Thanks.  I should have given credit to you for inspiring part of this, my apologies.   :)
 
no worries, cool to hear :-) 

I also read your recent blog post on exponential backoff for retries.  Really good stuff.

Thanks!
 
By the way, for the Spider pattern .... what are your thoughts on crawling to find the message flows, vs. publishing traces to a central collector, ie Google Dapper or Twitter Zipkin?

Depends on the use case, of course. If you know in advance exactly what you want to trace/log and you just want to log, then publishing is the easiest and best way to go.
If you want to discover things in a more dynamic setup and query your system, then crawling is a nice option; the stackable trait in there lets you add behavior as a cross-cutting concern in your app. I haven't pushed this to its limits, though.
And definitely look at Martin's eventsourced lib; it's really nice and uses stackable traits a lot :-)

Hamilton Turner

Sep 26, 2014, 8:47:47 PM
to akka...@googlegroups.com
All, 

Apologies for resurrecting a dead thread with a big wall of text, but I found these stackable traits really useful as a drop-in feature so I upgraded them a bit. 
This adds support for become/unbecome and postReceive hooks.
Feel free to point out any problems you spot!

/**
 * Allows an approach similar to aspects with Actors - invisibly rope in new
 * behavior to all your actors by mixing in these traits and then using
 * <code>wrappedReceive</code> instead of <code>receive</code>. Also supports
 * post receive calls for enabling behavior there
 *
 * <code>
 * class MyActor extends Actor with Slf4jLogging {
 *   def wrappedReceive = {
 *     case x => {}
 *   }
 *   override def postReceive = {
 *     case x => {}
 *   }
 * }
 * </code>
 * See https://groups.google.com/d/topic/akka-user/J4QTzSj5usQ/discussion
 */

trait ActorStack { this: Actor =>

  /** Actor classes should implement this partialFunction for standard actor message handling */
  def wrappedReceive: Receive

  /** (Optional) Actor classes can override this */
  def postReceive: Receive = {
    case x => {}
  }

  /** Stackable traits should override and call super.receiveWrapper() for stacking functionality */
  @inline
  def receiveWrapper(x: Any, receive: Receive) = receive(x)

  /** Stackable traits should override this and call super for stacking this */
  @inline
  def postReceiveWrapper(x: Any, postreceive: Receive) = postreceive(x)

  /** For logging MatchError exceptions */
  private[this] val stackLog = LoggerFactory.getLogger(getClass)

  private[this] val myPath = self.path.toString

  def wrapReceive(receive: Receive = wrappedReceive, postreceive: Receive = postReceive): Receive = {
    case x: Any => try {
      val result = receiveWrapper(x, receive)
      postReceiveWrapper(x, postreceive)
      result
    } catch {
      case nomatch: MatchError => {
        // Because we claim we can handle x: Any, you'll occasionally see these
        // MatchError. I log them because I can, you could drop this code and just
        // invisible ignore them or you could remove the try{}catch{} and have them
        // printed in the standard exception handling manner
        org.slf4j.MDC.put("akkaSource", myPath)
        stackLog.info(s"Received unhandled message $x")
        postReceiveWrapper(x, postreceive)
      }
    }
  }

  /** Setup default behavior */
  def receive: Receive = wrapReceive()
}
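
A possible caveat with catching MatchError: a MatchError thrown from inside a handler body (say, a nested match that misses a case) would also be reported as an unhandled message. An alternative is to ask the partial function up front whether it is defined for the message. The following is a minimal, Akka-free sketch of that variant, not a drop-in replacement for the trait; the object name UnhandledGuardSketch and the log buffer are hypothetical, and it ignores the receiveWrapper/postReceiveWrapper indirection for brevity:

```scala
import scala.collection.mutable.ListBuffer

object UnhandledGuardSketch {
  type Receive = PartialFunction[Any, Unit]

  // Stands in for the SLF4J logger (illustration only).
  val log = ListBuffer.empty[String]

  // Hypothetical variant of wrapReceive that checks isDefinedAt
  // instead of catching MatchError.
  def wrapReceive(receive: Receive, postReceive: Receive): Receive = {
    case x =>
      if (receive.isDefinedAt(x)) receive(x)
      else log += s"Received unhandled message $x"
      // The post hook runs either way, as in the try/catch version.
      if (postReceive.isDefinedAt(x)) postReceive(x)
  }
}
```

With this shape, an exception raised while a handler is running propagates normally instead of being swallowed, so supervision would still see it.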



Here is a simple actor to show off usage. 

class TestActor extends Actor
  with ActorLogging
  with ActorStack
  with Slf4jLoggingStack {

  var count = -3

  /** Drop default behavior into wrappedReceive */
  def wrappedReceive = original

  val original: Receive = {
    case a: Int => {
      log.debug(s"$count: original behavior")
      count = count + a
      if (count >= 0) {
        log.debug("I am becoming")
        /**
         *  This is the meat. You minimally need wrapReceive(updated),
         *  adding a different postReceive handler is optional
         */
        context.become(wrapReceive(updated,postReceiveUpdated))
      }
    }
  }

  def updated: Receive = {
    case a: Int => {
      log.info(s"$count: Updated behavior")
      count = count + a
      if (count < 0) {
        log.info("I am unbecoming")
        context.unbecome
      }
    }
  }

  /**
   * This is optional (note the override). postReceive handlers are
   * passed the original message so you can match if needed
   */
  override def postReceive: Receive = {
    case x => {
      log.info(s"$count: Post Message Processing")
    }
  }

  /** Let's use a custom handler for our updated state */
  def postReceiveUpdated: Receive = {
    case y => {
      log.info(s"$count: Updated Post Message Processing")
    }
  }
}



Ok, let's run that actor to see what happens: 

val test = system.actorOf(Props(new TestActor()), name = "test")
for (i <- 1 to 5) {
  test ! 1
  Thread.sleep(1000) // Small delay to avoid logging threads getting mixed
}
for (i <- 1 to 5) {
  test ! -1
  Thread.sleep(1000) // Small delay to avoid logging threads getting mixed
}
test ! "chickens"

Here's some (cleaned up) log output. Note that I'm using both SLF4J and Akka's ActorLogging here, so you can easily tell which messages come from the traits and which come from the TestActor:

INFO  clasp.TestActor - Starting actor clasp.TestActor
INFO  clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: original behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: Post Message Processing for 1
INFO  clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: original behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: Post Message Processing for 1
INFO  clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: original behavior
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] I am becoming
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Post Message Processing for 1
INFO  clasp.TestActor - Receiving 1
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated Post Message Processing for 1
INFO  clasp.TestActor - Receiving 1
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 2: Updated Post Message Processing for 1
INFO  clasp.TestActor - Receiving -1
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 2: Updated behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated Post Message Processing for -1
INFO  clasp.TestActor - Receiving -1
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated Post Message Processing for -1
INFO  clasp.TestActor - Receiving -1
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] I am unbecoming
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: Updated Post Message Processing for -1
INFO  clasp.TestActor - Receiving -1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: original behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: Post Message Processing for -1
INFO  clasp.TestActor - Receiving -1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: original behavior
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: Post Message Processing for -1
INFO  clasp.TestActor - Receiving chickens
INFO  clasp.TestActor - Received unhandled message chickens
[INFO]  [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: Post Message Processing for chickens


Hopefully this helps someone else (and there are no major mistakes :-P)

Best, 
Hamilton