How to use WebSockets?

409 views
Skip to first unread message

tech...@eeproperty.ch

unread,
Aug 22, 2017, 8:48:58 AM8/22/17
to Play Framework
Hi,

I would like to send UNIX commands to a Python script and get the output through Websocket. I read the documentation about WS for Play 2.5 and noticed that the LegacyWebSocket class is deprecated. What's the alternative for my purpose? I tried the example provided in the documentation of Play 2.6 but the Receive class doesn't exist.

Thank you.

tech...@eeproperty.ch

unread,
Aug 24, 2017, 5:09:37 AM8/24/17
to Play Framework
To give more precisions, I use the Java API and I tried the Actor based implementation.

Florian Pradines

unread,
Aug 24, 2017, 6:03:26 AM8/24/17
to Play Framework
Hi,

Here is a working example to implement websockets with Java Play Framework : https://github.com/playframework/play-java-websocket-example

Hope this will help

tech...@eeproperty.ch

unread,
Aug 24, 2017, 9:07:13 AM8/24/17
to Play Framework
OK but this looks very very complicated. What are all those CompletionStage<> instances for? Is there any documentation about it?

Florian Pradines

unread,
Aug 24, 2017, 10:11:24 AM8/24/17
to Play Framework
At the beginning it seems very hard, but in the end it is not at all. CompletionStage is for asynchronous tasks. Play Framework is built to be non blocking.
Here is the doc (where all of this is well explained) :
For 2.6.x : https://www.playframework.com/documentation/2.6.x/JavaAsync

Keep in mind that the provided documentation is very complete, so don't hesitate to look at it. You should read some parts to understand how Play works.

tech...@eeproperty.ch

unread,
Aug 24, 2017, 12:02:15 PM8/24/17
to Play Framework
The documentation above make it clearer. I still use Play 2.5 so I read the related page.

I tried to use the example for my purpose. Here is my Updater actor :

public class Updater extends UntypedActor
{
   
private final ActorRef out;
   
   
public Updater(ActorRef out)
   
{
       
this.out = out;
   
}

   
public static Props props()
   
{
       
return Props.create(Updater.class);
   
}
   
   
public void onReceive(Object message) throws Throwable
   
{
       
System.out.println("Message: " + message);
       
       
if (message instanceof String)
       
{
           
out.tell("I receive the message: " + message, self());
       
}
   
}
}

It does nothing special.

Here is my controller :

public class RemoteController extends Controller
{
   
@Inject
   
private ActorSystem actorSystem;
   
   
@Inject
   
private Materializer materializer;
   
   
public WebSocket socket()
   
{
       
return WebSocket.Json.acceptOrResult(request ->
       
{
           
CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> future = wsFutureFlow(request);
           
CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> stage = future.thenApplyAsync(Either::Right);
           
return stage.exceptionally(this::logException);
       
});
   
}
   
   
public CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> wsFutureFlow(Http.RequestHeader request)
   
{
       
// Create an actor ref source and associated publisher for sink
       
Pair<ActorRef, Publisher<JsonNode>> pair = createWebSocketConnections();
       
ActorRef webSocketOut = pair.first();
       
Publisher<JsonNode> webSocketIn = pair.second();

       
// Create an updater actor and attach it to the source
       
CompletionStage<ActorRef> updaterActorFuture = createUpdaterActor(webSocketOut);

       
// Once we have an actor available, create a flow...
       
return updaterActorFuture.thenApplyAsync(updaterActor -> createWebSocketFlow(webSocketIn, updaterActor));
   
}
   
   
public Pair<ActorRef, Publisher<JsonNode>> createWebSocketConnections()
   
{
       
// Creates a source to be materialized as an actor reference.

       
// Creating a source can be done through various means, but here we want
       
// the source exposed as an actor so we can send it messages from other
       
// actors.
       
Source<JsonNode, ActorRef> source = Source.actorRef(10, OverflowStrategy.dropTail());

       
// Creates a sink to be materialized as a publisher.  Fanout is false as we only want
       
// a single subscriber here.
       
Sink<JsonNode, Publisher<JsonNode>> sink = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT);

       
// Connect the source and sink into a flow, telling it to keep the materialized values,
       
// and then kicks the flow into existence.
       
return source.toMat(sink, Keep.both()).run(materializer);
   
}
   
   
public CompletionStage<ActorRef> createUpdaterActor(ActorRef webSocketOut)
   
