Reactor Netty with Pipeline Configuration

Phillip Gibb
May 17, 2017, 5:00:46 AM
to reactor-framework
Has anyone been able to set up a reactor-netty TCP server with a configurable pipeline? It seems that I have to call newHandler, which takes a functional interface: a BiFunction handler with NettyInbound and NettyOutbound. Returning anything from the handler results in a response to the client, but nothing goes through the pipeline. The pipeline can be configured through the afterChannelInit option when creating the TcpServer.

EventLoopGroup group = new NioEventLoopGroup(1);
try {
    final TcpServer server = TcpServer.create(opts -> opts.afterChannelInit(clientEndPoint).listen(tcpSocketAddress));
    server.newHandler((in, out) -> {
        in.receive().asByteArray().map(bb -> {
            try {
                return m.readValue(bb, Message.class);
            } catch (IOException io) {
                throw Exceptions.propagate(io);
            }
        }).log("conn").subscribe(data -> {
            if ("Hello World!".equals(data.getMessage())) {
                latch.countDown();
            }
        });
        // does not return anything to the client or even pass the event to the pipeline
        return Flux.never();
        // this returns to the client: return out.sendString(Mono.just("Hi")).neverComplete();
    }).block(Duration.ofSeconds(30));
} finally {
    group.shutdownGracefully().sync();
}

I don't want to return anything to the client until the incoming message has gone through the pipeline.
My clientEndPoint implements Consumer<Channel> and sets up the pipeline in the accept(Channel sc) method.
The server code above is modeled on reactor-netty's tcpServerHandlesJsonPojosOverSsl.
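To make the clientEndPoint idea concrete, here is a minimal sketch of its shape: a Consumer whose accept method registers ordered stages on the channel's pipeline. Everything in it is an assumption for illustration. SketchChannel and SketchPipeline are hypothetical stand-ins so the snippet runs without Netty on the classpath (they are not Netty's Channel or ChannelPipeline), and the two stages are placeholders, not the real handlers. It only models inbound messages flowing through the stages in registration order, and says nothing about where reactor-netty places its own handlers.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class PipelineSketch {

    /** Hypothetical stand-in for a channel pipeline: an ordered list of inbound stages. */
    static final class SketchPipeline {
        private final List<String> names = new ArrayList<>();
        private final List<Function<Object, Object>> stages = new ArrayList<>();

        /** Appends a named stage to the end of the pipeline. */
        SketchPipeline addLast(String name, Function<Object, Object> stage) {
            names.add(name);
            stages.add(stage);
            return this;
        }

        /** Returns the stage names in registration order. */
        List<String> names() {
            return new ArrayList<>(names);
        }

        /** Passes an inbound message through every stage, first to last. */
        Object fireRead(Object msg) {
            Object current = msg;
            for (Function<Object, Object> stage : stages) {
                current = stage.apply(current);
            }
            return current;
        }
    }

    /** Hypothetical stand-in for a channel that owns one pipeline. */
    static final class SketchChannel {
        private final SketchPipeline pipeline = new SketchPipeline();

        SketchPipeline pipeline() {
            return pipeline;
        }
    }

    /** Same shape as the clientEndPoint described above: the pipeline is set up in accept. */
    static final class ClientEndPoint implements Consumer<SketchChannel> {
        @Override
        public void accept(SketchChannel sc) {
            sc.pipeline()
              .addLast("bytesToString", msg -> new String((byte[]) msg, StandardCharsets.UTF_8))
              .addLast("trim", msg -> ((String) msg).trim());
        }
    }

    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel();
        new ClientEndPoint().accept(channel);
        Object decoded = channel.pipeline()
                .fireRead(" Hello World!\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(channel.pipeline().names() + " -> " + decoded);
    }
}
```

With a real Netty Channel, the accept method would presumably call sc.pipeline().addLast(...) with ChannelHandler instances instead of plain functions.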

thanks

Phillip