akka and websockets integration in Java, how to push from server?

593 views
Skip to first unread message

Federico Jakimowicz

unread,
May 9, 2016, 3:06:43 PM5/9/16
to Akka User List
Hi,

I wonder if there is some example in the Java documentation on how to push to the browser from the server using akka websockets integration
In the documentation there not seems to be much about it

Looking at simple examples done with Java alone it could be done by 
session.getAsyncRemote().sendText(message); or
session.getBasicRemote().sendText(message);
 
but how would be the akka approach?

thanks in advance!

Akka Team

unread,
May 12, 2016, 5:02:30 AM5/12/16
to Akka User List
Hi Federico,

Akka HTTP and its Websocket support is built on Akka Streams. You will need to understand streams first, as the API is not simply just "sendWhatever" because everything requires backpressure. The Websocket API requires you to implement a Flow (int the doc page you linked it is created by the "greeter()" method), which is the abstraction representing something which accepts and produces message of some type, in a backpressured way.

I recommend to familiarize yourself with the stream APIs first and then revisit the HTTP parts.

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Akka Team
Typesafe - Reactive apps on the JVM
Blog: letitcrash.com
Twitter: @akkateam

Flavio

unread,
May 12, 2016, 6:29:54 AM5/12/16
to Akka User List

Hello Federico

 

Below my code.


The class WebSocketApp is the Http App. The Method metrics returns the required Flow. This flow forwards all incoming messages to a newly created Actor of type WebSocketCommandSubscriber. The flow gets the messages to send back to the websocket client from newly created Actor of Type WebSocketDataPublisherActor. The Actor WebSocketDataPublisherActor receives the sending events from the Eventstream. If the websocket-stream is not ready to receive data (keyword backpressure) incoming messages from the Eventstream are lost and never send to the websocket client. For each connection this two Actors are created once.

 

With this solution it is NOT possible to implement a request/response handling, because the WebSocketDataPublisherActor is listening on the Eventstream. I have solved this problem too and I will publish the code to my own question later.

 

Have fun!

Flavio


Ps. A starting point to understand steams could be: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

 

 

public class WebSocketApp extends HttpApp {
 
  private static final Gson gson = new Gson();
 
  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }
 
  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

 


public class WebSocketCommandSubscriber extends AbstractActorSubscriber {

 

    public static Props props() {

        return Props.create(WebSocketCommandSubscriber.class);

    }

   

    public WebSocketCommandSubscriber() {

        receive(ReceiveBuilder.

            match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof Message,

                    onNext -> {

                        Message message = (Message)onNext.element();

                        handleIncomingMessage(message);

           

          // TODO: how do we handle OnComplete? (do we have to / why exception)

//        })

//        .match(akka.stream.actor.ActorSubscriberMessage.OnComplete., (x) -> {

//            context().system().stop(self());

        }).match(Object.class, (x) -> {

            System.out.println("WebSocketCommandSubscriber: Unkown incomming message: " + x.getClass().getName() + ": "  + x);

            unhandled(x);

        }).build());

    }

   

    private void handleIncomingMessage(Message message) {

        WebSocketMessage<?> wsCommand = new Gson().fromJson(message.asTextMessage().getStrictText(), WebSocketMessage.class);

        switch (wsCommand.type) {

        case "DoX":

            getConnectionsActor().tell(XyActor.doX(createRemoteAddress(wsCommand)), ActorRef.noSender());

            break;

        case "DoY":

            getConnectionsActor().tell(XyActor.doY(createRemoteAddress(wsCommand)), ActorRef.noSender());

            break;

        }

    }

    // TODO: we should refactor our messaging concept - it does not work that nice as expected

    private RemoteAddress createRemoteAddress(WebSocketMessage<?> wsCommand) {

        @SuppressWarnings("rawtypes")

        LinkedTreeMap map = (LinkedTreeMap) wsCommand.data;

        return new RemoteAddress(map.get("host").toString(), Double.valueOf(map.get("port").toString()).intValue(), map.get("actorSystemName").toString());

    }

    private NodeName createNodeName(WebSocketMessage<?> wsCommand) {

        @SuppressWarnings("rawtypes")

        LinkedTreeMap map = (LinkedTreeMap) wsCommand.data;

        return new NodeName(map.get("nodeName").toString());

    }

   

    private ActorSelection getConnectionsActor() {

        return context().system().actorSelection(Connections.ACTOR_PATH);

    }

    private ActorSelection getConnectionActor(WebSocketMessage<?> wsCommand) {

        return context().system().actorSelection(NodeConnection.getActorPath(createNodeName(wsCommand)));

    }

 

    @Override

    public RequestStrategy requestStrategy() {

        return new MaxInFlightRequestStrategy(10) {

            @Override

            public int inFlightInternally() {

                // we do not hold any messages yet, but will eventually be

                // required, e.g. for request/response message handling

                return 0;

            }

        };

    }

}

 

 

