Problem detecting Websocket failure

266 views
Skip to first unread message

Alexej Haak

unread,
Sep 28, 2016, 9:23:36 AM9/28/16
to Akka User List
Hi,
I have a problem detecting a failing websocket. I already got some help inside the akka issue tracker, so thanks for getting me started there even though it was the wrong place :

 val source = Source.actorRef[Message](bufferSize, OverflowStrategy.fail)

 val flow = Flow
    .fromSinkAndSourceMat(sink, source)(Keep.right)
    .alsoTo(Sink.onComplete(_ => {  // Not working as expected?
      println("Restarting due to Websocket failure..")
      println("------------------------------------------------- \n\n\n\n")
      restart()
    }))

 val (upgradeResponse, actor) =
    Http().singleWebSocketRequest(
      WebSocketRequest(
        host + "/console"),
      flow)

The alsoTo part never triggers, even though the websocket connection closes. I'm Testing it in this setup:

- Play Framework Server with websocket running
- Client Connects
- Messaging works
- Play Framework Server gets restarted
- Messaging fails and no Client connects
- Restarting Client manually
- Messaging works again


Best regards :)

Alexej Haak

unread,
Sep 29, 2016, 9:39:38 AM9/29/16
to Akka User List
source
.viaMat(
Http().webSocketClientFlow(
WebSocketRequest(
host + "/console")
)
)(Keep.both)
.alsoTo(Sink.onComplete(_ => {
self ! RestartWebsocket
}))
.toMat(sink)(Keep.left)
.run()

Brandon Bradley

unread,
Feb 10, 2017, 9:45:08 AM2/10/17
to Akka User List
Hello!

I also can't get flow.alsoTo(Sink.onComplete(...)) to trigger. The only thing that completes when the websocket closes (due to timeout) is the materialized Future[Done] from the Flow.

In addition, the pattern to use for reconnection logic becomes unclear because the websocket flow is not reusable.

Here is my own logic for connecting to a websocket. If I add websocketFlow.alsoTo(Sink.onComplete(...)), it does not trigger, but closed.foreach(_ => throw new Exception("Websocket disconnected.")) does.

Cheers!
Brandon Bradley

johannes...@lightbend.com

unread,
Feb 21, 2017, 5:21:53 AM2/21/17
to Akka User List
Hi,

you are applying the `alsoTo` clause to the wrong side of the connection. You need to put it on the Sink-side. The way you have written it, the `alsoTo` clause waits for your `Source.actorRef` to close the connection.

Try using

singleWebSocketRequest(..., Flow[Message].alsoTo(Sink.onComplete(...)).via(yourHandlerFlow))

Johannes
Reply all
Reply to author
Forward
0 new messages