vertx tcpclient variable size


Sharat Koya

Apr 22, 2016, 1:22:14 PM
to vert.x
Hi,

I am connecting to a server using NetSocket in a verticle. In each message the server emits, the second pair of bytes denotes how long the message will be.

<version 2bytes><length 2 bytes><correlationToken 4 bytes><xml message payload>

What is the Vert.x way to consume these messages correctly? For some reason the handler doesn't break them up on message boundaries.
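
For reference, here is a minimal sketch of that frame layout in plain Java (no Vert.x). `FrameLayout` and its methods are hypothetical names. The sketch assumes big-endian byte order, an unsigned length field that counts only the payload bytes, and UTF-8 XML; none of that is stated in the post.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Vert.x: builds and reads the frame layout described above.
final class FrameLayout {
    static final int HEADER_SIZE = 8; // 2 (version) + 2 (length) + 4 (correlation token)

    // Builds one frame. Assumption: the length field counts payload bytes only.
    static byte[] encode(int version, int correlationToken, String xml) {
        byte[] payload = xml.getBytes(StandardCharsets.UTF_8);
        if (payload.length > 0xFFFF) {
            throw new IllegalArgumentException("payload does not fit a 2-byte length");
        }
        return ByteBuffer.allocate(HEADER_SIZE + payload.length) // big-endian by default
                .putShort((short) version)
                .putShort((short) payload.length)
                .putInt(correlationToken)
                .put(payload)
                .array();
    }

    // Reads the length field (bytes 2..3) as an unsigned 16-bit value.
    static int payloadLength(byte[] header) {
        return Short.toUnsignedInt(ByteBuffer.wrap(header).getShort(2));
    }
}
```

Reading the length as unsigned matters once a payload exceeds 32767 bytes: a plain `getShort` would return a negative number for it.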

thanks,
SK

Mumuney Abdlquadri

Apr 22, 2016, 1:48:15 PM
to ve...@googlegroups.com


Sharat Koya

Apr 22, 2016, 3:08:21 PM
to vert.x
So how do you do variable size with no delimiter?

Alexander Lehmann

Apr 22, 2016, 3:15:13 PM
to vert.x
The record parser supports switching between its modes, or between different record sizes, while it is reading records. For length-prefixed records, the first record is a fixed 8 bytes (the header), the next one has the size read from the record just received, the one after that is set to 8 again, and so on.

Sharat Koya

Apr 23, 2016, 3:43:29 PM
to vert.x
Sorry I think I am missing the point here... If I use the record parser in the context below how do I get it to pick up the variable length messages? I.e. in the client socket near the bottom of this test if I use the record parser once to get the first 8 bytes and pick out the length, how do I use it on the next set of data using the previous value extracted? (see line with RecordParser).

thanks for any help on this.

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;

import org.junit.runner.RunWith;
import org.junit.*;

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

@RunWith(VertxUnitRunner.class)
public class RecordParserTest {
    int PORT = 50000;
    Vertx serverVertx;
    static Random random = new Random();

    private static Buffer newRadomSizeMessage() {
        StringBuffer message = new StringBuffer();

        for (int i=0; i <= random.nextInt(16384); i++) {
            message.append(i);
        }
        System.out.println("Generated Length: " + message.toString().getBytes().length);

        return Buffer.buffer()
            .appendShort((short) 0)
            .appendShort((short)message.toString().getBytes().length)
            .appendBytes("0000".getBytes())
            .appendBytes(message.toString().getBytes());

    }

    @Before
    public void setupServer() {
        AtomicInteger messageCount = new AtomicInteger(0);
        Vertx serverVertx = Vertx.vertx();
        NetServer server = serverVertx
            .createNetServer()
            .connectHandler(socket -> {
                socket.handler(buffer -> {
                    //incoming data to the server
                });
                socket.write(newRadomSizeMessage());
                System.out.println("Sent");
            })
            .listen(PORT, "localhost");

        serverVertx.setPeriodic(10000, keepAlive -> {

        });

    }

    @Test
    public void recordParserCheck(TestContext testContext) {
        Async async = testContext.async();
        Vertx clientVertx = Vertx.vertx();
        NetClient client = clientVertx.createNetClient();
        client.connect(PORT, "localhost", result -> {

            if (result.succeeded()) {
                NetSocket socket = result.result();
                socket.handler(RecordParser.newFixed(8, parsed -> {
                    System.out.println(parsed.toString());
                }));

                clientVertx.setPeriodic(10000, keepAlive -> {});
            } else {
                System.out.println(result.cause());
            }
        });
    }
}

Alexander Lehmann

Apr 24, 2016, 5:56:02 AM
to vert.x
You can change the block size of the record parser while it is already processing data. For this you have to keep the RecordParser instance in a variable and change the size for each record, something like this (please note that this is not actual code, I just typed it into Google Groups; you will probably need some additional stuff to get sizeChunk working):

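RecordParser rp = RecordParser.newFixed(8);
// sizeChunk needs to be a field (or similar): a lambda cannot reassign a local variable
boolean sizeChunk = true;
rp.setOutput(block -> {
  if (sizeChunk) {
    // 8-byte header: work out the payload size and read that many bytes next
    int size = calculateSize(block);
    rp.fixedSizeMode(size);
    sizeChunk = false;
  } else {
    // payload: handle it, then go back to waiting for the next 8-byte header
    System.out.println("block:" + block);
    rp.fixedSizeMode(8);
    sizeChunk = true;
  }
});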

Tim Fox

Apr 24, 2016, 7:53:30 AM
to ve...@googlegroups.com
There is no concept of "message" in TCP, there's just a stream of bytes. Your handler is simply passed what has been read from the wire. This can be in any number of chunks.
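
To make that concrete, here is a minimal plain-Java sketch (no Vert.x) that rebuilds payloads from whatever chunks arrive. `FrameAssembler` is a hypothetical name. The sketch assumes the 8-byte header from the original question, a big-endian unsigned length at bytes 2..3 that counts payload bytes only, and UTF-8 payloads.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone decoder: turns arbitrary chunks back into payloads.
final class FrameAssembler {
    private static final int HEADER_SIZE = 8;
    private byte[] pending = new byte[0]; // bytes received but not yet consumed

    // Feed one chunk exactly as a socket handler would receive it;
    // returns every payload that became complete with this chunk.
    List<String> feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        List<String> complete = new ArrayList<>();
        int pos = 0;
        while (joined.length - pos >= HEADER_SIZE) {
            int length = Short.toUnsignedInt(ByteBuffer.wrap(joined).getShort(pos + 2));
            if (joined.length - pos < HEADER_SIZE + length) {
                break; // the payload has not fully arrived yet
            }
            complete.add(new String(joined, pos + HEADER_SIZE, length, StandardCharsets.UTF_8));
            pos += HEADER_SIZE + length;
        }
        pending = Arrays.copyOfRange(joined, pos, joined.length); // keep the partial frame
        return complete;
    }
}
```

One chunk may therefore yield zero, one, or several messages. Copying the pending bytes on every call keeps the sketch short; a real implementation would likely use a growable buffer instead.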

Sharat Koya

Apr 25, 2016, 1:22:43 PM
to vert.x
Understood, thanks. I managed to implement a custom RecordParser that uses a delimiter at the beginning of the message as a hint for where the header is, tracks back to the location of the header in the buffer, extracts the message size, and then either emits messages or waits for the next appended chunk until a full message has arrived.

For reference, in case anyone comes across the same issue: I used the Vert.x RecordParser code as a guide for how to do it.

Tim Fox

Apr 25, 2016, 2:15:20 PM
to ve...@googlegroups.com
You shouldn't need a delimiter as your protocol is a very simple fixed
length one.

Just use fixed size mode and set the initial record length to 2.

The first buffer you receive will be your version; you can ignore it.

The second 2 byte buffer you receive will be your length. Read that and
remember the value.

Now set fixed length to 4.

The next 4 byte record you receive will be your correlation token.

Now set fixed length to the length you remembered from before.

The next record you receive will be your xml.

Rinse and repeat.
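
The steps above can be sketched as a small state machine. To keep the sketch runnable without Vert.x on the classpath, `FixedSizeSplitter` is a hypothetical stand-in that plays the role of the record parser in fixed-size mode; it is not the Vert.x class. With the real RecordParser, `expect` would call `fixedSizeMode(...)` on the parser and the records would arrive as `Buffer` instances. The unsigned big-endian length and UTF-8 payload are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a record parser in fixed-size mode (not the Vert.x class).
final class FixedSizeSplitter {
    private byte[] pending = new byte[0];
    private int recordSize;
    private Consumer<byte[]> output = record -> { };

    FixedSizeSplitter(int initialSize) { fixedSizeMode(initialSize); }

    void setOutput(Consumer<byte[]> output) { this.output = output; }

    // May be called from inside the output callback to size the next record.
    void fixedSizeMode(int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be > 0");
        this.recordSize = size;
    }

    void handle(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);
        int pos = 0;
        while (joined.length - pos >= recordSize) {
            int size = recordSize; // the callback below may change recordSize
            output.accept(Arrays.copyOfRange(joined, pos, pos + size));
            pos += size;
        }
        pending = Arrays.copyOfRange(joined, pos, joined.length);
    }
}

// Walks the four parts of each message in order, as in the steps above.
final class MessageReader {
    enum Part { VERSION, LENGTH, TOKEN, XML }

    final List<String> messages = new ArrayList<>();
    private final FixedSizeSplitter splitter = new FixedSizeSplitter(2);
    private Part next = Part.VERSION;
    private int xmlLength;

    MessageReader() { splitter.setOutput(this::onRecord); }

    void handle(byte[] chunk) { splitter.handle(chunk); }

    private void onRecord(byte[] record) {
        switch (next) {
            case VERSION: // 2 bytes, ignored
                expect(Part.LENGTH, 2);
                break;
            case LENGTH: // 2 bytes, remembered for the XML record
                xmlLength = Short.toUnsignedInt(ByteBuffer.wrap(record).getShort());
                expect(Part.TOKEN, 4);
                break;
            case TOKEN: // 4 bytes
                if (xmlLength == 0) { // an empty payload has no record of its own
                    messages.add("");
                    expect(Part.VERSION, 2);
                } else {
                    expect(Part.XML, xmlLength);
                }
                break;
            case XML:
                messages.add(new String(record, StandardCharsets.UTF_8));
                expect(Part.VERSION, 2);
                break;
        }
    }

    private void expect(Part part, int size) {
        next = part;
        splitter.fixedSizeMode(size);
    }
}
```

The zero-length case is handled separately because a fixed record size of 0 is not meaningful; the stand-in rejects it.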

Sharat Koya

May 5, 2016, 3:48:02 PM
to vert.x
You are absolutely right. I re-implemented using the standard record parser, an enum to define the different parts of the message and then used a switch statement to ensure the record parser used the correct next size when it was on each part of the message. 

thanks!