Read/Write ByteBuffer in Chronicle queue

747 views
Skip to first unread message

gaz...@gmail.com

unread,
Jun 11, 2015, 6:29:22 AM6/11/15
to java-ch...@googlegroups.com
Hi all,

I encountered an issue when storing an object that contains a ByteBuffer using appender.writeObject(); please see the code snippet below:

package com.test.echo; 

import static org.junit.Assert.assertTrue; 

import java.io.IOException; 
import java.nio.ByteBuffer; 

import net.openhft.chronicle.Chronicle; 
import net.openhft.chronicle.ChronicleQueueBuilder; 
import net.openhft.chronicle.ExcerptAppender; 
import net.openhft.chronicle.ExcerptTailer; 
import net.openhft.chronicle.tools.ChronicleTools; 

import org.junit.Assert; 
import org.junit.Test; 

import com.test.echo.DoffDataMessage.DataMessageType; 

public class ByteBufferTest { 

        /**
         * Round-trips ten {@code DoffDataMessage} instances through an indexed
         * Chronicle stored at {@code basePath} and checks that the payload text
         * read back equals {@code DATA}.
         *
         * @throws IOException declared for the Chronicle build and the
         *         charset-name based encode/decode calls used below
         */
        @Test 
        public void testByteBufferStore() throws IOException { 
                String basePath = "/tmp/client-outbound"; 
                Chronicle chronicle = ChronicleQueueBuilder.indexed(basePath).build(); 
                // presumably schedules the queue files for removal at JVM exit -- confirm against ChronicleTools
                ChronicleTools.deleteOnExit(basePath); 
                ExcerptAppender appender = chronicle.createAppender(); 
                String DATA = "Hello,Chronicle"; 
                for (int i = 0; i < 10; i++) { 
                        appender.startExcerpt(); 
                        // After put() the buffer's position sits just past the payload and its
                        // limit is still the 8 KiB capacity; it is handed to the message
                        // un-flipped, so the message's writeMarshallable has to prepare it.
                        ByteBuffer buffer = ByteBuffer.allocate(1024 * 8); 
                        buffer.put(DATA.getBytes("UTF-8")); 
                        DoffDataMessage message = new DoffDataMessage(DataMessageType.PLAIN, buffer); 
                        appender.writeObject(message); 
                        appender.finish(); 
                } 

                ExcerptTailer tailer = chronicle.createTailer(); 
                for (int i = 0; i < 10; i++) { 
                        // Short-circuit OR: nextIndex() is retried once only if the first call returns false.
                        assertTrue(tailer.nextIndex() || tailer.nextIndex()); 

                        DoffDataMessage message = tailer.readObject(DoffDataMessage.class); 
                        ByteBuffer buffer = message.getData(); 
                        // Fixed 8 KiB destination: the bulk get() throws BufferUnderflowException
                        // when fewer than bytes.length bytes remain, so this only works while the
                        // message exposes at least 8192 readable bytes.
                        byte[] bytes = new byte[1024 * 8]; 
                        buffer.get(bytes); 
                        String s = new String(bytes, "UTF-8"); 
                        System.out.println("Data from queue:" + s); 
                        // trim() strips the trailing NUL padding, but not any non-blank bytes
                        // that were stored after the payload.
                        Assert.assertTrue(DATA.equals(s.trim())); 
                        tailer.finish(); 
                } 

        } 


package com.test.echo; 


import java.nio.ByteBuffer; 

import net.openhft.lang.io.Bytes; 
import net.openhft.lang.io.serialization.BytesMarshallable; 


/** 
 * DOFF Data structure segment. 
 * 
 * 
 */ 
public final class DoffDataMessage implements BytesMarshallable{ 
        /** 
         * Doff data message types. 
         */ 
        public static enum DataMessageType { 
                /** 
                 * Compressed. 
                 */ 
                COMPRESSED((byte) 'C'), 
                /** 
                 * Plain. 
                 */ 
                PLAIN((byte) 'D'), 
                /** 
                 * Heart beat. 
                 */ 
                HEARTBEAT((byte) 'H'); 

                private byte value; 

                /** 
                 * Constructor. 
                 * 
                 */ 
                private DataMessageType(byte value) { 
                        this.value = value; 
                } 

                /** 
                 * @return the value 
                 */ 
                public byte getValue() { 
                        return value; 
                } 
        } 

