GraphDSL.create(Source.actorRef[ChatMessage](bufferSize = 1000, OverflowStrategy.fail)) {
  implicit builder => chatSource =>
    import GraphDSL.Implicits._
    implicit val executionContext = actorSystem.dispatcher

    var stopper = builder.add(KillSwitches.single[ChatEvent])

    val fromWebsocket = builder.add(
      Flow[Message]
        .watchTermination()( (_, te) => te.onSuccess {
          case cause =>
            //********** here? how to stop the flow?
        })
        .throttle(1, 1 seconds, 1, ThrottleMode.shaping)
        //the throttle is what i want but it causes a delay EVEN after the stream gets closed
        //so i want to watch the flows Termination and close it myself!
        //no idea what I'm doing is correct or not but what i want is this:
        //when stream gets closed just dont wait for anything and close the sink.
        .collect {
          case TextMessage.Strict(txt) => IncomingMessage(user_id, txt)
        }
    )

    val actorAsSource = builder.materializedValue.map(actor => UserJoined(user_id, rooms, actor))
    val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user_id, rooms))
    val mergeToChat = builder.add(Merge[ChatEvent](2))

    actorAsSource ~> mergeToChat.in(0)
    fromWebsocket ~> mergeToChat.in(1)
    mergeToChat ~> stopper ~> chatActorSink // look at the stopper here! ***
//...
You have a throttle of 1 msg/sec, a buffer of 1000 messages, and an overflow strategy (OverflowStrategy.fail in the code above, which fails the stream rather than dropping messages). As long as the buffer does not overflow, buffered messages can keep arriving downstream for up to 1000 seconds after upstream has stopped sending. Is this what you are talking about?
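If the goal is to cut the stream off without waiting for the throttle to drain, one option might be a shared kill switch, since it exists before materialization and can be triggered from any callback. Below is a minimal sketch, not your actual graph: the object name, the switch name "chat-stopper", the integer source standing in for chat events, and the 3.5 second wait are all made up for illustration. It assumes an Akka 2.4/2.5-style ActorMaterializer (on 2.6+ the implicit ActorSystem alone should be enough).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-alone sketch; names are illustrative only.
object KillSwitchSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Created up front, so any callback can hold a reference to it.
    val stopper = KillSwitches.shared("chat-stopper")

    // Left alone, this stream would need roughly 1000 seconds to drain.
    val done = Source(1 to 1000)
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .via(stopper.flow) // everything downstream of this stage can be stopped at once
      .runWith(Sink.foreach(n => println(s"received $n")))

    // Stand-in for "the websocket closed": wait a little, then stop the stream.
    Thread.sleep(3500)
    stopper.shutdown() // completes downstream and cancels upstream immediately

    Await.result(done, 5.seconds)
    Await.result(system.terminate(), 5.seconds)
  }
}
```

In your graph the equivalent would be to put stopper.flow where the single kill switch is now and call stopper.shutdown() from the watchTermination callback. I have not tried that against your exact setup, so treat it as a starting point.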
[INFO] [05/04/2016 19:32:10.981] [akka-system-akka.actor.default-dispatcher-8] [akka://akka-system/user/StreamSupervisor-0/flow-14-0-actorRefSource] Message [io.scalac.akka.http.websockets.chat.ChatMessage] from Actor[akka://akka-system/user/chat-53aff88cdc302896c7000022#71135517] to Actor[akka://akka-system/user/StreamSupervisor-0/flow-14-0-actorRefSource#-676858346] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'