How to increase the number of bytes a file receiver receives from a server?

36 views
Skip to first unread message

L_DisplayName

unread,
Apr 25, 2017, 11:14:14 PM4/25/17
to Netty discussions
Greetings All,
  I implementing Netty code to transfer a single 5GB binary file from one host (Node 0) to another host (Node 1), it does it successfully, however the receiver only reads in 64KB at a time, but the sender should be sending 100MB chunks at a time, via the ChunkedWriteHandler which is specified to fetch and send 100MB of data. The bandwidth of the link between Node0 and Node1 is 10Gb, how can I increase the number of bytes the receiver receives at a time?  For more info, please see the code below, specifically the FileReceiverHandler class 

Thank You for your assistance,

Code Set up:
  1. FileSenderInitializer.java
  2. FileSenderHandler.java - 
  3. FileSender.java     
  4. FileReceiverInitializer.java
  5. FileReceiverHandler.java  (Note this class extends SimpleChannelInboundHandler<ByteBuf>)
  6. FileReceiver.java
FileSenderInitializer.java - Initializes the channel pipeline with the channel handlers
public class FileSenderInitializer extends ChannelInitializer<SocketChannel> {

 
@Override
 
public void initChannel(SocketChannel ch) throws Exception {
 ch
.pipeline().addLast(
 
 //new LengthFieldPrepender(8),
 new ChunkedWriteHandler(),
 new FileSenderHandler());
 }
}

FileSenderHandler.java - Sends the file header info - File Name, offset, length and then the Actual File
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 try {
 String fileRequest = "ftp Node0/root/10MB_File.dat Node1/tmp/10MB_File_Copy.dat";

 //Source File to send / transfer to the Destination Node
 String theSrcFilePath =  "/root/10MB_File.dat";

 //File Name to write on the destination node, once the file is received  
 
String theDestFilePath = "/tmp/10MB_File_Copy.dat";

//Get the source file to send
 
File theFile = new File(theSrcFilePath);
 
FileChannel theFileChannel = new RandomAccessFile(theFile, "r").getChannel();

//Get the length of the file
 
long fileLength = theFileChannel.size();
 //Get the offset
 long offSet = 0;
 
 //Copy the offset to the ByteBuf
 ByteBuf offSetBuf = Unpooled.copyLong(offSet);
 //Copy the file length to the ByteBuf
 ByteBuf fileLengthBuf = Unpooled.copyLong(fileLength);

 //Get the Destination Filename (including the file path) in Bytes
 
byte[] theDestFilePathInBytes = theDestFilePath.getBytes();
 
//Get the length of theFilePath
 
int theDestSize = theDestFilePathInBytes.length;
 
//Copy the Dest File Path length to the ByteBuf
 
ByteBuf theDestSizeBuf = Unpooled.copyInt(theDestSize);
 
//Copy the theDestFilePathInBytes to the Byte Buf
 
ByteBuf theDestFileBuf = Unpooled.copiedBuffer(theDestFilePathInBytes);
 
 //Send the file Headers: FileName Length, the FileName, the Offset and the file length
 ctx.write(theDestSizeBuf);
 ctx.write(theDestFileBuf);
 ctx.write(offSetBuf);
 ctx.write(fileLengthBuf);
 ctx.flush();

 //Send the 5GB File in 100MB chunks as specified by the following chunk size (1024*1024*100)
 ctx.write(new ChunkedNioFile(theFileChannel, offSet, fileLength, 1024 * 1024 * 100)); //Send File in 100MB Chunks
 ctx.flush();
 
 }catch(Exception e){
 System.err.printf("FileSenderHandler: Channel Active: Error: "+e.getMessage());
 e.printStackTrace();
 }
} //End channelActive





FileSender.java - Bootstraps the channel and connects this client/host to another host
public static void main(String[] args) throws Exception {
 // Configure the client/ File Sender
 EventLoopGroup group = new NioEventLoopGroup();
 try {
 Bootstrap b = new Bootstrap();
 b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new FileSenderInitializer());

 // Start the client.
 ChannelFuture f = b.connect(HOST, PORT).sync();

 // Wait until the connection is closed.
 //f.channel().closeFuture().sync();
 } finally {
 // Shut down the event loop to terminate all threads.
 group.shutdownGracefully();
 }
 }
}

FileReceiverInitializer.java - Initializes the channel pipeline with the channel handlers
public class FileReceiverInitializer extends ChannelInitializer<SocketChannel> {

 public FileReceiverInitializer(){
 
 }

 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(
//Read in 1MB data at a time (which is the max frame length), length field offset starts at 0, length of the length field is 8 bits, length adjustment is 0, strip the 8 bits representing the length field from the frame
 
//new LengthFieldBasedFrameDecoder(1024*1024*1, 0, 8, 0, 8),
 new FileReceiverHandler());
 }
}