        private DataMessageType messateType; 

        private ByteBuffer data; 

        private long nanoTimestamp; 

        /** 
         * Constructor. 
         * 
         * @param messateType 
         *            Message type 
         * @param data 
         *            Data 
         */ 
        public DoffDataMessage(DataMessageType messateType, ByteBuffer data) { 
                this.messateType = messateType; 
                this.data = data; 
                nanoTimestamp = System.nanoTime(); 
        } 

        /** 
         * @return the nanoTimestamp 
         */ 
        public long getNanoTimestamp() { 
                return nanoTimestamp; 
        } 

        /** 
         * @return the messateType 
         */ 
        public DataMessageType getMessateType() { 
                return messateType; 
        } 

        /** 
         * @return the data 
         */ 
        public ByteBuffer getData() { 
                return data; 
        } 

        

        /**
         * Restores this message from {@code in}: the message type, then the
         * payload into a fresh 8 KiB heap buffer, then the compact-encoded
         * timestamp.
         *
         * NOTE(review): no payload length is read here, so the payload read is
         * not bounded to what was originally written. Per the library author's
         * diagnosis in this thread, in.read(ByteBuffer) takes as much as is
         * available, which swallows the timestamp bytes that follow the payload.
         *
         * @param in source to read the serialized message from
         */
        @Override 
        public void readMarshallable(Bytes in) throws IllegalStateException { 
                this.messateType = in.readEnum(DataMessageType.class); 
                this.data = ByteBuffer.allocate(1024*8); 
                in.read(this.data); 
                // rewind() resets the position to 0 but leaves the limit at the 8 KiB
                // capacity, so callers see 8192 readable bytes whatever the payload size.
                this.data.rewind(); 
                this.nanoTimestamp = in.readCompactLong(); 
                
        } 

        /**
         * Serializes this message to {@code out}: the message type, the payload
         * bytes, then the compact-encoded timestamp.
         *
         * NOTE(review): the payload is written without a length prefix, so a
         * reader cannot tell where the payload ends and the timestamp begins.
         *
         * @param out destination for the serialized message
         */
        @Override 
        public void writeMarshallable(Bytes out) { 
                out.writeEnum(messateType); 
                // flip() sets limit = position and position = 0; this mutates the
                // caller-supplied buffer and assumes it was just filled with put().
                data.flip(); 
                out.write(data); 
                out.writeCompactLong(nanoTimestamp); 
                
        } 

The test case output was: "Data from queue:Hello Chronicled", which has an extra 'd' appended to the last word.

Could you help double-check whether my readMarshallable/writeMarshallable handling of the ByteBuffer is correct? Many thanks.

Awesome for your unbelievable work!


Peter Lawrey

unread,
Jun 11, 2015, 8:25:14 AM6/11/15
to java-ch...@googlegroups.com
You need to write the length of the buffer you are writing to ensure you read the same amount. Otherwise it will read as much as is available which will include the nanoTimestamp.

        @Override
        public void readMarshallable(Bytes in) throws IllegalStateException {
                this.messateType = in.readEnum(DataMessageType.class); 
                int len = (int) in.readStopBit();
                // save the data off heap
                this.data = ByteBuffer.allocateDirect(len);
                in.read(this.data);
                this.data.flip(); // leave the data in readable state.

                this.nanoTimestamp = in.readCompactLong();                 
        }