{
       
// Use guice assisted injection to instantiate and configure the child actor.
       
long timeoutMillis = 100L;
       
return FutureConverters.toJava(
                ask
(actorSystem.actorOf(Updater.props()), new Updater(webSocketOut), timeoutMillis)
       
).thenApply(stageObj -> (ActorRef) stageObj);
   
}
   
   
public Flow<JsonNode, JsonNode, NotUsed> createWebSocketFlow(Publisher<JsonNode> webSocketIn, ActorRef updaterActor)
   
{
       
// http://doc.akka.io/docs/akka/current/scala/stream/stream-flows-and-basics.html#stream-materialization
       
// http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#integrating-with-actors

       
// source is what comes in: browser ws events -> play -> publisher -> userActor
       
// sink is what comes out:  userActor -> websocketOut -> play -> browser ws events
       
Sink<JsonNode, NotUsed> sink = Sink.actorRef(updaterActor, new Status.Success("success"));
       
Source<JsonNode, NotUsed> source = Source.fromPublisher(webSocketIn);
       
Flow<JsonNode, JsonNode, NotUsed> flow = Flow.fromSinkAndSource(sink, source);

       
// Unhook the user actor when the websocket flow terminates
       
// http://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#watchTermination
       
return flow.watchTermination((ignore, termination) ->
       
{
            termination
.whenComplete((done, throwable) ->
           
{
                actorSystem
.stop(updaterActor);
           
});

           
return NotUsed.getInstance();
       
});
   
}
   
   
public Either<Result, Flow<JsonNode, JsonNode, ?>> logException(Throwable throwable)
   
{
       
// https://docs.oracle.com/javase/tutorial/java/generics/capture.html
       
Result result = Results.internalServerError("error");
       
return Either.Left(result);
   
}
}

I'm not able to say if all of those +100 lines are really necessary at this point.

The ask() method isn't defined so it doesn't compile. Where is it defined?

tech...@eeproperty.ch

unread,
Aug 24, 2017, 12:27:06 PM8/24/17
to Play Framework
I forgot to add a static import:

import static akka.pattern.Patterns.ask;

Now I'm able to compile. I tried to connect with a simple Python script:

import websocket
import thread
import time

def on_message(ws, message):
   
print(message)

def on_error(ws, error):
   
print(error)

def on_close(ws):
   
print("### closed ###")

def on_open(ws):
   
def run(*args):
       
for i in range(3):
            time
.sleep(1)
            ws
.send("Hello %d" % i)
        time
.sleep(1)
        ws
.close()
       
print("thread terminating...")
    thread
.start_new_thread(run, ())

if __name__ == "__main__":
    websocket
.enableTrace(True)
    ws
= websocket.WebSocketApp("ws://localhost:9000/ws",
                                on_message
= on_message,
                                on_error
= on_error,
                                on_close
= on_close)
    ws
.on_open = on_open
    ws
.run_forever()

It used to work with this simple server side implementation :

public WebSocket socket() {
 
return WebSocket.Text.accept(request -> {
 
// Log events to the console
 
Sink<String, ?> in = Sink.foreach(System.out::println);

 
// Send a single 'Hello!' message and then leave the socket open
 
Source<String, ?> out = Source.single("Hello!").concat(Source.maybe());

 
return Flow.fromSinkAndSource(in, out);
 
});
}

With the Actor I get "Handshake status 500". Maybe the Actor is not the best approach but I cannot figure out how to fit my needs with the Akka streams. Is it possible to define the in and out variable as members to be able to send messages via an other method? Message reception shouldn't be separated from message sending?
Reply all
Reply to author
Forward
0 new messages