public class Updater extends UntypedActor
{
private final ActorRef out;
public Updater(ActorRef out)
{
this.out = out;
}
public static Props props()
{
return Props.create(Updater.class);
}
public void onReceive(Object message) throws Throwable
{
System.out.println("Message: " + message);
if (message instanceof String)
{
out.tell("I receive the message: " + message, self());
}
}
}
/**
 * Controller exposing a JSON WebSocket whose incoming messages are forwarded to
 * an {@link Updater} actor and whose outgoing messages are whatever is sent to
 * the actor reference materialized by {@link #createWebSocketConnections()}.
 *
 * <p>NOTE(review): the stream is typed as {@code JsonNode}, while {@code Updater}
 * only reacts to {@code String} messages and replies with a {@code String} —
 * confirm whether the socket should be text-based or the actor should handle JSON.
 */
public class RemoteController extends Controller
{
    @Inject
    private ActorSystem actorSystem;
    @Inject
    private Materializer materializer;

    /**
     * Accepts a JSON WebSocket. If building the flow fails, the handshake is
     * answered with the result produced by {@link #logException(Throwable)}.
     *
     * @return the WebSocket handler
     */
    public WebSocket socket()
    {
        return WebSocket.Json.acceptOrResult(request ->
        {
            CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> future = wsFutureFlow(request);
            CompletionStage<Either<Result, Flow<JsonNode, JsonNode, ?>>> stage = future.thenApplyAsync(Either::Right);
            return stage.exceptionally(this::logException);
        });
    }

    /**
     * Materializes the outgoing channel, creates the updater actor attached to it
     * and, once the actor is available, builds the WebSocket flow.
     *
     * @param request the handshake request (currently not inspected)
     * @return a stage completing with the flow, or failing if the actor cannot be created
     */
    public CompletionStage<Flow<JsonNode, JsonNode, NotUsed>> wsFutureFlow(Http.RequestHeader request)
    {
        // Create an actor ref source and associated publisher for sink
        Pair<ActorRef, Publisher<JsonNode>> pair = createWebSocketConnections();
        ActorRef webSocketOut = pair.first();
        Publisher<JsonNode> webSocketIn = pair.second();
        // Create an updater actor and attach it to the source
        CompletionStage<ActorRef> updaterActorFuture = createUpdaterActor(webSocketOut);
        // Once we have an actor available, create a flow...
        return updaterActorFuture.thenApplyAsync(updaterActor -> createWebSocketFlow(webSocketIn, updaterActor));
    }

