Deserialization failing randomly


Manoj

Apr 17, 2012, 7:12:15 PM
to kryo-users
First of all let me say that this project rocks and so a big thanks to
Nate!

I already created a defect (#57) for this, but then I realized that
this may not be an issue with kryo at all; it may be a problem of me
not understanding the API or not using it properly, so I thought I
would post my question here. I am writing a Netty based client/server
app and using kryo for serialization. The core of the communication
involves serializing and deserializing a Message object which
contains the request or the response.

I wrote a MessageConverter class which uses Kryo for the message
serialization/deserialization. Both the client and the server use the
same version of the MessageConverter class. The problem I am running
into is that deserialization fails randomly with the following
exception:

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 48
Serialization trace:
command (remoting.Message)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:517)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:208)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:537)

As described above, the MessageConverter implementation serializes
and deserializes Message objects, and I have been careful to call
kryo.setRegistrationRequired(true) so that any unregistered class is
caught explicitly. This did help me catch classes that were not
registered, such as Arrays.ArrayList. The same converter class runs
on both ends of the client/server application, yet deserialization
often fails with the exception shown above.

Since I am using the same converter class on both ends and have
explicitly set class registration as required, I am not sure why
serialization succeeds and deserialization fails with the above
exception. Furthermore, both the server and the client are brand new
code that I am writing, so there is no question of different versions
of the classes running on each end.

Here is the converter class that I am using:

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.ArraySerializer;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;

import data.user.User;
import hibernate.LongId;
import model.stockdata.WriteProtectedTicker;
import model.trade.QuoteType;
import model.trade.stock.DirectionSymbol;
import model.trade.stock.Exchange;
import model.trade.stock.Quote;
import model.trade.stock.Quote.HaltIndicatorCode;
import model.trade.stock.Quote.HaltStatusCode;
import model.trade.stock.Quote.QuoteSource;
import model.trade.stock.Ticker;
import remoting.Message;
import remoting.MessageContext;
import remoting.MessageConverter;

/**
 * This class is used to provide extremely compact representation of
 * messages when sent over the wire.
 * <p>
 *
 * The class internally uses the Kryo java serialization framework
 * (http://code.google.com/p/kryo/).
 * <p>
 *
 * @author me
 */
public class DefaultMessageConverter implements MessageConverter {
    /**
     * An internal thread local is used to cache the created Kryo
     * instances at a thread local level (Kryo is not thread safe) ...
     */
    private ThreadLocal<Kryo> converters = new ThreadLocal<Kryo>();

    /**
     * This method is used to convert a message into bytes.
     * <p>
     *
     * @param message
     *            instance of Message that needs to be converted
     * @return a byte array representing the conversion
     * @throws NullPointerException
     *             if passed in message is null
     */
    public byte[] toBytes(Message message) {
        Kryo converter = getConverter();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);

        converter.writeObject(output, message);

        return output.toBytes();
    }

    /**
     * This method reconstructs the Message from the given bytes.
     * <p>
     *
     * @param encoded
     *            a byte array representing the serialized message
     * @return the reconstructed Message
     * @throws NullPointerException
     *             if passed in byte array is null
     */
    public Message fromBytes(byte[] encoded) {
        Kryo converter = getConverter();
        Input input = new Input(encoded);

        return converter.readObject(input, Message.class);
    }

    /**
     * This method is used to get the instance of the kryo converter
     * associated with the running thread.
     * <p>
     *
     * @return a new Kryo instance
     */
    private Kryo getConverter() {
        Kryo kryo = converters.get();
        if (kryo == null) {
            // PLEASE DO NOT CHANGE THE SEQUENCE OF ANY OF THE CLASSES
            // WHILE ADDING NEW CLASSES, ADD AT THE BOTTOM ...

            kryo = new Kryo();
            kryo.setRegistrationRequired(true);

            kryo.register(Class.class);
            kryo.register(HashMap.class);
            kryo.register(ArrayList.class);
            kryo.register(BigDecimal.class);
            kryo.register(Date.class);

            // This one is required for the special cases where we are
            // using Arrays.asList() functionality ...
            kryo.register(Arrays.asList("").getClass(), new ArrayAsListSerializer());

            kryo.register(Ticker.class, new TickerSerializer());
            kryo.register(Commands.class, new CommandsSerializer());
            kryo.register(LongId.class);
            kryo.register(User.class);
            kryo.register(Message.class);
            kryo.register(MessageContext.class);
            kryo.register(Quote.class, new CompatibleFieldSerializer(kryo, Quote.class));
            kryo.register(DirectionSymbol.class);
            kryo.register(HaltIndicatorCode.class);
            kryo.register(HaltStatusCode.class);
            kryo.register(QuoteSource.class);
            kryo.register(QuoteType.class);
            kryo.register(WriteProtectedTicker.class, new TickerSerializer());

            // Cache the kryo instance ...
            converters.set(kryo);
        }

        return kryo;
    }

    /**
     * This serializer is used to represent a compact representation
     * of a command when sent over the wire.
     * <p>
     */
    private static final class CommandsSerializer implements Serializer<Commands> {
        public Commands read(Kryo kryo, Input input, Class<Commands> tickerClass) {
            return Commands.GET_REALTIME_QUOTE.fromCode(input.readByte());
        }

        public void write(Kryo kryo, Output output, Commands command) {
            output.writeByte(command.getCode());
        }
    }

    /**
     * This serializer is used to represent a compact representation
     * of a ticker when sent over the wire.
     * <p>
     */
    private static final class TickerSerializer implements Serializer<Ticker> {

        public Ticker read(Kryo kryo, Input input, Class<Ticker> tickerClass) {
            String cusip = kryo.readObject(input, String.class);
            String symbol = kryo.readObject(input, String.class);
            String exchangeName = kryo.readObject(input, String.class);

            Ticker ticker = new Ticker();
            ticker.setCusip(cusip);
            ticker.setSymbol(symbol);

            ticker.setExchange(Exchange.NYSE.fromMappedValue(exchangeName));

            return ticker;
        }

        public void write(Kryo kryo, Output output, Ticker ticker) {
            String symbol = ticker.getSymbol();
            String cusip = ticker.getCusip();
            String exchange = ticker.getExchange().toMappedValue();

            kryo.writeObject(output, cusip);
            kryo.writeObject(output, symbol);
            kryo.writeObject(output, exchange);
        }
    }

    /**
     * This serializer is used for transforming lists built using the
     * Arrays.asList() function.
     * <p>
     */
    private static final class ArrayAsListSerializer extends ArraySerializer {
        private Field arrayField;

        public ArrayAsListSerializer() {
            try {
                arrayField = Class.forName("java.util.Arrays$ArrayList").getDeclaredField("a");
                arrayField.setAccessible(true);
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Object read(Kryo kryo, Input input, Class listClass) {
            Object[] array = (Object[]) super.read(kryo, input, Object[].class);
            return Arrays.asList(array);
        }

        public void write(Kryo kryo, Output output, Object list) {
            try {
                final Object[] array = (Object[]) arrayField.get(list);
                super.write(kryo, output, array);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

Nate

Apr 17, 2012, 7:24:59 PM
to kryo-...@googlegroups.com
A quick note: it is more efficient to use output.writeString and input.readString (or writeString7/8 and readString7/8) than to use the Kryo instance to do the same.

I suggest updating to the latest Kryo. v2.05 was just released, or you can run from SVN. The ArrayAsListSerializer will need to be updated; I suggest using the one I posted on this mailing list that extends FieldSerializer.

The problem you encountered, serialization succeeds but deserialization doesn't, can be caused by a bug in the serializers. If any serializer reads the wrong number of bytes, subsequent reads will read garbage and spectacular failure ensues.
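
The misalignment described above can be illustrated with a minimal sketch. It uses plain java.io streams rather than Kryo so that it runs without any dependency, and the class and method names (MisalignedReadDemo, readSymbolMisaligned, and so on) are hypothetical, not taken from this thread. The writer emits a one-byte code followed by a string; the buggy reader consumes four bytes for the code, so the string read that follows starts in the wrong place.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Toy illustration (java.io, not Kryo) of a reader that consumes the wrong number of bytes. */
final class MisalignedReadDemo {

    /** Writes two fields back to back: a one-byte command code, then a string. */
    static byte[] write(int commandCode, String symbol) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeByte(commandCode);
            out.writeUTF(symbol);
        }
        return bos.toByteArray();
    }

    /** Correct reader: mirrors the writer field for field. */
    static String readSymbol(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readByte(); // one byte, exactly what the writer wrote
            return in.readUTF();
        }
    }

    /** Buggy reader: takes four bytes for the code, so the string read is misaligned. */
    static String readSymbolMisaligned(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readInt(); // deliberate bug: three bytes too many
            return in.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = write(7, "IBM");
        System.out.println("aligned read: " + readSymbol(bytes));
        try {
            System.out.println("misaligned read: " + readSymbolMisaligned(bytes));
        } catch (IOException e) {
            System.out.println("misaligned read failed: " + e);
        }
    }
}
```

With this input the misaligned reader runs past the end of the data and fails with an EOFException. With other inputs the same bug might return a wrong value without any error, which is part of why this kind of failure can look random.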

If it still happens with the latest version, I suggest writing a self-contained test that reproduces the problem so it can be debugged.
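
One possible shape for such a test is a round-trip harness that tries samples of growing size and reports the first one that does not survive. The sketch below is only an illustration: RoundTripHarness and all of its methods are hypothetical names, and the serializer it exercises is a stand-in with a deliberate bug rather than Kryo or the converter from this thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.IntFunction;

/** Hypothetical round-trip harness; none of these names come from Kryo or from this thread. */
final class RoundTripHarness {

    /**
     * Round-trips one sample per size in [0, maxSize] and returns the first
     * size whose copy differs from the original or whose round trip throws,
     * or -1 if every size passes.
     */
    static <T> int firstFailingSize(Function<T, byte[]> toBytes,
                                    Function<byte[], T> fromBytes,
                                    IntFunction<T> sampleOfSize,
                                    int maxSize) {
        for (int size = 0; size <= maxSize; size++) {
            T original = sampleOfSize.apply(size);
            try {
                T copy = fromBytes.apply(toBytes.apply(original));
                if (!original.equals(copy)) {
                    return size;
                }
            } catch (RuntimeException e) {
                return size;
            }
        }
        return -1;
    }

    /** Builds a string of the given length to use as a sample payload. */
    static String stringOfLength(int length) {
        char[] chars = new char[length];
        Arrays.fill(chars, 'x');
        return new String(chars);
    }

    /** Stand-in serializer with a deliberate bug: drops everything past 16 bytes. */
    static byte[] truncatingToBytes(String s) {
        byte[] all = s.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(all, Math.min(all.length, 16));
    }

    /** Stand-in deserializer matching the stand-in serializer. */
    static String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int failing = firstFailingSize(RoundTripHarness::truncatingToBytes,
                                       RoundTripHarness::fromBytes,
                                       RoundTripHarness::stringOfLength,
                                       64);
        System.out.println("first failing size: " + failing); // prints "first failing size: 17"
    }
}
```

To apply the same idea to the converter posted above, one would pass its toBytes and fromBytes methods together with a factory that builds Message objects of growing size. That assumes Message implements equals, which the thread does not show.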

-Nate