        /**
         * Serializes this message to {@code out}: the message type, the payload
         * length as a stop-bit value, the payload bytes, then the
         * compact-encoded timestamp.
         *
         * Precondition: {@code data} must already be read-ready (position at
         * the start of the payload, limit at its end), because
         * {@code data.remaining()} is what gets recorded as the length.
         *
         * @param out destination for the serialized message
         */
        @Override
        public void writeMarshallable(Bytes out) {
                out.writeEnum(messateType);
                out.writeStopBit(data.remaining());
                // presumably advances data's position past the payload, hence the reset below -- confirm
                out.write(data); 
                // Resetting to 0 (rather than to the starting position) assumes the payload began at index 0.
                data.position(0); // leave the data in a readable state.
                out.writeCompactLong(nanoTimestamp);                 
        }


--
You received this message because you are subscribed to the Google Groups "Chronicle" group.
To unsubscribe from this group and stop receiving emails from it, send an email to java-chronicl...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

gaz...@gmail.com

unread,
Jun 11, 2015, 8:43:46 PM6/11/15
to java-ch...@googlegroups.com
Thanks Peter,

Your suggestion makes it work.

However, the writeMarshallable method needs one more line to flip the data before writing, as shown below:

/**
 * Serializes this message to {@code out}: the message type, the payload
 * length as a stop-bit value, the payload bytes, then the compact-encoded
 * timestamp. Unlike the variant without flip(), this one expects
 * {@code data} to be in its just-written state (position just past the
 * payload).
 *
 * NOTE(review): flip() sets the limit to the current position. Because
 * position(0) below leaves the position at zero, a second call on the same
 * instance would flip to an empty buffer and record a zero-length payload.
 * Confirm a message is never serialized twice, or keep the buffer
 * read-ready and drop the flip.
 *
 * @param out destination for the serialized message
 */
@Override 
        public void writeMarshallable(Bytes out) { 
                out.writeEnum(messateType);
                data.flip();// either data.flip() or data.position(0) works here
                out.writeStopBit(data.remaining());
                out.write(data); 
                data.position(0); // leave the data in a readable state.
                out.writeCompactLong(nanoTimestamp);                 
        }

Also, when reading, the byte array must be sized to the ByteBuffer's remaining bytes (it must not be larger), otherwise get() throws BufferUnderflowException:

ByteBuffer buffer = message.getData()
byte[] bytes = new byte[buffer.remaining()];

I really appreciate your help; thanks again.


在 2015年6月11日星期四 UTC+8下午6:29:22,gaz...@gmail.com写道:

Peter Lawrey

unread,
Jun 12, 2015, 1:22:01 AM6/12/15
to java-ch...@googlegroups.com
Whether you need to flip it depends on what state you assume the ByteBuffer is in. I suggest you always set the ByteBuffer so it is about to be read. i.e. the position is 0 and the limit is the end of the data.  In this case you can write data you just read and you won't need to flip() here.

Not sure what you mean by the lengths.  When reading your buffer must be large enough but unless the message is corrupted this shouldn't be a problem as you are reading the length first.

Regards
   Peter.


gaz...@gmail.com

unread,
Jun 14, 2015, 8:40:21 PM6/14/15
to java-ch...@googlegroups.com
ByteBuffer's bulk get(byte[]) method tries to fill the entire array you pass in (say, 1024 bytes); if fewer than that many bytes remain before the buffer's limit, it throws BufferUnderflowException.

Here is what the Javadoc says:

public ByteBuffer get(byte[] dst,
             int offset,
             int length)
Relative bulk get method.

This method transfers bytes from this buffer into the given destination array. If there are fewer bytes remaining in the buffer than are required to satisfy the request, that is, if length > remaining(), then no bytes are transferred and a BufferUnderflowException is thrown.

Otherwise, this method copies length bytes from this buffer into the given array, starting at the current position of this buffer and at the given offset in the array. The position of this buffer is then incremented by length.

In other words, an invocation of this method of the form src.get(dst, off, len) has exactly the same effect as the loop

     for (int i = off; i < off + len; i++)
         dst[i] = src.get(); 
except that it first checks that there are sufficient bytes in this buffer and it is potentially much more efficient.

Parameters:
dst - The array into which bytes are to be written
offset - The offset within the array of the first byte to be written; must be non-negative and no larger than dst.length
length - The maximum number of bytes to be written to the given array; must be non-negative and no larger than dst.length - offset
Returns:
This buffer
Throws:
BufferUnderflowException - If there are fewer than length bytes remaining in this buffer
IndexOutOfBoundsException - If the preconditions on the offset and length parameters do not hold
 

Peter Lawrey

unread,
Jun 15, 2015, 2:46:03 AM6/15/15
to java-ch...@googlegroups.com

Thank you for letting me know you are looking at that. If you want a byte[] rather than a ByteBuffer, I suggest you read and write a byte[] directly, as this will be more efficient than going through an intermediate ByteBuffer.

Regards, Peter.

--

gaz...@gmail.com

unread,
Jun 15, 2015, 3:50:34 AM6/15/15
to java-ch...@googlegroups.com
Agree. :)

Thanks
Reply all
Reply to author
Forward
0 new messages