[Akka-stream|http] - How to close a flow with killswitch from outside?


Hamidreza Davoodi

May 1, 2016, 8:18:14 AM
to Akka User List
Hi there,

I've been trying to write a chatroom, or actually poking around other people's code, so I finally (kinda) got the idea behind streams (thanks to the documentation and the test specs), but I ran into an issue:

When a websocket stream gets closed, the regulator I put on incoming messages (throttling to 1 msg/sec) delays the completion of the stream, so a dead actor (the one behind the ws stream) still receives unwanted messages (since it's dead! :D) and I get to see the dead-letter logs!
I think this is caused by the throttling, so I thought that if I close the stream manually myself, I'll get rid of this issue.

http://scastie.org/16825

or

    GraphDSL.create(Source.actorRef[ChatMessage](bufferSize = 1000, OverflowStrategy.fail)) {
      implicit builder =>
      chatSource =>
 
      import GraphDSL.Implicits._
      implicit val executionContext = actorSystem.dispatcher
 
      val stopper = builder.add(KillSwitches.single[ChatEvent]) // builder.add returns a FlowShape, not the KillSwitch itself
 
      val fromWebsocket = builder.add(
        Flow[Message]
          .watchTermination()(
          (_, te) => te.onSuccess {
            case cause => // ***** here? How do I stop the flow?
          })
          .throttle(1, 1 seconds, 1, ThrottleMode.shaping)
          // The throttle is what I want, but it causes a delay EVEN after the stream gets closed,
          // so I want to watch the flow's termination and close it myself!
          // I have no idea whether what I'm doing is correct, but what I want is this:
          // when the stream gets closed, don't wait for anything, just close the sink.
          .collect {
          case TextMessage.Strict(txt) => IncomingMessage(user_id, txt)
        }
      )
 
      val actorAsSource = builder.materializedValue.map(actor => UserJoined(user_id, rooms, actor))
      val chatActorSink  = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user_id, rooms))
      val mergeToChat = builder.add(Merge[ChatEvent](2))
 
      actorAsSource ~> mergeToChat.in(0)
      fromWebsocket ~> mergeToChat.in(1)
 
      mergeToChat ~> stopper ~> chatActorSink // look at the stopper here! ***

//...

Thanks!
I appreciate if you help me.


Magnus Andersson

May 4, 2016, 3:09:28 AM
to Akka User List
I don't understand how you detect this behavior.

You have a throttle of 1 msg/sec, a buffer of 1000 msgs, and an overflow strategy to drop messages. If you don't fill up the buffer, the messages will keep coming for up to 1000 seconds after upstream has stopped sending messages. Is this what you are talking about?

hamid

May 5, 2016, 3:56:31 AM
to Akka User List
Thank you for the reply.

First, I want to use backpressure to put a limit on incoming messages, which I believe is what this line is for:

          .throttle(1, 1 seconds, 1, ThrottleMode.shaping)
I think this line backpressures the client, so I won't receive more than 1 message per second. (Unless you tell me I'm wrong!)
I've tried "delay(1 seconds, backpressure)" as well, which I think does the same thing (again, unless you tell me I'm wrong!), and it had the same effect.
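
For reference, here is a minimal sketch of what the throttle line does in isolation. It assumes an Akka 2.4/2.5-style setup with `ActorMaterializer` (deprecated in later versions); the object name `ThrottleSketch` and the 200 ms interval are made up for illustration and are not part of the chat code above.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original chat application.
object ThrottleSketch {
  // Pushes three elements through a shaping throttle and returns what
  // arrived downstream plus the elapsed wall-clock time in milliseconds.
  def run(): (Seq[Int], Long) = {
    implicit val system = ActorSystem("throttle-sketch")
    implicit val materializer = ActorMaterializer() // assumption: Akka 2.4/2.5 API
    try {
      val start = System.nanoTime()
      val received = Await.result(
        Source(1 to 3)
          // At most 1 element per 200 ms; Shaping slows the upstream down
          // through backpressure instead of failing the stream.
          .throttle(1, 200.millis, 1, ThrottleMode.Shaping)
          .runWith(Sink.seq),
        5.seconds)
      (received, (System.nanoTime() - start) / 1000000)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (received, elapsedMs) = run()
    println(s"received $received in about $elapsedMs ms")
  }
}
```

Nothing is dropped in shaping mode: every element still reaches the sink, only later. That is also why completion of the stream can lag behind the upstream finishing.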

Secondly, by "1000 msgs and an overflow strategy", I think you are referring to the buffer for outgoing messages, which I don't need to worry about, unless again you give me a reason to! I've tried, and backpressure is not supported as the overflow strategy for that buffer.

Let me tell you what I'm experiencing here.
When a websocket connection starts, I send the actor of the websocket, which is this line of code

      val actorAsSource = builder.materializedValue.map(actor => UserJoined(user_id, rooms, actor))
to a Sink.actorRef to gather them, and I connect "fromWebsocket" to it as well through a merge.

     val chatActorSink  = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user_id, rooms))
     val mergeToChat = builder.add(Merge[ChatEvent](2))
      actorAsSource ~> mergeToChat.in(0)
      fromWebsocket ~> mergeToChat.in(1)
      mergeToChat ~> stopper ~> chatActorSink // look at the stopper here! ***



What happens here is kind of a surprise to me: when I kill the websocket client (a Node.js script) with Ctrl+C, the console (sbt running my Scala app) prints
[INFO] [05/04/2016 19:32:10.981] [akka-system-akka.actor.default-dispatcher-8] [akka://akka-system/user/StreamSupervisor-0/flow-14-0-actorRefSource] Message [io.scalac.akka.http.websockets.chat.ChatMessage] from Actor[akka://akka-system/user/chat-53aff88cdc302896c7000022#71135517] to Actor[akka://akka-system/user/StreamSupervisor-0/flow-14-0-actorRefSource#-676858346] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'
What I think is happening here is this:
Since I'm using throttling to limit the incoming messages, I think it also delays closing the stream: the stream won't complete until the throttle has finished, which adds an unnecessary delay. Meanwhile the actor is already dead, so the messages sent to it are logged as dead letters.

By the way, even if my assumptions are wrong, I'd really like to know how I can use a KillSwitch in this situation, so I can learn akka-streams better.

Thank you.

Magnus Andersson

May 6, 2016, 4:48:12 AM
to Akka User List
Hi

You're right, I read the buffer as being the input buffer, not the output buffer. As for the kill switch, I haven't used it myself, so I'm not much help there; I thought the underlying assumption was wrong.

/Magnus

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/OisR-nYVJXA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
--
Magnus Andersson

Akka Team

May 10, 2016, 5:25:41 AM
to Akka User List
Hi,

I am not sure I fully understand what you are trying to achieve. Throttle only stores one message, so there will be only one message outstanding when its upstream is closed. It might take some time to emit that, that's true, but this is what it is supposed to do.

If you don't want a proper close/complete here (since that waits for any previous element to be properly passed to the last stage first), you can use a failure instead. Failures are propagated immediately, independently of any previous message being in buffers, delayed, etc.
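
To make the complete-versus-fail distinction concrete, here is a small sketch using a KillSwitch, where `shutdown()` completes the stream and `abort(...)` fails it. It assumes Akka 2.4.3 or later (where `KillSwitches.single` is available) and an `ActorMaterializer`-style setup; the object name, the tick source and the exception message are illustrative only.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object, not part of the original chat application.
object CompleteVsFail {
  def outcomes(): (Try[Done], Try[Done]) = {
    implicit val system = ActorSystem("complete-vs-fail")
    implicit val materializer = ActorMaterializer() // assumption: Akka 2.4/2.5 API

    // An endless source, so the stream only ends when the switch is used.
    def start(): (UniqueKillSwitch, Future[Done]) =
      Source.tick(0.seconds, 100.millis, "tick")
        .viaMat(KillSwitches.single[String])(Keep.right) // keep the switch
        .toMat(Sink.ignore)(Keep.both)                   // ...and the sink's Future
        .run()

    try {
      val (switch1, done1) = start()
      switch1.shutdown() // regular completion: downstream sees onComplete
      val completed = Try(Await.result(done1, 3.seconds))

      val (switch2, done2) = start()
      switch2.abort(new RuntimeException("client went away")) // failure: downstream sees onError
      val failed = Try(Await.result(done2, 3.seconds))

      (completed, failed)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (completed, failed) = outcomes()
    println(s"shutdown => $completed")
    println(s"abort    => $failed")
  }
}
```

In this sketch the first Future succeeds and the second one fails with the exception passed to `abort`.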

But a more general question: why are you closing the receiving actor before all the elements have been passed to it? Just because the client closed the connection, it does not mean that it does not want the previous messages to be delivered. As an example:

1. I am in room, and type "Bye!"
2. I close the client

I expect "Bye!" to be still delivered

-Endre




--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

hamid

May 11, 2016, 2:17:45 AM
to Akka User List
Hi,

What I thought was that if the websocket connection gets closed, its ActorRef should be cleaned up everywhere it is held, simply because the actor will be dead yet will still be sent messages (dead letters), and I guessed that when the system is under load this could lead to performance issues.

But I see your point and it's a good one. So I guess either I'm not thinking about throttling properly, or this is just the way it is.

But I'd still like to know how to use a KillSwitch in my code. It's kinda weird: I add a KillSwitch through the builder, and then its interface doesn't offer anything it's supposed to do! Unless it's not supposed to do what I think it should :D
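
A possible explanation, offered as a sketch rather than a confirmed answer from this thread: `builder.add(...)` returns only the shape (a `FlowShape`), while the `UniqueKillSwitch` is the stage's materialized value, which is discarded for stages added that way. One way to keep it is to pass the kill switch to `GraphDSL.create`, so it becomes the materialized value of the whole graph. The example below assumes Akka 2.4.3+ with an `ActorMaterializer`-style setup; `GraphKillSwitch`, `stoppable` and the upper-casing stage are made-up names for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, GraphDSL, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original chat application.
object GraphKillSwitch {
  // Passing the kill switch to GraphDSL.create imports it into the builder
  // (as `stopper`, a FlowShape) and exposes the UniqueKillSwitch as the
  // materialized value of the resulting flow.
  val stoppable: Flow[String, String, UniqueKillSwitch] =
    Flow.fromGraph(GraphDSL.create(KillSwitches.single[String]) {
      implicit builder => stopper =>
        import GraphDSL.Implicits._
        val upper = builder.add(Flow[String].map(_.toUpperCase))
        upper ~> stopper
        FlowShape(upper.in, stopper.out)
    })

  def run(): Seq[String] = {
    implicit val system = ActorSystem("graph-killswitch")
    implicit val materializer = ActorMaterializer() // assumption: Akka 2.4/2.5 API
    try {
      val (switch, received) =
        Source.tick(0.seconds, 50.millis, "hi")
          .viaMat(stoppable)(Keep.right) // the switch only exists after run()
          .toMat(Sink.seq)(Keep.both)
          .run()
      Thread.sleep(300)  // let a few elements through
      switch.shutdown()  // stop the otherwise endless stream from outside
      Await.result(received, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

When something else materializes the flow for you (as akka-http does for a WebSocket handler), the materialized switch is hard to reach. In that case a `SharedKillSwitch`, created up front with `KillSwitches.shared("some-name")` and inserted with `.via(shared.flow)`, may be easier to call from outside, assuming an Akka version that provides it.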

Thanks