I have a very bare bones version of netty and kafka using netty4 and kafka-clients_0.10.1.1.
I am hitting memory leak issues when I load testing my server.
Here's how I run the Netty server.
java -Dio.netty.recycler.maxCapacity=0 -Dio.netty.leakDetectionLevel=advanced -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
-Dbootstrap.servers=localhost:9092 -Dtopic=test_topic -jar target/netty4-httpserver-0.0.1-jar-with-dependencies.jar com.netty.httpserver.HttpServerNetty
When I add the advanced leak detection I am seeing my worker threads are leaking memory when publishing messages to kafka. (I noticed this to increase when my Kafka broker applies backpressure.) Here's the code snippet of interest.
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
FullHttpRequest fReq = (FullHttpRequest) req;
Charset utf8 = CharsetUtil.UTF_8;
ByteBuf buf = fReq.content();
String in = buf.toString(utf8);
buf.clear();
postToKafka.write2Kafka(in);
if (HttpHeaders.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE));
}
boolean keepAlive = HttpHeaders.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer(RESP.getBytes()));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
if (!keepAlive) {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} else {
response.headers().set(CONNECTION, Values.KEEP_ALIVE);
ctx.writeAndFlush(response);
}
LOG.debug(response.toString());
}
}
Exception stack trace.
[nioEventLoopGroup-3-2] DEBUG com.netty.httpserver.HttpInputHandler - DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledHeapByteBuf(freed))
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 1024 byte(s) of direct memory (used: 253426695, max: 253427712)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:523)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:477)
at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.allocateDirect(UnpooledUnsafeNoCleanerDirectByteBuf.java:30)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.<init>(UnpooledUnsafeDirectByteBuf.java:67)
at io.netty.buffer.UnpooledUnsafeNoCleanerDirectByteBuf.<init>(UnpooledUnsafeNoCleanerDirectByteBuf.java:25)
at io.netty.buffer.UnsafeByteBufUtil.newUnsafeDirectByteBuf(UnsafeByteBufUtil.java:425)
at io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:65)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
This Error occurs on all Threads: #1,#2,#3,#4,#5. Finally there's this message:
WARNING: 3 leak records were discarded because the leak record count is limited to 4. Use system property io.netty.leakDetection.maxRecords to increase the limit.
Recent access records: 5
#5:
io.netty.buffer.AdvancedLeakAwareByteBuf.getBytes(AdvancedLeakAwareByteBuf.java:220)
io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:815)
io.netty.buffer.CompositeByteBuf.getBytes(CompositeByteBuf.java:43)
io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:227)
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:868)
io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:574)
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:979)
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:974)
com.netty.httpserver.HttpInputHandler.channelRead(HttpInputHandler.java:45)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
java.lang.Thread.run(Thread.java:748)
Created at:
io.netty.util.ResourceLeakDetector.track(ResourceLeakDetector.java:229)
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:69)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
java.lang.Thread.run(Thread.java:748)