Hi,
I was following the proxy example to understand the mechanism to deal with backpressure. The high-level idea is to set autoread as false, and call frontend ctx.channel.read() when needed. Generally the chain will be inboundChannel.read() -> outboundChannel.write() -> inboundChannel.read() -> outboundChannel.write() -> ...
However, I found one issue in the current example (specifically, in HexDumpProxyFrontendHandler):
Step 0: the frontend channel read some content, which triggers channelRead() to call outboundChannel.writeAndFlush();
Step 1: once the writeAndFlush() is successful, the registered listener will again call inboundChannel's ctx.channel.read();
Step 2: If for some reason the inboundChannel has nothing to read (maybe data just arrive too slowly), the inboundChannel's channelRead() will not be triggered. Also, since autoread is set to false, inboundChannel will not be read later. So, the chain will be broken.
If this analysis is correct, then there potentially could be some bug in the netty's proxy example.
Below is the code for the FrontendHandler.
Thanks,
Ming
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private volatile Channel outboundChannel;
public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new HexDumpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
}