How to limit the size of the outbound buffer in 4.0.6


Alexey Danilov

Aug 5, 2013, 12:32:03 PM
to ne...@googlegroups.com
How do you normally limit the combined size of the buffers waiting to be sent in the outbound buffer queue?
I have tried ctx.channel().config().setWriteBufferLowWaterMark() and the corresponding setWriteBufferHighWaterMark(), but I'm not sure they produce the result I'm after. Let me explain...

I have a service that reads data from a large file in the form of ByteBuffers and passes them to a simple Encoder as fast as it can obtain them. My Encoder extends MessageToByteEncoder<Object>, and all it basically does is write them to the network, like so:

Encoder extends MessageToByteEncoder<Object>:

    @Override
    public void encode(ChannelHandlerContext ctx, Object object, ByteBuf out) throws Exception {
        ...
        ByteBuffer binaryDataBuffer = (ByteBuffer) object;
        // Wrap the NIO buffer, then copy its bytes into the outbound ByteBuf.
        out.writeBytes(Unpooled.wrappedBuffer(binaryDataBuffer));

I recall that one of the Netty 4 release candidates had an option to explicitly set the inbound and outbound buffer sizes. In the final version, the inbound buffer size can be regulated with ctx.channel().config().setRecvByteBufAllocator(recvByteBufAllocator), but what is the corresponding option to control the combined size of the buffers queued to be sent?


jacobea...@gmail.com

Aug 5, 2013, 2:30:25 PM
to ne...@googlegroups.com
To clarify a bit what I'm trying to achieve here...

Suppose that in the service, which reads from a file, fills a buffer and then instructs Netty to send it over the network, I write something like:

while (<there are buffers available>) {
    <get byteBuffer>
    ChannelFuture channelFuture = ctx.channel().writeAndFlush(byteBuffer);
    channelFuture.await(); // block until this buffer has been written
}

This means that until the current buffer has been physically sent over the network, Netty will not try to send anything else. It works nicely and cleanly, sending a 3 GB file without any problems.
However, suppose I want to obtain a bunch of buffers at once, and schedule them for sending. How do I specify how much data netty can accumulate before it can accept no more and block?
Right now if I try to send a big file like this:

while (<there are buffers available>) {
    <get byteBuffer>
    ctx.channel().writeAndFlush(byteBuffer); // do not wait for the current buffer to be physically written
}

I quickly get an OutOfMemoryError, and direct buffers occupy all the available memory.

Btw, all my inbound handlers extend SimpleChannelInboundHandler, and release byteBufs automatically. Also, I do not see any leaks reported by ResourceLeakDetector.

Norman Maurer

Aug 6, 2013, 4:19:52 AM
to ne...@googlegroups.com
You should be able to use:

while (channel.isWritable()) {
    // write the next buffer here
}

You can adjust the watermarks for the limits via the ChannelConfig or ChannelOption.
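
To make the watermark behaviour concrete, here is a minimal plain-Java sketch of the bookkeeping as I understand it. It is a simplified model, not Netty's actual code: the class name WatermarkModel and its methods are made up for illustration. The point it tries to show is that queuing a write never blocks or fails because of the watermarks. They only flip the flag that isWritable() reports, which is why a producer that ignores the flag can still run out of memory.

```java
// Simplified, hypothetical model of high/low watermark bookkeeping.
// It is NOT Netty's implementation; it only illustrates the hysteresis.
final class WatermarkModel {
    private final long low;
    private final long high;
    private long pendingBytes;
    private boolean writable = true;

    WatermarkModel(long low, long high) {
        if (low < 0 || high < low) {
            throw new IllegalArgumentException("need 0 <= low <= high");
        }
        this.low = low;
        this.high = high;
    }

    /** Queue a buffer. Always accepted; only the writable flag may change. */
    void enqueue(long bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high) {
            writable = false;
        }
    }

    /** Bytes were written to the socket and left the queue. */
    void drained(long bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < low) {
            writable = true;
        }
    }

    boolean isWritable() { return writable; }

    long pendingBytes() { return pendingBytes; }
}

final class WatermarkDemo {
    public static void main(String[] args) {
        WatermarkModel model = new WatermarkModel(32, 64);
        model.enqueue(40);   // 40 pending, still writable
        model.enqueue(40);   // 80 pending, above high: unwritable
        System.out.println(model.pendingBytes() + " writable=" + model.isWritable());
        model.drained(40);   // 40 pending, not yet below low: still unwritable
        model.drained(20);   // 20 pending, below low: writable again
        System.out.println(model.pendingBytes() + " writable=" + model.isWritable());
    }
}
```

In this model the flag stays false between the two watermarks while draining, so the producer is not woken up after every single buffer.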

---
Norman Maurer

JBoss, by Red Hat




Alexey Danilov

Aug 7, 2013, 2:46:54 AM
to ne...@googlegroups.com, nma...@redhat.com
Thank you, Norman! This is what I've been looking for.

Now I'm wondering, is there any straightforward way to wait for a channel to become writable again, in a similar fashion to waiting for the write future to complete?

So far I've been using a combination of channelWritabilityChanged() and locks from Java's concurrency package, but seeing as Netty provides many convenience methods, there's got to be something more elegant.
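
For reference, the lock-based combination mentioned here might look roughly like the sketch below. WritabilityGate is a hypothetical helper, not part of Netty. The assumed wiring is that a handler's channelWritabilityChanged() calls gate.setWritable(ctx.channel().isWritable()), and that the producer calls awaitWritable() from its own thread. Blocking on the channel's event loop thread would stall the very I/O that makes the channel writable again.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: lets a producer thread block until the channel
// is reported writable again. Not part of Netty.
final class WritabilityGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition becameWritable = lock.newCondition();
    private boolean writable = true;

    /** Assumed to be called from channelWritabilityChanged(). */
    void setWritable(boolean nowWritable) {
        lock.lock();
        try {
            writable = nowWritable;
            if (nowWritable) {
                becameWritable.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks the calling (non event loop) thread until writable. */
    void awaitWritable() throws InterruptedException {
        lock.lock();
        try {
            while (!writable) {
                becameWritable.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Timed variant; returns false if the timeout elapsed first. */
    boolean awaitWritable(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!writable) {
                if (nanos <= 0L) {
                    return false;
                }
                nanos = becameWritable.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The producer loop would then call gate.awaitWritable() before each write instead of awaiting every write future.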

"이희승 (Trustin Lee)"

Aug 7, 2013, 3:06:04 AM
to ne...@googlegroups.com
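The best way is to implement some sort of state machine using the ChannelFuture. For example:

ChannelFuture generateTraffic() {
  ChannelFuture f;
  do {
    // writeAndFlush() rather than write(): in 4.0 a plain write() is not sent until the channel is flushed
    f = ch.writeAndFlush(msg);
  } while (ch.isWritable());
  return f;
}

// Once the last queued write completes, start the next burst.
generateTraffic().addListener((f) -> { if (f.isSuccess()) generateTraffic(); });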

Alexey Danilov

Aug 8, 2013, 12:32:37 AM
to ne...@googlegroups.com
Thanks for your suggestion, Trustin!

If I understand it correctly, with the code you provided, once I send a piece of data that overflows the high watermark, nothing more is put into Netty's outbound queue until that same piece of data is sent (its future is successful). That means I would have to wait not only for this particular piece of data, but for everything already in the queue to be sent...

In a scenario where the high watermark is, say, 200 MB and the low watermark is 150 MB, once I write a buffer that overflows it, to 210 MB, the outbound queue would have to be depleted before I can proceed with sending data (because the "overflowing" buffer arrived last and has to be sent last, after all the buffers already in the queue).

On the other hand, if I block when the channel becomes unwritable and unblock once I receive the channelWritabilityChanged message, that would allow me to continue before depleting the outbound queue and stay more or less within the high and low watermark levels.

But maybe I just got it wrong.
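
The 200/150/210 scenario can be checked with a small plain-Java simulation. This is only a sketch under stated assumptions: buffers are 10 MB each, the queue drains strictly first in, first out, one buffer at a time, and the channel is treated as writable again once the pending size drops below the low watermark. ResumePointDemo and pendingAtResume are hypothetical names, and nothing here calls Netty.

```java
import java.util.ArrayDeque;

// Hypothetical simulation of the two resume strategies; no Netty involved.
final class ResumePointDemo {

    /**
     * Fills the queue until the pending size exceeds the high watermark,
     * then drains it FIFO and returns how many MB are still queued at the
     * moment the producer is allowed to continue.
     *
     * waitForLastFuture = true  : resume when the last written buffer is sent.
     * waitForLastFuture = false : resume when pending drops below the low watermark.
     */
    static long pendingAtResume(long lowMb, long highMb, long bufferMb,
                                boolean waitForLastFuture) {
        ArrayDeque<Long> queue = new ArrayDeque<>();
        long pending = 0;
        // Producer writes while the channel is still writable.
        while (pending <= highMb) {
            queue.addLast(bufferMb);
            pending += bufferMb;
        }
        // The network drains one buffer at a time, oldest first.
        while (!queue.isEmpty()) {
            pending -= queue.removeFirst();
            boolean lastBufferSent = queue.isEmpty();
            boolean writableAgain = pending < lowMb;
            if (waitForLastFuture ? lastBufferSent : writableAgain) {
                return pending;
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        System.out.println("future-based resume:      "
                + pendingAtResume(150, 200, 10, true) + " MB queued");
        System.out.println("writability-based resume: "
                + pendingAtResume(150, 200, 10, false) + " MB queued");
    }
}
```

Under these assumptions the future-based approach resumes with an empty queue, while the writability-based approach resumes with 140 MB still queued, which matches the reasoning above.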