Netty based HttpClient can only send 100 requests


Cheng Ju

Nov 28, 2016, 9:04:52 AM
to ne...@googlegroups.com
Hi everyone.  I wrote an HTTP client using Netty, but it only ever sends 100 requests, no matter how many requests are written to the channel.  For example, if I send 500 requests, a write-complete callback that prints a counter shows that all 500 requests were written out, but nginx only receives 100 of them and returns 100 responses.  I'm not sure what I missed.  Either there is some configuration option I didn't notice, or there is a bug in my code that I can't see.


My JDK and OS:
Apache Maven 3.3.9 
Java version: 1.8.0_112, vendor: Oracle Corporation
OS name: "windows 7", version: "6.1", arch: "amd64", family: "dos"
Netty: 4.1.6.Final

How to test:
1. Run ab -c10 -n10000 http://localhost/ to confirm that nginx itself is fine.
2. Run the code below against nginx and check the program's log: the log line "written 499" shows that 500 requests were written, but "200 OK 100" shows that only 100 responses with HTTP status code 200 came back.  Checking nginx's access_log confirms that only 100 requests were received by nginx.

Here is my code:

package demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class HttpClient {
    private EventLoopGroup group;
    private Channel channel;

    public static void main(String[] args) throws InterruptedException, IOException {
        String host = "127.0.0.1";
        int port = 80;
        int count = 500;

        EventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("http-client-v1"));
        HttpClient client = new HttpClient(group);
        client.open(host, port);
        for (int i = 0; i < count; i++) {
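            // channel not writable (outbound buffer over the high-water mark): pause once and re-check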
            if (!client.isWritable()) {
                Thread.sleep(5000);
            }

            if (client.isWritable()) {
                System.out.println("send " + i);

                FullHttpRequest getRequest = new DefaultFullHttpRequest(
                        HttpVersion.HTTP_1_1,
                        HttpMethod.GET,
                        "/");
                getRequest.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
                getRequest.headers().set(HttpHeaderNames.HOST, host + ":" + port);
                client.write(getRequest);
            }
        }

        System.out.println("done");
        System.in.read();
        client.close();
        System.out.println("close");
    }

    public HttpClient(EventLoopGroup group) {
        this.group = group;
    }

    public void open(String host, int port) {
        Bootstrap b = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_SNDBUF, 128 * 1024 * 1024)
                .option(ChannelOption.SO_RCVBUF, 128 * 1024 * 1024)
                .option(ChannelOption.SO_REUSEADDR, false)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(32 * 1024 * 1024, 128 * 1024 * 1024))
                .remoteAddress(host, port)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast(new LogHandler())
                                .addLast(new HttpClientCodec())
                                .addLast(new HttpObjectAggregator(128 * 1024 * 1024))
                                .addLast(new ClientHandler());
                    }
                });
        channel = b.connect().syncUninterruptibly().channel();
        System.out.println("connected");
    }

    public void close() {
        try {
            channel.closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isWritable() {
        return channel.isWritable();
    }

    public ChannelFuture write(FullHttpRequest request) {
        return channel.writeAndFlush(request);
    }

    private static class ClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
        AtomicInteger id = new AtomicInteger();

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
            System.out.println(msg.status() + ", " + id.incrementAndGet());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
        }
    }

    private static class LogHandler extends ChannelDuplexHandler {
        AtomicInteger id = new AtomicInteger();

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            super.write(ctx, msg, promise.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("written " + id.incrementAndGet());
                    }
                }
            }));
        }
    }
}




Norman Maurer

Nov 28, 2016, 9:06:36 AM
to ne...@googlegroups.com
You should check whether there are any write errors; at the moment you only check whether the write was successful and ignore failures.
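For example, a minimal sketch of that check, based on the LogHandler listener from the original post (only the else branch is new):

super.write(ctx, msg, promise.addListener(new GenericFutureListener<Future<? super Void>>() {
    @Override
    public void operationComplete(Future<? super Void> future) throws Exception {
        if (future.isSuccess()) {
            System.out.println("written " + id.incrementAndGet());
        } else {
            // this is the missing part: surface write failures instead of silently dropping them
            future.cause().printStackTrace();
        }
    }
}));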



Tim Boudreau

Feb 14, 2017, 1:57:18 AM
to Netty discussions, wolv...@gmail.com

Agree with Norman, check future.cause() - if something is going wrong, the information is there but you're not checking it.

But there's a simpler explanation.  In your loop you do this:

if (!client.isWritable()) {
  Thread.sleep(5000);
}

if (client.isWritable()) {
   System.out.println("send " + i);
   //...send the request
}

If client.isWritable() is false on the second test, you never send the request - but the loop counter still advances, so that request is silently dropped.  So most likely your own code is skipping some of the sends.  There's nothing particularly special about 5 seconds - it's a lot of time for the computer, but you're also banging very hard on everything in your OS between your code and the network card.

You also might have limit_conn somewhere in your NginX config, which tells it to refuse more than n connections from any one address.  I don't remember if it's in the out-of-the-box NginX config, but it's a very common defense against simple denial of service attacks that use a few hosts to tie up a lot of connections.

There's a better pattern for looping in a world of async callbacks - and it will get you out of your isWritable() tests completely.  Roughly this:

class RepeatedSender implements ChannelFutureListener {
  final HttpClient client; // the client from the code above
  final int max;
  int requestId;

  RepeatedSender(HttpClient client, int max) { this.client = client; this.max = max; }

  @Override
  public void operationComplete(ChannelFuture future) {
     if (future.cause() != null) {
        future.cause().printStackTrace();
        return; // something is wrong, so quit.
     }
     if (++requestId < max) {
        sendRequest();
     }
  }

  void sendRequest() {
    FullHttpRequest req = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); // plus headers, as in the original main()
    client.write(req).addListener(this);
  }
}

new RepeatedSender(client, 500).sendRequest();

Now, what this does is chain up the requests - so each one is not sent until the previous one has been flushed to the socket.  So, no loops and no writability tests because you are only called when the channel is not busy.

If you wanted to, say, parallelize this to send more concurrent requests, you'd just create an EventLoopGroup with 5 threads, open 5 clients, create 5 RepeatedSender(client, 100) instances, and call sendRequest() on each one.
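A rough sketch of that parallel variant, reusing the HttpClient class from the original post (the thread name, host, and counts here are just for illustration):

EventLoopGroup group = new NioEventLoopGroup(5, new DefaultThreadFactory("http-client"));
for (int i = 0; i < 5; i++) {
    HttpClient client = new HttpClient(group);     // one client, and therefore one channel, per sender
    client.open("127.0.0.1", 80);
    new RepeatedSender(client, 100).sendRequest(); // 5 x 100 = 500 requests in total
}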

In other words, usually when you want to write a loop in async, callback-based code, you're not going to use a for- or while-loop.  Instead, you're going to write something that gets called back when one batch of work is finished, and it then dispatches the next batch of work until it's done.  It feels more like writing recursion than looping.

-Tim
