I'm getting termination message just after sucuss message when my neety client connected. I'm new in akka stream. Please help me what i'm missing in my poc.
import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Tcp;
/**
* Created by gaurav on 28/7/17.
*/
public class Server {
public static void main(String[] args) {
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000);
final Sink<Tcp.IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
conn.handleWith(ActorFlow.actorRef(out -> MyWebSocketActor.props(out)), InternalWebSocketHelper.actorMaterializer());
});
final CompletionStage<Tcp.ServerBinding> bindingFuture = Tcp.get(InternalWebSocketHelper.actorSystem())
.bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(InternalWebSocketHelper.actorMaterializer());
bindingFuture.whenComplete((binding, throwable) -> {
System.out.println("Server started, listening on: " + binding.localAddress());
});
bindingFuture.exceptionally(e -> {
System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage());
InternalWebSocketHelper.actorSystem().terminate();
return null;
});
}
private static class MyWebSocketActor extends UntypedActor {
private final ActorRef out;
public MyWebSocketActor(ActorRef out) {
this.out = out;
}
public static Props props(ActorRef out) {
return Props.create(MyWebSocketActor.class, () -> new MyWebSocketActor(out));
}
public void onReceive(Object message) throws Exception {
out.tell(message, ActorRef.noSender());
}
}
}
/**
* Created by gaurav on 1/8/17.
*/
import java.util.function.Function;
import org.reactivestreams.Publisher;
import akka.NotUsed;
import akka.actor.*;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.*;
public class ActorFlow {
public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props) {
return actorRef(props, 1000, OverflowStrategy.dropNew(), InternalWebSocketHelper.actorSystem(), InternalWebSocketHelper.actorMaterializer());
}
public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props, int bufferSize, OverflowStrategy overflowStrategy,
ActorRefFactory factory, Materializer mat) {
Pair<ActorRef, Publisher<Out>> pair = Source.<Out> actorRef(bufferSize, overflowStrategy)
.toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.both()).run(mat);
return Flow.fromSinkAndSource(
Sink.actorRef(factory.actorOf(Props.create(WebSocketFlowActor.class, () -> new WebSocketFlowActor(props, pair.first()))),
new Status.Success(new Object())),
Source.fromPublisher(pair.second()));
}
private static class WebSocketFlowActor extends UntypedActor {
private final ActorRef flowActor;
public WebSocketFlowActor(Function<ActorRef, Props> props, ActorRef ref) {
flowActor = context().watch(context().actorOf(props.apply(ref), "flowActor"));
}
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof Status.Success) {
flowActor.tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof Terminated) {
context().stop(getSelf());
} else {
flowActor.tell(message, getSelf());
}
}
@Override
public SupervisorStrategy supervisorStrategy() {
return SupervisorStrategy.stoppingStrategy();
}
}
}
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
public class InternalWebSocketHelper {
static ActorSystem actorSystem = ActorSystem.create();
static ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem);
static ActorSystem actorSystem() {
return actorSystem;
}
static ActorMaterializer actorMaterializer() {
return actorMaterializer;
}
}