Akka Streams: getting a termination message right after the success message when my Netty client connects.

15 views
Skip to first unread message

G J

unread,
Aug 18, 2017, 9:12:51 AM8/18/17
to Akka User List
I'm getting a termination message right after the success message when my Netty client connects. I'm new to Akka Streams. Please help me find what I'm missing in my POC.

import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Tcp;

/**
* Created by gaurav on 28/7/17.
*/
public class Server {
public static void main(String[] args) {
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000);
final Sink<Tcp.IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
conn.handleWith(ActorFlow.actorRef(out -> MyWebSocketActor.props(out)), InternalWebSocketHelper.actorMaterializer());
});
final CompletionStage<Tcp.ServerBinding> bindingFuture = Tcp.get(InternalWebSocketHelper.actorSystem())
.bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(InternalWebSocketHelper.actorMaterializer());

bindingFuture.whenComplete((binding, throwable) -> {
System.out.println("Server started, listening on: " + binding.localAddress());
});

bindingFuture.exceptionally(e -> {
System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage());
InternalWebSocketHelper.actorSystem().terminate();
return null;
});

}

private static class MyWebSocketActor extends UntypedActor {

private final ActorRef out;

public MyWebSocketActor(ActorRef out) {
this.out = out;
}

public static Props props(ActorRef out) {
return Props.create(MyWebSocketActor.class, () -> new MyWebSocketActor(out));
}

public void onReceive(Object message) throws Exception {
out.tell(message, ActorRef.noSender());
}
}
}




/**
* Created by gaurav on 1/8/17.
*/

import java.util.function.Function;

import org.reactivestreams.Publisher;

import akka.NotUsed;
import akka.actor.*;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.*;

/**
 * Builds a {@link Flow} whose elements are handled by an actor.
 *
 * <p>Incoming stream elements are delivered to an actor created from caller-supplied
 * {@link Props}; whatever that actor sends to the {@code ActorRef} it was given is emitted
 * on the outgoing side of the flow.
 */
public class ActorFlow {

    /**
     * Creates an actor-backed flow with a buffer of 1000 elements, the
     * {@code OverflowStrategy.dropNew()} strategy, and the actor system and materializer
     * provided by {@code InternalWebSocketHelper}.
     *
     * @param props factory that receives the outgoing {@link ActorRef} and returns the
     *              {@link Props} of the actor that will handle incoming elements
     * @param <In>  type of incoming elements
     * @param <Out> type of outgoing elements
     * @return the actor-backed flow
     */
    public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props) {
        return actorRef(props, 1000, OverflowStrategy.dropNew(), InternalWebSocketHelper.actorSystem(), InternalWebSocketHelper.actorMaterializer());
    }

    /**
     * Creates an actor-backed flow.
     *
     * <p>The outgoing side is materialized immediately with {@code mat}: a
     * {@code Source.actorRef} is run into a publisher sink, which yields the {@link ActorRef}
     * handed to {@code props} and the {@link Publisher} the returned flow emits from.
     * NOTE(review): the publisher is created with {@code AsPublisher.WITHOUT_FANOUT}, so the
     * returned flow is presumably meant to be materialized only once; confirm against the
     * Akka documentation.
     *
     * @param props            factory that receives the outgoing {@link ActorRef} and returns
     *                         the {@link Props} of the actor that will handle incoming elements
     * @param bufferSize       buffer size passed to {@code Source.actorRef}
     * @param overflowStrategy overflow strategy passed to {@code Source.actorRef}
     * @param factory          used to create the intermediate actor that owns the handler actor
     * @param mat              materializer used to run the outgoing source
     * @param <In>             type of incoming elements
     * @param <Out>            type of outgoing elements
     * @return the actor-backed flow
     */
    public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props, int bufferSize, OverflowStrategy overflowStrategy,
            ActorRefFactory factory, Materializer mat) {

        Pair<ActorRef, Publisher<Out>> pair = Source.<Out> actorRef(bufferSize, overflowStrategy)
                .toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.both()).run(mat);

        // The sink sends Status.Success to the intermediate actor when the incoming side completes.
        return Flow.fromSinkAndSource(
                Sink.actorRef(factory.actorOf(Props.create(WebSocketFlowActor.class, () -> new WebSocketFlowActor(props, pair.first()))),
                        new Status.Success(new Object())),
                Source.fromPublisher(pair.second()));
    }

    /**
     * Intermediate actor that creates and watches the user's handler actor as a child named
     * {@code "flowActor"}, relays stream elements to it, and stops it when the incoming side
     * of the stream ends.
     */
    private static class WebSocketFlowActor extends UntypedActor {

        /** The watched child actor created from the caller-supplied props. */
        private final ActorRef flowActor;

        /**
         * @param props factory producing the child actor's {@link Props} from {@code ref}
         * @param ref   the outgoing {@link ActorRef} handed to {@code props}
         */
        public WebSocketFlowActor(Function<ActorRef, Props> props, ActorRef ref) {
            flowActor = context().watch(context().actorOf(props.apply(ref), "flowActor"));
        }

        /**
         * Relays stream elements to the child and handles end-of-stream signals.
         *
         * @param message a stream element, a {@code Status} signal from the sink, or a
         *                {@link Terminated} notification for the watched child
         */
        @Override
        public void onReceive(Object message) throws Throwable {
            if (message instanceof Status.Success || message instanceof Status.Failure) {
                // End of the incoming stream. Per the Akka documentation, Sink.actorRef sends
                // Status.Failure when upstream fails; it must stop the child just like normal
                // completion, otherwise both actors stay alive after the stream has gone.
                flowActor.tell(PoisonPill.getInstance(), getSelf());
            } else if (message instanceof Terminated) {
                // The watched child has stopped, so this actor has nothing left to do.
                context().stop(getSelf());
            } else {
                flowActor.tell(message, getSelf());
            }
        }

        /**
         * @return {@code SupervisorStrategy.stoppingStrategy()}
         */
        @Override
        public SupervisorStrategy supervisorStrategy() {
            return SupervisorStrategy.stoppingStrategy();
        }
    }
}



import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;

/**
 * Holder for one {@link ActorSystem} and an {@link ActorMaterializer} created from it.
 * Both are created when this class is initialized.
 */
public class InternalWebSocketHelper {

    /** Actor system created during class initialization. */
    static ActorSystem actorSystem;

    /** Materializer created from {@link #actorSystem}. */
    static ActorMaterializer actorMaterializer;

    static {
        // Order matters: the materializer is created from the actor system.
        actorSystem = ActorSystem.create();
        actorMaterializer = ActorMaterializer.create(actorSystem);
    }

    /**
     * @return the current value of {@link #actorSystem}
     */
    static ActorSystem actorSystem() {
        return InternalWebSocketHelper.actorSystem;
    }

    /**
     * @return the current value of {@link #actorMaterializer}
     */
    static ActorMaterializer actorMaterializer() {
        return InternalWebSocketHelper.actorMaterializer;
    }
}
Reply all
Reply to author
Forward
0 new messages