--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
// Server side: bind the TCP listener and attach a freshly built MTProto flow to
// every incoming connection.
val connections = Tcp().bind(interface, port)

connections runForeach { conn ⇒
  log.info(s"Client connected from: ${conn.remoteAddress}")

  // NOTE(review): this call passes three arguments (including maxBufferSize) while the
  // `flow` definition below takes (connId, sessionRegion) — presumably the two snippets
  // come from different revisions; confirm against the real sources.
  try {
    val flow = MTProto.flow(nextConnId(), maxBufferSize, sessionRegion)
    conn.handleWith(flow)
  } catch {
    // Only failures raised while building/attaching the flow are caught and logged here.
    case e: Exception ⇒ log.error(e, "Failed to create connection flow")
  }
}

/**
 * Builds the per-connection processing flow from inbound bytes to outbound bytes.
 *
 * Two actors are created per connection, named after `connId` (`authManager-<connId>` and
 * `sessionClient-<connId>`), and each is exposed as a `Source[MTProto]` through
 * `ActorPublisher`. Inbound bytes run through the parse / check / handle stages; the
 * resulting messages are merged with the two actor-backed sources, encoded by
 * `mapResponse`, and broadcast to (a) a sink that logs stream completion and
 * (b) the outlet of the returned flow.
 *
 * NOTE(review): `Merge` is reported not to cancel its remaining inputs when a single
 * input completes. When the TCP side completes, the completion reaches `merge` via
 * `mtproto`, but the `auth` and `session` sources keep running, so the actors behind
 * them never receive `Cancel` and are not stopped by this graph. Confirm and handle
 * the actor lifecycle explicitly if that is not intended.
 *
 * @param connId        unique connection id, used as a suffix for the actor names
 * @param sessionRegion passed to `SessionClient.props`
 * @param db            passed to `AuthorizationManager.props`
 * @param system        actor system used to spawn the actors and for logging
 * @param timeout       not referenced in this body
 */
def flow(connId: String, sessionRegion: SessionRegion)(implicit db: Database, system: ActorSystem, timeout: Timeout) = {
  val authManager = system.actorOf(AuthorizationManager.props(db), s"authManager-${connId}")
  val authSource = Source(ActorPublisher[MTProto](authManager))

  val sessionClient = system.actorOf(SessionClient.props(sessionRegion), s"sessionClient-${connId}")
  val sessionClientSource = Source(ActorPublisher[MTProto](sessionClient))

  // Inbound pipeline: raw bytes -> parsed packages -> checked packages -> handled messages.
  // `protoVersions` and `apiMajorVersions` come from the enclosing scope.
  val mtprotoFlow = Flow[ByteString]
    .transform(() ⇒ new PackageParseStage)
    .transform(() ⇒ new PackageCheckStage)
    .transform(() ⇒ new PackageHandleStage(protoVersions, apiMajorVersions, authManager, sessionClient))

  // Outbound encoding; a new stage instance is created by the factory passed to `transform`.
  val mapRespFlow: Flow[MTProto, ByteString, Unit] = Flow[MTProto]
    .transform(() ⇒ mapResponse(system))

  // Logs the completion result (success or failure) of the branch it is attached to.
  val completeSink = Sink.onComplete {
    case x ⇒
      system.log.debug("Completing {}", x)
  }

  Flow() { implicit builder ⇒
    import FlowGraph.Implicits._

    val bcast = builder.add(Broadcast[ByteString](2))
    val merge = builder.add(Merge[MTProto](3))

    val mtproto = builder.add(mtprotoFlow)
    val auth = builder.add(authSource)
    val session = builder.add(sessionClientSource)
    val mapResp = builder.add(mapRespFlow)
    val complete = builder.add(completeSink)

    // format: OFF

    mtproto ~> merge
    auth    ~> merge
    session ~> merge ~> mapResp ~> bcast ~> complete

    // format: ON

    // Exposed ports: inbound bytes enter the parse pipeline; outbound bytes leave through
    // the second broadcast output (the first one feeds `complete`).
    (mtproto.inlet, bcast.out(1))
  }
}
/**
 * Creates a stage that wraps every outgoing `MTProto` message into a `TransportPackage`,
 * encodes it with `TransportPackageCodec` and emits the encoded bytes.
 *
 * The stage keeps a mutable package counter, so a new instance must be created for each
 * stream it is used in.
 *
 * @param system not referenced in this body
 * @return a `PushStage` turning `MTProto` messages into encoded `ByteString`s
 */
def mapResponse(system: ActorSystem) = new PushStage[MTProto, ByteString] {
  // Index of the last emitted package; starts at -1 so that the first package gets index 0.
  private[this] var packageIndex: Int = -1

  override def onPush(elem: MTProto, ctx: Context[ByteString]) = {
    packageIndex += 1
    val pkg = TransportPackage(packageIndex, elem)
    // NOTE(review): `.require` presumably throws when encoding fails — confirm against
    // the codec library.
    val resBits = TransportPackageCodec.encode(pkg).require
    val res = ByteString(resBits.toByteBuffer)

    elem match {
      // A Drop message is still sent to the peer, after which the stage finishes.
      case _: Drop ⇒ ctx.pushAndFinish(res)
      case _       ⇒ ctx.push(res)
    }
  }
}
--
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group. To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/MC1J4q9lZYw/unsubscribe. To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
auth and session both call context.stop(self) inside the Cancel handler of their receive methods.
And they do receive it when the client closes the connection (at least when it is closed with "Connection reset by peer").
On Sun, Jun 7, 2015 at 5:32 PM, Andrey Kuznetsov <fe...@loathing.in> wrote:
> auth and session makes context.stop(self) inside Cancel handler of their receive's.
Umm, but they will not get a Cancel from merge if merge itself is not cancelled, which it will not be in this case. So mtproto gets the completion from the TCP flow and feeds it into merge, but merge will continue happily.
// Terminal sink: when the stream it is attached to finishes (successfully or not),
// stop both per-connection actors and then log the completion result.
val completeSink = Sink.onComplete { result ⇒
  authManager ! PoisonPill
  sessionClient ! PoisonPill
  system.log.debug("Completing {}", result)
}
Flow() { implicit builder ⇒
import FlowGraph.Implicits._
val bcast = builder.add(Broadcast[ByteString](2))
val merge = builder.add(Merge[MTProto](3))
val mtproto = builder.add(mtprotoFlow)
val auth = builder.add(authSource)
val session = builder.add(sessionClientSource)
val mapResp = builder.add(mapRespFlow)
val complete = builder.add(completeSink)
// format: OFF
bcast ~> complete
bcast ~> mtproto ~> merge
auth ~> merge
session ~> merge ~> mapResp