    /**
     * Runs a small stream connecting an actor-backed source to a publisher sink.
     *
     * @return the actor reference to send outgoing messages to, paired with the
     *         publisher that re-emits those messages
     */
    public Pair<ActorRef, Publisher<JsonNode>> createWebSocketConnections()
    {
        // Source materialized as an actor reference so other actors can push
        // messages into the stream; buffers 10 elements, dropping per dropTail on overflow.
        Source<JsonNode, ActorRef> source = Source.actorRef(10, OverflowStrategy.dropTail());
        // Sink materialized as a publisher. Fanout is disabled as we only want
        // a single subscriber here.
        Sink<JsonNode, Publisher<JsonNode>> sink = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT);
        // Connect source and sink, keep both materialized values, and run the stream.
        return source.toMat(sink, Keep.both()).run(materializer);
    }

    /**
     * Creates an {@link Updater} actor that replies to {@code webSocketOut}.
     *
     * <p>The actor is created through {@code Props}: an actor instance must never
     * be built with {@code new}, and it must not be sent as a message. Creation
     * failures are reported through the returned stage so that callers chaining
     * on it (see {@link #socket()}) can handle them.
     *
     * @param webSocketOut actor reference feeding the outgoing side of the socket
     * @return a stage completed with the new actor's reference, or failed with the creation error
     */
    public CompletionStage<ActorRef> createUpdaterActor(ActorRef webSocketOut)
    {
        java.util.concurrent.CompletableFuture<ActorRef> created = new java.util.concurrent.CompletableFuture<>();
        try
        {
            created.complete(actorSystem.actorOf(akka.actor.Props.create(Updater.class, webSocketOut)));
        }
        catch (RuntimeException e)
        {
            // Surface the failure asynchronously instead of throwing out of the handshake lambda.
            created.completeExceptionally(e);
        }
        return created;
    }

    /**
     * Builds the flow handed to Play for one WebSocket connection.
     *
     * @param webSocketIn  publisher emitting the messages to send to the client
     * @param updaterActor actor receiving every message that arrives from the client
     * @return the flow; the updater actor is stopped when the flow terminates
     */
    public Flow<JsonNode, JsonNode, NotUsed> createWebSocketFlow(Publisher<JsonNode> webSocketIn, ActorRef updaterActor)
    {
        // http://doc.akka.io/docs/akka/current/scala/stream/stream-flows-and-basics.html#stream-materialization
        // http://doc.akka.io/docs/akka/current/scala/stream/stream-integrations.html#integrating-with-actors
        // sink: messages arriving from the client are forwarded to the updater actor,
        //       followed by Status.Success("success") when the stream completes
        // source: messages published by webSocketIn are sent to the client
        Sink<JsonNode, NotUsed> sink = Sink.actorRef(updaterActor, new Status.Success("success"));
        Source<JsonNode, NotUsed> source = Source.fromPublisher(webSocketIn);
        Flow<JsonNode, JsonNode, NotUsed> flow = Flow.fromSinkAndSource(sink, source);
        // Stop the updater actor when the websocket flow terminates
        // http://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#watchTermination
        return flow.watchTermination((ignore, termination) ->
        {
            termination.whenComplete((done, throwable) ->
            {
                actorSystem.stop(updaterActor);
            });
            return NotUsed.getInstance();
        });
    }

    /**
     * Logs the failure and converts it into an HTTP 500 handshake result.
     *
     * @param throwable the error raised while building the WebSocket flow
     * @return a left value carrying an internal-server-error result
     */
    public Either<Result, Flow<JsonNode, JsonNode, ?>> logException(Throwable throwable)
    {
        // https://docs.oracle.com/javase/tutorial/java/generics/capture.html
        play.Logger.error("Cannot create websocket", throwable);
        Result result = Results.internalServerError("error");
        return Either.Left(result);
    }
}
import static akka.pattern.Patterns.ask;
import websocket
import thread
import time
def on_message(ws, message):
    """Print each message received on the websocket to standard output."""
    print("%s" % (message,))
def on_error(ws, error):
    """Print the error reported for the websocket to standard output."""
    print("%s" % (error,))
def on_close(ws, *close_info):
    """Print a marker when the websocket is closed.

    ``close_info`` absorbs the optional close status code and close message
    that newer websocket-client releases pass to this callback, so the
    handler works whether it is called with one argument or three.
    """
    print("### closed ###")
def on_open(ws):
    """Start a background thread that sends three greetings and closes ``ws``.

    The worker sends "Hello 0" to "Hello 2", sleeping one second before each
    send, waits one more second, closes the socket, and prints a final marker.
    """
    # Imported locally: `threading` exists on both Python 2 and 3, unlike the
    # low-level Python 2-only `thread` module.
    import threading

    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")

    sender = threading.Thread(target=run)
    # Daemon thread: like a `thread.start_new_thread` worker, it must not keep
    # the process alive after the main thread exits.
    sender.daemon = True
    sender.start()
if __name__ == "__main__":
    # Script entry point: enable websocket-client tracing, connect to the local
    # endpoint with the callbacks defined in this file, and run until closed.
    websocket.enableTrace(True)
    callbacks = dict(on_message=on_message, on_error=on_error, on_close=on_close)
    ws = websocket.WebSocketApp("ws://localhost:9000/ws", **callbacks)
    ws.on_open = on_open
    ws.run_forever()
/**
 * Accepts a text WebSocket that prints every incoming message to the console
 * and sends a single "Hello!" greeting to the client.
 *
 * @return the WebSocket handler
 */
public WebSocket socket() {
    return WebSocket.Text.accept(request -> {
        // Outgoing side: one 'Hello!' message, then leave the socket open
        Source<String, ?> greeting = Source.single("Hello!").concat(Source.maybe());
        // Incoming side: log events to the console
        Sink<String, ?> consoleLog = Sink.foreach(System.out::println);
        return Flow.fromSinkAndSource(consoleLog, greeting);
    });
}
With the actor-based version I get "Handshake status 500". Maybe an actor is not the best approach, but I cannot figure out how to meet my needs with Akka Streams alone. Is it possible to define the `in` and `out` variables as members so that I can send messages from another method? Shouldn't message reception be separated from message sending?