Memory leaks when trying to get Netty server publish messages to Kafka cluster.

1,331 views
Skip to first unread message

dhawan.g...@datavisor.com

unread,
Jul 5, 2017, 6:52:08 PM7/5/17
to Netty discussions
I have a very bare bones version of netty and kafka using netty4 and kafka-clients_0.10.1.1. 
I am hitting memory leak issues when I load testing my server. 


Here's how I run the Netty server. 
java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
-Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar target/netty4-httpserver-0.0.1-jar-with-dependencies.jar com.netty.httpserver.HttpServerNetty

When I add the advanced leak detection I am seeing my worker threads are leaking memory when publishing messages to kafka. (I noticed this to increase when my Kafka broker applies backpressure.) Here's the code snippet of interest.

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof HttpRequest) {
        HttpRequest req = (HttpRequest) msg;
        FullHttpRequest fReq = (FullHttpRequest) req;
        Charset utf8 = CharsetUtil.UTF_8;
        ByteBuf buf = fReq.content();
        String in = buf.toString(utf8);
        buf.clear();

        postToKafka.write2Kafka(in);

        if (HttpHeaders.is100ContinueExpected(req)) {
            ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
        }
        boolean keepAlive = HttpHeaders.isKeepAlive(req);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                Unpooled.wrappedBuffer(RESP.getBytes()));
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
        if (!keepAlive) {
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        } else {
            response.headers().set(CONNECTION, Values.KEEP_ALIVE);
            ctx.writeAndFlush(response);
        }
        LOG.debug(response.toString());
    }
}


Exception stack trace. 

[nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledHeapByteBuf(freed))
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 byte(s) of direct memory (used: 253426695, max: 253427712)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
        at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
        at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
        at io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
        at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
        at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:748)

This Error occurs on all Threads: #1,#2,#3,#4,#5. Finally there's this message:

[nioEventLoopGroup-3-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
WARNING: 3 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
Recent access records: 5
#5:
        io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
        io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
        io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
        io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
        io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
        io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
        io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
        io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
        com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)


Created at:
        io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
        io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
        io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
        io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
        io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
        io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        java.lang.Thread.run(Thread.java:748)

Norman Maurer

unread,
Jul 9, 2017, 4:47:52 AM7/9/17
to ne...@googlegroups.com
You need to release the msg that is passed into channelRead(...)
--
You received this message because you are subscribed to the Google Groups "Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email to netty+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/netty/718c797b-dead-4d90-908f-dafca72af183%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

dhawan.g...@datavisor.com

unread,
Jul 11, 2017, 10:59:50 PM7/11/17
to Netty discussions
@Norman Maurer That fixed the problem.

Please find updated code below:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;
            FullHttpRequest fReq = (FullHttpRequest) req;
            Charset utf8 = CharsetUtil.UTF_8;
            ByteBuf buf = fReq.content();
            try {
                String in = buf.toString(utf8);
                buf.clear();
                ...
            } finally {
                buf.release();
            }
        }
    }
}
Reply all
Reply to author
Forward
0 new messages