Here is an excerpted/simplified version of what we're doing (hopefully I didn't mess things up too badly during sanitization):
/**
 * Actor that binds a TCP listening socket on `endpoint` and, for every accepted
 * connection, creates a [[ConnectionHandler]] together with a `TcpPipelineHandler`
 * that is registered with the connection actor.
 *
 * @param endpoint local address to bind the listening socket to
 */
class ConnectionListener(endpoint: InetSocketAddress) extends Actor with ActorLogging {
  import akka.io.{ IO, Tcp }

  // `IO(Tcp)` requires an implicit ActorSystem to look up the TCP manager.
  implicit val system: akka.actor.ActorSystem = context.system

  // Ask the TCP manager to bind; the outcome arrives as Tcp.Bound or Tcp.CommandFailed.
  IO(Tcp) ! Tcp.Bind(self, endpoint)

  override def receive: Receive = {
    case Tcp.Bound(localAddress) ⇒
      log.info(s"Listening on $localAddress")

    case Tcp.CommandFailed(_: Tcp.Bind) ⇒
      // Without a bound socket this actor can never receive connections, so give up loudly
      // instead of lingering silently.
      log.error(s"Failed to bind to $endpoint")
      context stop self

    case Tcp.Connected(remote, _) ⇒
      // The sender of Tcp.Connected is the actor representing the new connection.
      handleConnected(remote, sender())

    case Terminated(handler) ⇒
      log.info(s"Handler terminated: $handler")
  }

  // The pipeline is built using Google Protobuf
  // NOTE(review): `maxFrameSize` is not defined in this excerpt; presumably it is declared
  // elsewhere in the real source - confirm.
  private def pipelineStages =
    new ProtoBufStage >>
      new LengthFieldFrame(maxSize = maxFrameSize) >>
      new TcpReadWriteAdapter

  /**
   * Wires up a freshly accepted connection: creates the message handler, creates the
   * pipeline actor between the connection and that handler, registers the pipeline with
   * the connection and starts watching the handler.
   *
   * @param remote     address of the connected client (used for logging only)
   * @param connection actor representing the accepted TCP connection
   */
  private def handleConnected(remote: InetSocketAddress, connection: ActorRef): Unit = {
    log.info(s"Client connected from IP $remote")

    val init = TcpPipelineHandler.withLogger(log, pipelineStages)

    // The actor name is the second argument of `actorOf`, not of `Props`.
    val handler = context.actorOf(
      Props(classOf[ConnectionHandler], init, connection),
      s"gateway-connection-handler-${UUID.randomUUID()}"
    )

    val pipeline = context.actorOf(
      TcpPipelineHandler.props(init, connection, handler).withDeploy(Deploy.local)
    )

    // Incoming TCP events for this connection are delivered to the pipeline actor.
    connection ! Tcp.Register(pipeline)

    // Lets `receive` log a Terminated message when the handler stops.
    context watch handler
  }
}
class ConnectionHandler(
init: Init[WithinActorContext, ServerMessage, ClientMessage],
connection: ActorRef) extends Actor with ActorLogging {
// Sender of the most recent `init.Event`; stays `None` until the first client message
// arrives. Outgoing ServerMessages are sent to this actor wrapped in `init.Command`.
var pipeline: Option[ActorRef] = None
/**
 * Message loop of the handler.
 *
 *  - `init.Event` carrying a ClientMessage: remembers the sender as the pipeline actor
 *    and passes the message to `processMessage`.
 *  - ServerMessage: forwarded to the remembered pipeline (if any) as an `init.Command`;
 *    dropped when no client message has been seen yet.
 *  - Connection closed / connection actor terminated: delegates to `handleClosedConnection`.
 *
 * NOTE(review): no `context watch connection` is visible in this excerpt; unless the
 * connection is watched elsewhere, the `Terminated` alternative will never match.
 */
def receive = LoggingReceive {
  case init.Event(clientMessage: ClientMessage) ⇒
    pipeline = Some(sender)
    processMessage(clientMessage)

  case reply: ServerMessage ⇒
    pipeline.foreach(target ⇒ target ! init.Command(reply))

  case _: Tcp.ConnectionClosed | Terminated(`connection`) ⇒
    handleClosedConnection
}
/**
 * Handles a message received from the client. In this excerpt it only logs the message.
 *
 * @param c the decoded client message
 */
private def processMessage(c: ClientMessage): Unit =
  log.info(s"Processing message $c")
/**
 * Called when the connection is reported closed or its actor has terminated.
 * In this excerpt it only logs the disconnect.
 */
private def handleClosedConnection: Unit =
  log.info("Client has disconnected")