Backpressure and a potential issue in Netty's proxy example

Ming

Jul 20, 2016, 2:55:09 PM
to Netty discussions
Hi,

I was following the proxy example to understand how it deals with backpressure. The high-level idea is to set AUTO_READ to false and call ctx.channel().read() on the frontend channel when needed. In general, the chain is inboundChannel.read() -> outboundChannel.write() -> inboundChannel.read() -> outboundChannel.write() -> ...

However, I found one issue in the current example (specifically, in HexDumpProxyFrontendHandler):
Step 0: the frontend channel reads some content, which triggers channelRead(), which in turn calls outboundChannel.writeAndFlush();
Step 1: once writeAndFlush() succeeds, the registered listener calls ctx.channel().read() on the inboundChannel again;
Step 2: if for some reason the inboundChannel has nothing to read (maybe data just arrives too slowly), the inboundChannel's channelRead() will not be triggered. Also, since AUTO_READ is set to false, the inboundChannel will not be read later. So the chain will be broken.

If this analysis is correct, then there could be a bug in Netty's proxy example.

Below is the code for the FrontendHandler.

Thanks,
Ming

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;

    private volatile Channel outboundChannel;

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

Norman Maurer

Jul 20, 2016, 3:45:32 PM
to ne...@googlegroups.com
No, this is wrong. Channel.read() only signals that we are interested in reading, so if there is nothing to read yet, the read will simply happen once data arrives.
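
The point about read() being a standing request rather than a one-shot poll can be illustrated without Netty at all. The sketch below is only an analogy built on plain java.nio, not Netty's actual code: it assumes that, on the NIO transport, Channel.read() with AUTO_READ off roughly amounts to setting OP_READ interest on the selection key, and that the interest is dropped again after a read completes. The class name ReadInterestDemo and the run() helper are made up for the illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it models the idea with a java.nio Pipe, not with Netty.
public class ReadInterestDemo {

    /** Returns a short trace of what the selector reported at each step. */
    public static String run() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);

                // Analogue of AUTO_READ=false: registered, but with no read interest.
                SelectionKey key = source.register(selector, 0);

                // Analogue of channel.read(): declare interest in reading.
                key.interestOps(SelectionKey.OP_READ);

                // Nothing has been written yet, so no channel is ready.
                int readyBefore = selector.selectNow();

                // Data arrives "late", after read interest was declared.
                sink.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));

                // The interest is still registered, so the selector now reports the channel.
                int readyAfter = selector.select(1000);

                ByteBuffer buf = ByteBuffer.allocate(16);
                source.read(buf);
                buf.flip();
                String data = StandardCharsets.US_ASCII.decode(buf).toString();

                // Assumed analogue of auto-read being off: drop the interest
                // until the next explicit read request.
                key.interestOps(0);

                return "before=" + readyBefore + " after=" + readyAfter + " data=" + data;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main should show that nothing is ready right after interest is declared, and that the same registration still delivers the data once it is written. That is the behaviour that keeps the proxy's read/write chain from breaking in Step 2 above: the pending read() is not lost when the inbound channel happens to be empty.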