FileReceiverHandler.java - Receives the file header info - File Name, offset, length and then the actual file
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
while (msg.readableBytes() >= 1){
logger.info("Number of readable Bytes = " + msg.readableBytes()); //Readable Bytes only = 64KB, I want to increase this to 100MB
  //Read in the size of the File Name and it's directory path
 
if (!fileNameStringSizeSet) {
  fileNameStringSizeBuf
.writeBytes(msg, ((fileNameStringSizeBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringSizeBuf.writableBytes())); //INT_SIZE = 4 & LONG_SIZE = 8 (the byte size of an int and long)
 
if (fileNameStringSizeBuf.readableBytes() >= INT_SIZE) {
  fileNameStringSize
= fileNameStringSizeBuf.getInt(fileNameStringSizeBuf.readerIndex());//Get Size at index = 0;
  fileNameStringSizeSet
= true;
 
//Allocate a byteBuf to read in the actual file name and it's directory path
  fileNameStringBuf
= ctx.alloc().buffer(fileNameStringSize);
 
}
 
} else if (!readInFileNameString) {
 
//Read in the actual file name and it's corresponding directory path
  fileNameStringBuf
.writeBytes(msg, ((fileNameStringBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringBuf.writableBytes()));
 
if (fileNameStringBuf.readableBytes() >= fileNameStringSize) {
  readInFileNameString
= true;
 
//convert the data in the fileNameStringBuf to an ascii string
  thefileName
= fileNameStringBuf.toString(Charset.forName("US-ASCII"));
 
 
//Create file
  emptyFile
= new File(thefileName); //file Name includes the directory path
  f
= new RandomAccessFile(emptyFile, "rw");
  fc
= f.getChannel();
 
}
 
}else if (!readInOffset) {
  offSetBuf
.writeBytes(msg, ((offSetBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : offSetBuf.writableBytes()));
 
if (offSetBuf.readableBytes() >= LONG_SIZE) {
  currentOffset
= offSetBuf.getLong(offSetBuf.readerIndex());//Get Size at index = 0;
  readInOffset
= true;
 
}
 
} else if (!readInFileLength) {
  fileLengthBuf
.writeBytes(msg, ((fileLengthBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileLengthBuf.writableBytes()));
 
//LONG_SIZE = 8
 
if (fileLengthBuf.readableBytes() >= LONG_SIZE) {
  fileLength
= fileLengthBuf.getLong(fileLengthBuf.readerIndex());//Get Size at index = 0;
  remainingFileLength
= fileLength;
  readInFragmentLength
= true;
 
}
 
} else {
 
if (!readInCompleteFile) {
 
if (msg.readableBytes() < remainingFileLength) {
 
if (msg.readableBytes() > 0) {
currentFileBytesWrote = 0
while ( msg.readableBytes >= 1 ){
  int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset);
  currentOffset
+= fileBytesWrote;
  remainingFileLength
-= fileBytesWrote;
msg.readerIndex(msg.readerIndex + fileBytesWrote);
}
  }
 
} else {
 
int remainingFileLengthInt = (int) remainingFileLength;
while (remainingFileLength >= 1){
  int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), remainingFileLengthInt), currentOffset);

 currentOffset += fileBytesWrote ;
  remainingFileLength -= fileBytesWrote;
remainingFileLengthInt-= fileBytesWrote;
msg.readerIndex(msg.readerIndex + fileBytesWrote );
}
 
 
//Set readInCompleteFile to true
  readInCompleteFile
= true;
 
 
}
 
}//End else if file chunk
 
}//End Else
 
}//End While
}//End Read Method


FileReceiver.java - Bootstraps the Server and accepts connections
public static void main(String[] args) throws Exception {
 // Configure the server
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new FileReceiverInitializer())
 .childOption(ChannelOption.AUTO_READ, true)
 .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
 } finally {
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
}

Tim Boudreau

unread,
Jun 18, 2017, 1:57:29 AM6/18/17
to Netty discussions
The code above seems very elaborate, given what you're trying to do - elaborate enough that mentally debugging it isn't something I'm keen to do.  There could be a parameter somewhere that you're missing.

BUT...generally, when you're sending data over a network, there are a LOT of things that can affect how things get broken up and reassembled, and there is NO guarantee that just because the data was in 100Mb chunks at one of the end of the wire, that it will arrive that way - the nature of TCP/IP is that things can get carved up into smaller chunks by anything between one end of the wire and the other, as long as the other end can reassemble the result.  64k could be a buffer size somewhere in your OS, or on any piece of hardware between one end of the wire and the other (you definitely don't have a 100Mb MTU on your network, so in reality "100Mb chunk" was always an illusion - it's most likely going to get split up into ~1500 byte packets before it leaves your machine).  How you see it arrive does not necessarily have anything to do with how was carved up when it departed.  So it's entirely possible that everything is already working fine.

-Tim

Reply all
Reply to author
Forward
0 new messages