Hi everyone. I wrote a HTTP client using netty. But it can only send 100 requests despite how many requests are written to the channel. For example if I send 500 requests, an callback print the number on write complete tells that 500 requests are written out, but nginx only get 100 of them and returned 100 responses. Not sure what I missed. This seems to be some configuration I didn't notice or there is a bug in my code I can't see.
2. Run the code against nginx and check program's log, find log line "written 499" means 500 requests are written. Find "200 OK 100" means only 100 respons with http status code 200. But check nginx access_log, only 100 request are received by nginx.
package demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
public class HttpClient {
private EventLoopGroup group;
private Channel channel;
public static void main(String[] args) throws InterruptedException, IOException {
String host = "127.0.0.1";
int port = 80;
int count = 500;
EventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("http-client-v1"));
HttpClient client = new HttpClient(group);
client.open(host, port);
for (int i = 0; i < count; i++) {
if (!client.isWritable()) {
Thread.sleep(5000);
}
if (client.isWritable()) {
System.out.println("send " + i);
FullHttpRequest getRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.GET,
"/");
getRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
getRequest.headers().set(HttpHeaderNames.HOST, host + ":" + "port");
client.write(getRequest);
}
}
System.out.println("done");
System.in.read();
client.close();
System.out.println("close");
}
public HttpClient(EventLoopGroup group) {
this.group = group;
}
public void open(String host, int port) {
Bootstrap b = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_SNDBUF, 128 * 1024 * 1024)
.option(ChannelOption.SO_RCVBUF, 128 * 1024 * 1024)
.option(ChannelOption.SO_REUSEADDR, false)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024 * 1024, 128 * 1024 * 1024))
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new LogHandler())
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(128 * 1024 * 1024))
.addLast(new ClientHandler());
}
});
channel = b.connect().syncUninterruptibly().channel();
System.out.println("connected");
}
public void close() {
try {
channel.closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
public boolean isWritable() {
return channel.isWritable();
}
public ChannelFuture write(FullHttpRequest request) {
return channel.writeAndFlush(request);
}
private static class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
AtomicInteger id = new AtomicInteger();
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
System.out.println(msg.status() + ", " + id.incrementAndGet());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.getCause().printStackTrace();
}
}
private static class LogHandler extends ChannelDuplexHandler {
AtomicInteger id = new AtomicInteger();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
System.out.println("written " + id.incrementAndGet());
}
}
}));
}
}
}