public class WebSocketDataPublisher extends AbstractActorPublisher<WebSocketMessage<?>> {

 

    // TODO: the WebSocket client should be able to configure its interessted events, by sending a corresponding message

    List<Class<?>> interesstedEvents = Arrays.asList(

            MeasurementDataMessage.class,

            NodeConnectionEvents.class);

 

    public static Props props() {

        return Props.create(WebSocketDataPublisher.class);

    }

 

    @Override

    public void preStart() throws Exception {

        for (Class<?> eventClass : interesstedEvents) {

            // unsubscribing performed automatically by the event stream on actor destroy

            getContext().system().eventStream().subscribe(self(), eventClass);

        }

    }

 

    public WebSocketDataPublisher() {

        UnitPFBuilder<Object> builder = ReceiveBuilder.match(Cancel.class, cancel -> context().stop(self()));

        for (Class<?> clazz : interesstedEvents) {

            builder = builder.match(clazz, message -> {

                handleMessage(message);

            });

        }

        receive(builder.build());

    }

 

    private void handleMessage(Object message) {

        // while the stream is not ready to receive data - incoming messages are lost

        if (isActive() && totalDemand() > 0) {

            WebSocketMessage<?> webSocketMessage = WebSocketMessage.create(message.getClass().getSimpleName(), message);

//            System.out.println("send message to WS: " + message);

            onNext(webSocketMessage);

        } else {

//            System.out.println("LOST message to WS: " + message);

        }

    }

}

 


public class WebSocketMessage<T> {
    public final String type;
    public final T data;
 
    public static WebSocketMessage<Void> create(String type) {
        return create(type, null);
    }
 
    public static <T> WebSocketMessage<T> create(String type, T data) {
        return new WebSocketMessage<>(type, data);
    }
    
    private WebSocketMessage(String type, T data) {
        this.type = type;
        this.data = data;
    }
}

 

Federico Jakimowicz

unread,
May 12, 2016, 11:57:35 AM5/12/16
to Akka User List
Thanks Endre,

I have been taking a look to several of this APIs and to the source code too.
However most of this takes me a considerable amount of time to internalize ( i might be not that smart ), I'm not familiar by anyway yet to stream programming.
Anyway I will continue reading and testing things is nice to have a challenge.

thanks again

Federico Jakimowicz

unread,
May 12, 2016, 12:05:31 PM5/12/16
to Akka User List
Hi Flavio,
Yesterday looking at this example
and the code sample you published in your first post I managed to get what you are describing here.
I mean that the publisher just publishes to the client flow and the subscriber is just subscribed to the client flow and there is no relation between them, so then I understood the issue you posted in the first post. I meant i found not solution but at least i managed to understand what you were at ( which for me was somewhat an achievement hehehe ). Then i had to go to sleep :).
I will read the code you posted today and the solution to your issue if you post it too!

thanks a lot for the in detail explanation!

Fede.

Alok Jha

unread,
Mar 1, 2020, 6:53:22 AM3/1/20
to Akka User List
Hi Federico,

Could you please provide me access to your blog?
I have a similar requirement to consume messages from the event stream and respond to the client (browser) over a WebSocket connection.
I am able to create a WebSocket connection and respond to the client when it asks for information.

I am wondering how to send the message to the client whenever any message pushed to Akka event stream, without the client specifically asking for the message. 
The server should be able to push the message to the client over WebSocket as soon as a message is published to the event stream.

Also, how do I club both the request/response cycle (i,e responding to a client when a client asks for data) and then sending the data to the client when there is anything published to event stream.
Is it possible to club both of these in one Route?
Please help me with this. Also if you could provide me access to your blog that will be great.
I referred below links but didn't get an idea.

Thanks
Alok
Reply all
Reply to author
Forward
0 new messages