org.cleversafe.protocol.impl.GridProtocolHeaderCodec
I pasted it here. So do you think this is because I am wrapping it in data stream?
/** * Constructor. Takes the byte array and deserializes it into a Header object. * * @param header * The header byte array. * @throws HeaderFormatException */ public GridProtocolHeaderCodec(final byte[] header) throws HeaderFormatException { this(new DataInputStream(new ByteArrayInputStream(header))); } /** * Constructor. Takes the byte array and deserializes it into a Header object. * * @param header * The header byte array. * @throws HeaderFormatException */
public GridProtocolHeaderCodec(final DataInput in) throws HeaderFormatException {
// Read from input final int protocolClass; final int protocolVersion; try { protocolClass = in.readUnsignedByte(); protocolVersion = in.readUnsignedByte(); this.operationCode = (byte) in.readUnsignedByte(); this.isResponse = (in.readUnsignedByte() & 0x80) != 0; this.sequenceNumber = in.readInt(); this.payloadLength = in.readInt(); // Todo: make it configurable, see DE2735 if (this.payloadLength > MAX_PAYLOAD_SIZE) { final String message = "Payload size exceeds max allowed " + this.payloadLength + " > " + MAX_PAYLOAD_SIZE; dumpHeader(protocolClass, protocolVersion); throw new HeaderFormatException(message); } } catch (final EOFException eof) { throw new HeaderFormatException("Incomplete header", eof); } catch (final IOException ioe) { throw new HeaderFormatException( "Header bytes passed in constructor are of invalid format!", ioe); } }
Here it is.
public class GridProtocolHeaderCodec { private static final Logger _logger = LoggerFactory.getLogger(GridProtocolHeaderCodec.class); private int sequenceNumber; private byte operationCode; private boolean isResponse; private int payloadLength; public static final int LENGTH = 12; public static final byte PROTOCOL_CLASS = 1; public static final byte PROTOCOL_VERSION = 1; private static final int MAX_PAYLOAD_SIZE = (int) (100 * Size.MiB); /** * Constructor. Takes the deserialized integer values and creates the Header object. * * @param sequenceNumber * @param operationCode * @param payloadLength */ public GridProtocolHeaderCodec( final int sequenceNumber, final byte operationCode, final boolean isResponse, final int payloadLength) { this.sequenceNumber = sequenceNumber; this.operationCode = operationCode; this.payloadLength = payloadLength; this.isResponse = isResponse; } /** * Constructor. Takes the byte array and deserializes it into a Header object. * * @param header * The header byte array. * @throws HeaderFormatException */ public GridProtocolHeaderCodec(final byte[] header) throws HeaderFormatException { this(new DataInputStream(new ByteArrayInputStream(header))); } public static GridProtocolHeaderCodec create(final ByteBuffer header) throws HeaderFormatException { final byte[] bytes = new byte[LENGTH]; header.get(bytes); return new GridProtocolHeaderCodec(new DataInputStream(new ByteArrayInputStream(bytes))); } /** * Constructor. Takes the byte array and deserializes it into a Header object. * * @param header * The header byte array. * @throws HeaderFormatException */ public GridProtocolHeaderCodec(final DataInput in) throws HeaderFormatException { // Read from input final int protocolClass; final int protocolVersion; try { protocolClass = in.readUnsignedByte(); protocolVersion = in.readUnsignedByte(); this.operationCode = (byte) in.readUnsignedByte(); this.isResponse = (in.readUnsignedByte() & 0x80) != 0; this.sequenceNumber = in.readInt(); this.payloadLength = in.readInt(); // Todo: make it configurable, see DE2735 if (this.payloadLength > MAX_PAYLOAD_SIZE) { final String message = "Payload size exceeds max allowed " + this.payloadLength + " > " + MAX_PAYLOAD_SIZE; dumpHeader(protocolClass, protocolVersion); throw new HeaderFormatException(message); } } catch (final EOFException eof) { throw new HeaderFormatException("Incomplete header", eof); } catch (final IOException ioe) { throw new HeaderFormatException( "Header bytes passed in constructor are of invalid format!", ioe); } } private void dumpHeader(final int protocolClass, final int protocolVersion) { final StringBuffer header = new StringBuffer("Invalid packet - Packet dump"); header.append("\nclass: ").append(protocolClass); header.append("\nopCode: ").append(this.operationCode); header.append("\nisResponse: ").append(this.isResponse); header.append("\nsequence: ").append(this.sequenceNumber); header.append("\npayloadLen: ").append(this.payloadLength); _logger.warn(header.toString()); } public void serialize(final ByteBuffer buffer) { write(buffer); } /** * Writes the header to the speicified output stream */ public void write(final ByteBuffer byteBuffer) { _logger.trace("Serializing a Header object into a header byte array."); byteBuffer.put(PROTOCOL_CLASS); byteBuffer.put(PROTOCOL_VERSION); byteBuffer.put(this.operationCode); byteBuffer.put(this.isResponse ? (byte) 0x80 : (byte) 0x00); byteBuffer.putInt(this.sequenceNumber); byteBuffer.putInt(this.payloadLength); } /** * Returns the sequence number. * * @return The sequence number. */ public int getSequenceNumber() { return this.sequenceNumber; } /** * Returns the payload length. * * @return The payload length. */ public int getPayloadLength() { return this.payloadLength; } /** * Returns the protocol operation code. * * @return The protocol operation code. */ public int getOperationCode() { return this.operationCode; } @Override public String toString() { return "Protocol Header [ID=" + this.sequenceNumber + " C=" + this.operationCode + " L=" + this.payloadLength + "]"; } public boolean isResponse() { return this.isResponse; } }
To me, it just looks like your codec made an assumption that your buffer has a complete message which is not always true. Can you confirm?