Akka websocket server-push model (server sends messages to client)

Andrew Schenck

Apr 21, 2017, 1:15:33 PM
to Akka User List
Hello all,

I have scoured the questions raised here that are similar to my needs but so far have not been successful at finding a solution.

I need (want) to be able to create an Akka websocket server in which I can send messages from the server to the connected clients (for example, send the outputs from a Flink analytic along the websocket to an end-user GUI). I was able to implement the request-response example, but so far have had little success with a 'server push' model.

Using something like Jetty it is as easy as ws.broadcast, but I understand Akka needs to use backpressure etc., so I guess what I am stumbling with is how to create a proper flow in a 'server-push' model, in Java.

If there are examples I can look at (haven't found any) or some literature I can read or any suggestions as to how to create this 'server-push' websocket model it would be greatly appreciated.

Thanks for the help,

Andy

Konrad Malawski

Apr 21, 2017, 1:21:19 PM
to akka...@googlegroups.com, Andrew Schenck
Hi Andy,
Further details on how you want to push (where, and to which client) would be helpful for my understanding (and future development).

We recently developed an example app that does such a thing: it registers "drones" in an actor that keeps track of them by ID, and allows the server to push commands at will to a drone through the websocket connection that drone has opened.

It does a bit more (it handles resumption and clustering), so it's more complex, but you could have a look at it for now: 

It is in Scala but the code would be pretty much the same in Java.
I think this may point you in the direction you might want to take this.

Hope this helps

-- 
Konrad `ktoso` Malawski
Akka @ Lightbend

Andrew Schenck

Apr 21, 2017, 2:01:45 PM
to Akka User List, andrew...@gmail.com
Thank you for the quick reply! This is indeed an interesting example (albeit a bit complex for my need).

For the simplest case, all I want is a websocket server that can push JSON messages at will to a (single) connected client, which then displays a graph from the JSON. The client side is all JavaScript and already written (borrowed from previous work; it can connect to a websocket and send and receive messages over it). I want to replace my current Jetty implementation with Akka websockets since so much of the rest of my app uses reactive Akka (plus to learn how to do it for future projects etc.).

So basically I just want to be able to issue a command to the client from the server.

I will look more at the example you shared and try and unpack it (scala is not my strongest language ;D)

Andy

Andrew Schenck

Apr 21, 2017, 3:42:05 PM
to Akka User List
To maybe further elaborate on what I've attempted...

I read a response saying that the gist of being able to 'server push' a message to a client was to first "do an ActorPublisher to push the msg to client, then create a flow which uses the ActorPublisher as a source and pass this flow to the handleWebSocketMessages. Then, to push messages to the client you can use the ActorRef of the ActorPublisher and do a .tell(msg)". I've tried to follow that description as best I could, but issues arise immediately (such as having no way to get the ActorRef to do a .tell). It's possible I am not creating the ActorPublisher correctly (when they say to "push the msg to the client" I'm assuming that's an onNext call, but I'm not sure).

This ActorRef.tell(msg) sounds pretty much like what I need, a very basic way to get the result from my analysis and send it along the websocket to the client.

Andrew Schenck

Apr 21, 2017, 5:14:13 PM
to Akka User List
So I am not sure but I think this is doing what I wanted to accomplish, the flow is

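// Assumes `system` (ActorSystem) and `pub` (ActorRef) are fields of the enclosing class.
public Flow<Message, Message, NotUsed> createWebSocketFlow() {

    // Actor that relays messages to the client once it knows the destination ref
    pub = system.actorOf(Props.create(AbsPublisher.class));

    System.out.println("out actor ref: " + pub.path().name());

    // response: anything sent to the materialized ActorRef is pushed to the client
    Source<Message, NotUsed> source =
        Source.<Outgoing>actorRef(5, OverflowStrategy.fail())
            .map((outgoing) -> (Message) TextMessage.create(outgoing.message))
            .<NotUsed>mapMaterializedValue(destinationRef -> {
                // hand the destination ref to the publisher so it can forward to it
                pub.tell(new OutgoingDestination(destinationRef), ActorRef.noSender());
                return NotUsed.getInstance();
            });

    // request: messages from the client are forwarded to the publisher actor
    Sink<Message, NotUsed> sink =
        Flow.<Message>create()
            .map((msg) -> new Incoming(msg.asTextMessage().getStrictText()))
            .to(Sink.actorRef(pub, PoisonPill.getInstance()));

    return Flow.fromSinkAndSource(sink, source);
}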

where Incoming and Outgoing are just simple classes that contain strings, and OutgoingDestination contains an ActorRef.
AbsPublisher is an AbstractActorPublisher with a receive that handles the OutgoingDestination setup and message passing (via .tell to the destination).
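
A note on the buffer settings in the flow above: Source.actorRef(5, OverflowStrategy.fail()) keeps at most five outgoing messages buffered while the client has not signalled demand, and fails the stream when another message arrives on a full buffer. The plain-Java sketch below is only an analogy for that behaviour, not Akka code: the BoundedOutbox class and its method names are hypothetical, and an exception stands in for the stream failure.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical plain-Java analogy for a bounded outgoing buffer; not Akka code.
final class BoundedOutbox {
    private final BlockingQueue<String> buffer;

    BoundedOutbox(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // "Fail" style: throws IllegalStateException when the buffer is full.
    void enqueueOrFail(String message) {
        buffer.add(message);
    }

    // Simulates the client consuming one message; returns null if empty.
    String deliverNext() {
        return buffer.poll();
    }

    // Number of messages still waiting to be delivered.
    int pending() {
        return buffer.size();
    }
}

final class BoundedOutboxDemo {
    public static void main(String[] args) {
        BoundedOutbox outbox = new BoundedOutbox(5);
        for (int i = 1; i <= 5; i++) {
            outbox.enqueueOrFail("msg-" + i);   // fits in the buffer
        }
        try {
            outbox.enqueueOrFail("msg-6");      // sixth message, nothing consumed yet
        } catch (IllegalStateException overflow) {
            System.out.println("buffer overflow, pending = " + outbox.pending());
        }
    }
}
```

If bursts larger than the buffer are expected (for example, an analytic that emits many results at once), a larger buffer or a dropping strategy such as OverflowStrategy.dropHead() may be a better fit than failing; which one is right depends on whether the client can afford to miss messages.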

so I can call in my main class:
     system.eventStream().publish(new Incoming("test"), PubRef);

and this will send the message from the server to the client (in my tests I generate a message from the main of the websocketapp class and it shows up on the client side like I want). So I should be able to pack this message with my json and go from there. I copied most of this code from an answer here by Johan Andren.

This might not be the best solution but it seems to do what I need. If anyone else has some suggestions I am still all ears, the flow aspect is still hard for me to wrap my head around.

Andrew Schenck

Apr 21, 2017, 5:38:11 PM
to Akka User List
I also found out I can simply do PubRef.tell(msg) and it will send the message to the client. So this is pretty much what that one post described. Just wanted to make sure if anyone came across this issue they had all the information I've found.


Julian Howarth

Apr 21, 2017, 7:09:19 PM
to Akka User List
I may have misunderstood what you want to achieve, but you don't have to use actors if you'd prefer to just use akka-http / akka-streams. As long as you can provide the data you want to broadcast in the form of an akka-streams Source, it is straightforward to connect that to websocket clients via a broadcast hub: http://doc.akka.io/docs/akka/current/java/stream/stream-dynamic.html#Using_the_BroadcastHub

Something like the following works - in Scala, but Java code will be similar:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, RunnableGraph, Sink, Source}

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object Tester extends App {

  implicit val system = ActorSystem("Server")
  implicit val mat = ActorMaterializer()

  // The source to broadcast (just ints for simplicity)
  private val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)

  // Go via BroadcastHub to allow multiple clients to connect
  val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
    dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

  val producer: Source[String, NotUsed] = runnableGraph.run()

  // Optional - add sink to avoid backpressuring the original flow when no clients are attached
  producer.runWith(Sink.ignore)

  private val wsHandler: Flow[Message, Message, NotUsed] =
    Flow[Message]
      .mapConcat(_ => Seq.empty[String]) // Ignore any data sent from the client
      .merge(producer) // Stream the data we want to the client
      .map(l => TextMessage(l.toString))

  val route =
    path("ws") {
      handleWebSocketMessages(wsHandler)
    }

  val port = 8080

  println("Starting up route")
  Http().bindAndHandle(route, "0.0.0.0", port)
  println(s"Started HTTP server on port $port")

}

If you run the above and connect to ws://localhost:8080/ws you'll see the Ints being output. If you connect a second client, it will output the same values as the first, starting from wherever the source had got to at the point of connection.
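
To make the late-joiner behaviour concrete, here is a minimal plain-Java sketch of the delivery semantics only. It is not Akka and has no backpressure or buffering; MiniHub and MiniHubDemo are hypothetical names used purely for illustration. It mirrors the variant with the optional Sink.ignore attached, where elements emitted while no client is connected are simply discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a broadcast hub: no Akka, no backpressure, no buffering.
final class MiniHub<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Attach a client; it only sees elements published after this call.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver one element to every client attached right now.
    void publish(T element) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(element);
        }
    }
}

final class MiniHubDemo {
    public static void main(String[] args) {
        MiniHub<String> hub = new MiniHub<>();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        hub.publish("0");            // nobody attached yet: element is discarded
        hub.subscribe(first::add);   // first client connects
        hub.publish("1");
        hub.publish("2");
        hub.subscribe(second::add);  // second client joins late
        hub.publish("3");

        System.out.println("first client:  " + first);   // [1, 2, 3]
        System.out.println("second client: " + second);  // [3]
    }
}
```

The real BroadcastHub additionally buffers elements (bufferSize = 256 in the example) and applies backpressure based on its consumers, which this sketch deliberately leaves out.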

HTH,

Julian

workingdog

Apr 21, 2017, 7:26:23 PM
to Akka User List
Hi Andy,

I made a server (Play/Akka/websocket) that receives and sends json messages to clients at:


This may give you some ideas for publish/subscribe using the actor model.

Johannes Rudolph

Apr 24, 2017, 5:12:58 AM
to Akka User List
Thanks Julian for sharing the example. Indeed, using the BroadcastHub is the recommended way to implement something like this.

Andrew Schenck

Apr 24, 2017, 11:08:31 AM
to Akka User List
Wow that's a pretty neat solution thanks!

It does achieve what I want. Now I wonder, could the source be a Kafka source?

So whenever something is written to a Kafka topic, it could be sent to the WS...

Julian Howarth

Apr 24, 2017, 11:44:37 AM
to Akka User List
https://github.com/akka/reactive-kafka makes it pretty easy to use Kafka as a Source, and then it will get broadcast to all connected clients.