Akka actors and blocking I/O -

621 views
Skip to first unread message

foolshat

unread,
Aug 30, 2011, 7:16:43 AM8/30/11
to Akka User List
Hi,

I'm using some 3rd party libraries which must be called in a loop to
receive input over time, this code blocks waiting for I/O to finish.
I can run this within an Akka actor, sending a message to a 2nd actor
to process the input received. However, as it is running in an
infinite loop within the receive method, this does not feel right,
further I can't kill the actor nicely as it can't get a message as it
is in an infinite loop - I need to shutdown the actor cleanly to close
some files correctly etc..

What is a better actor/Akka way of dealing with this ? Previously I
would have used a thread, but I want to benefit from the Akka
supervision hierarchies.
Could Camel help here ?

The input is coming from a serial port infrequently, and from a camera
frequently.

Thanks guys.

√iktor Ҡlang

unread,
Aug 30, 2011, 7:50:53 AM8/30/11
to akka...@googlegroups.com

Make it a messace passing loop:

def receive = {
  case Process(input) => //dowhateverwithinput
    self ! Next
  case Next => val input = readInput()
     self ! Process(input)
}

Then in preStart/postRestart send self a Next message to get the loop started.

Helps?

Cheers
V

> --
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To post to this group, send email to akka...@googlegroups.com.
> To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.
>

foolshat

unread,
Aug 31, 2011, 5:36:05 AM8/31/11
to Akka User List
Thanks Victor, that works, a nice simple pattern.

To help others, here is a little sample actor class based on your
snippet.


import org.slf4j.{ LoggerFactory, Logger }
import akka.actor.{ Actor }


case class Process
case class Next
case class Stop

class TestIOActor extends Actor {

private val log: Logger = LoggerFactory.getLogger(this.getClass())

override def preStart() {
log.debug("preStart")
self ! Next
}

override def preRestart(reason: scala.Throwable) {
log.debug("preRestart")
self ! Next
}

def receive = {

case Process => {
log.debug("Process")
self ! Next
}

case Next => {
log.debug("Next")
self ! Process
}

case Stop => {
log.debug("Stop")
self.stop()
}

}
}
Reply all
Reply to author
Forward
0 new messages