Dear Hakkers,I don't know if this has been covered before, but I thought this was too cool of a pattern not to share. Let's say I want to mix in behavior to all my actors, such as logging, metrics collection, etc. I can start with a base trait that enables "stackable" actor traits:
-Evan
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user?hl=en.
Patrik Nordwall
Typesafe - The software stack for applications that scale
Twitter: @patriknw
Patrik,You've caught a use case with unhandled(), thanks. The best way I can think of is to modify receiveWrapper to dowrappedReceive(x) orElse unhandled(x)
As for become, I'm not familiar enough with become() to comment, as we don't currently use it, but suggestions are welcome :)
-Evan--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user?hl=en.
Patrik,
You've caught a use case with unhandled(), thanks. The best way I can think of is to modify receiveWrapper to do
wrappedReceive(x) orElse unhandled(x)
As for become, I'm not familiar enough with become() to comment, as we don't currently use it, but suggestions are welcome :)
-- Martin Krasser blog: http://krasserm.blogspot.com code: http://github.com/krasserm twitter: http://twitter.com/mrt1nz
Raymond,Thanks. I should have given credit to you for inspiring part of this, my apologies. :)
no worries, cool to hear :-)
I also read your recent blog post on exponential backoff for retries. Really good stuff.
By the way, for the Spider pattern .... what are your thoughts on crawling to find the message flows, vs. publishing traces to a central collector, ie Google Dapper or Twitter Zipkin?
/**
* Allows an approach similar to aspects with Actors - invisibly rope in new
* behavior to all your actors by mixing in these traits and then using
* <code>wrappedReceive</code> instead of <code>receive</code>. Also supports
* post receive calls for enabling behavior there
*
* <code>
* class MyActor extends Actor with Slf4jLogging {
* def wrappedReceive = {
* case x => {}
* }
* override def postReceive = {
* case x => {}
* }
* }
* </code>
* See https://groups.google.com/d/topic/akka-user/J4QTzSj5usQ/discussion
*/
trait ActorStack { this: Actor =>
/** Actor classes should implement this partialFunction for standard actor message handling */
def wrappedReceive: Receive
/** (Optional) Actor classes can override this */
def postReceive: Receive = {
case x => {}
}
/** Stackable traits should override and call super.receiveWrapper() for stacking functionality */
@inline
def receiveWrapper(x: Any, receive: Receive) = receive(x)
/** Stackable traits should override this and call super for stacking this */
@inline
def postReceiveWrapper(x: Any, postreceive: Receive) = postreceive(x)
/** For logging MatchError exceptions */
private[this] val stackLog = LoggerFactory.getLogger(getClass)
private[this] val myPath = self.path.toString
def wrapReceive(receive: Receive = wrappedReceive, postreceive: Receive = postReceive): Receive = {
case x: Any => try {
val result = receiveWrapper(x, receive)
postReceiveWrapper(x, postreceive)
result
} catch {
case nomatch: MatchError => {
// Because we claim we can handle x: Any, you'll occasionally see these
// MatchError. I log them because I can, you could drop this code and just
// invisible ignore them or you could remove the try{}catch{} and have them
// printed in the standard exception handling manner
org.slf4j.MDC.put("akkaSource", myPath)
stackLog.info(s"Received unhandled message $x")
postReceiveWrapper(x, postreceive)
}
}
}
/** Setup default behavior */
def receive: Receive = wrapReceive()
}
class TestActor extends Actor
with ActorLogging
with ActorStack
with Slf4jLoggingStack {
var count = -3
/** Drop default behavior into wrappedReceive */
def wrappedReceive = original
val original: Receive = {
case a: Int => {
log.debug(s"$count: original behavior")
count = count + a
if (count >= 0) {
log.debug("I am becoming")
/**
* This is the meat. You minimally need wrapReceive(updated),
* adding a different postReceive handler is optional
*/
context.become(wrapReceive(updated,postReceiveUpdated))
}
}
}
def updated: Receive = {
case a: Int => {
log.info(s"$count: Updated behavior")
count = count + a
if (count < 0) {
log.info("I am unbecoming")
context.unbecome
}
}
}
/**
* This is optional (note the override). postReceive handlers are
* passed the original message so you can match if needed
*/
override def postReceive: Receive = {
case x => {
log.info(s"$count: Post Message Processing")
}
}
/** Let's use a custom handler for our updated state */
def postReceiveUpdated: Receive = {
case y => {
log.info(s"$count: Updated Post Message Processing")
}
}
}
val test = system.actorOf(Props(new TestActor()), name = "test")
for (i <- 1 to 5) {
test ! 1
Thread.sleep(1000) // Small delay to avoid logging threads getting mixed
}
for (i <- 1 to 5) {
test ! -1
Thread.sleep(1000) // Small delay to avoid logging threads getting mixed
}
test ! "chickens"INFO clasp.TestActor - Starting actor clasp.TestActor
INFO clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: original behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: Post Message Processing for 1
INFO clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: original behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: Post Message Processing for 1
INFO clasp.TestActor - Receiving 1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: original behavior
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] I am becoming
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Post Message Processing for 1
INFO clasp.TestActor - Receiving 1
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated Post Message Processing for 1
INFO clasp.TestActor - Receiving 1
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 2: Updated Post Message Processing for 1
INFO clasp.TestActor - Receiving -1
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 2: Updated behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated Post Message Processing for -1
INFO clasp.TestActor - Receiving -1
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 1: Updated behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated Post Message Processing for -1
INFO clasp.TestActor - Receiving -1
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] 0: Updated behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] I am unbecoming
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: Updated Post Message Processing for -1
INFO clasp.TestActor - Receiving -1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -1: original behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: Post Message Processing for -1
INFO clasp.TestActor - Receiving -1
[DEBUG] [akka.tcp://cl...@10.0.0.20:2552/user/test] -2: original behavior
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: Post Message Processing for -1
INFO clasp.TestActor - Receiving chickens
INFO clasp.TestActor - Received unhandled message chickens
[INFO] [akka.tcp://cl...@10.0.0.20:2552/user/test] -3: Post Message Processing for chickens