First of all let me say that this project rocks and so a big thanks to
Nate!
I already created a defect (#57) for this but then I also realized
that this may not be an issue with kryo but may be a problem of me not
understanding the API or not using the API properly and so thought to
post my question here. I am writing a Netty-based client/server app
and using Kryo for serialization. The core of the
communication involves serializing and deserializing a Message
object which contains the request or the response.
I wrote a MessageConverter class which uses Kryo for the message
serialization/deserialization. Both the client and the server use the
same version of the MessageConverter class. The problem I am running
into is the deserialization fails randomly with the following
exception:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 48
Serialization trace:
command (remoting.Message)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:517)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:208)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:537)
As described above I created a MessageConverter class which is capable
of serializing/deserializing a Message object and I have been careful
to use kryo.setRegistrationRequired(true) so that I can explicitly
catch any classes that are not registered. This did help me catch
classes that were not registered like Arrays.ArrayList. I am using the
same converter class on both ends of a client server application. The
problem I am seeing is that the deserialization often fails with the
following message:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 48
Serialization trace:
command (remoting.Message)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:517)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:208)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:537)
Since I am using the same converter class on both the ends and have
explicitly set class registration as required, I am not sure why the
serialization succeeds and deserialization fails with the above
exception. Furthermore, both the server and the client are brand new
code that I am writing, so there is no question of different
versions of the classes running on each end.
Here is the converter class that I am using:
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.ArraySerializer;
import
com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import data.user.User;
import hibernate.LongId;
import model.stockdata.WriteProtectedTicker;
import model.trade.QuoteType;
import model.trade.stock.DirectionSymbol;
import model.trade.stock.Exchange;
import model.trade.stock.Quote;
import model.trade.stock.Quote.HaltIndicatorCode;
import model.trade.stock.Quote.HaltStatusCode;
import model.trade.stock.Quote.QuoteSource;
import model.trade.stock.Ticker;
import remoting.Message;
import remoting.MessageContext;
import remoting.MessageConverter;
/**
* This class is used to provide extremely compact representation of
messages when sent over the wire.
* <p>
*
* The class internally uses the Kryo java serialization framework
(
http://code.google.com/p/kryo/).
* <p>
*
* @author me
*/
public class DefaultMessageConverter implements MessageConverter {
/**
* An internal thread local is used to cache the created Kryo
instances at a
* thread local level (Kryo is not thread safe) ...
*/
private ThreadLocal<Kryo> converters = new ThreadLocal<Kryo>();
/**
* This method is used to convert a message into bytes.
* <p>
*
* @param message
* instance of Message that needs to be converted
* @return a byte array representing the conversion
* @throws NullPointerException
* if passed in message is null
*/
public byte[] toBytes(Message message) {
Kryo converter = getConverter();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Output output = new Output(bos);
converter.writeObject(output, message);
return output.toBytes();
}
/**
* This method reconstructs the Message from the given bytes.
* <p>
*
* @param encoded
* a byte array representing the serialized message
* @return the reconstructed Message
* @throws NullPointerException
* if passed in byte array is null
*/
public Message fromBytes(byte[] encoded) {
Kryo converter = getConverter();
Input input = new Input(encoded);
return converter.readObject(input, Message.class);
}
/**
* This method is used to get the instance of the kryo converter
associated with
* the running thread.
* <p>
*
* @return a new Kryo instance
*/
private Kryo getConverter() {
Kryo kryo = converters.get();
if (kryo == null) {
// PLEASE DO NOT CHANGE THE SEQUENCE OF ANY OF THE CLASSES
// WHILE ADDING NEW CLASSES, ADD AT THE BOTTOM ...
kryo = new Kryo();
kryo.setRegistrationRequired(true);
kryo.register(Class.class);
kryo.register(HashMap.class);
kryo.register(ArrayList.class);
kryo.register(BigDecimal.class);
kryo.register(Date.class);
// This one is required for the special cases where we are
using Arrays.asList() functionality ...
kryo.register(Arrays.asList("").getClass(), new
ArrayAsListSerializer());
kryo.register(Ticker.class, new TickerSerializer());
kryo.register(Commands.class, new CommandsSerializer());
kryo.register(LongId.class);
kryo.register(User.class);
kryo.register(Message.class);
kryo.register(MessageContext.class);
kryo.register(Quote.class, new
CompatibleFieldSerializer(kryo, Quote.class));
kryo.register(DirectionSymbol.class);
kryo.register(HaltIndicatorCode.class);
kryo.register(HaltStatusCode.class);
kryo.register(QuoteSource.class);
kryo.register(QuoteType.class);
kryo.register(WriteProtectedTicker.class, new
TickerSerializer());
// Cache the kryo instance ...
converters.set(kryo);
}
return kryo;
}
/**
* This serializer is used to represent a compact representation
of a command when sent over the wire.
* <p>
*/
private static final class CommandsSerializer implements
Serializer<Commands> {
public Commands read(Kryo kryo, Input input, Class<Commands>
tickerClass) {
return
Commands.GET_REALTIME_QUOTE.fromCode(input.readByte());
}
public void write(Kryo kryo, Output output, Commands command)
{
output.writeByte(command.getCode());
}
}
/**
* This serializer is used to represent a compact representation
of a ticker when sent over the wire.
* <p>
*/
private static final class TickerSerializer implements
Serializer<Ticker> {
public Ticker read(Kryo kryo, Input input, Class<Ticker>
tickerClass) {
String cusip = kryo.readObject(input, String.class);
String symbol = kryo.readObject(input, String.class);
String exchangeName = kryo.readObject(input,
String.class);
Ticker ticker = new Ticker();
ticker.setCusip(cusip);
ticker.setSymbol(symbol);
ticker.setExchange(Exchange.NYSE.fromMappedValue(exchangeName));
return ticker;
}
public void write(Kryo kryo, Output output, Ticker ticker) {
String symbol = ticker.getSymbol();
String cusip = ticker.getCusip();
String exchange = ticker.getExchange().toMappedValue();
kryo.writeObject(output, cusip);
kryo.writeObject(output, symbol);
kryo.writeObject(output, exchange);
}
}
/**
* This serializer is used for transforming lists built using the
Arrays.asList() function.
* <p>
*/
private static final class ArrayAsListSerializer extends
ArraySerializer {
private Field arrayField;
public ArrayAsListSerializer() {
try {
arrayField = Class.forName("java.util.Arrays
$ArrayList").getDeclaredField("a");
arrayField.setAccessible(true);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public Object read(Kryo kryo, Input input, Class listClass) {
Object[] array = (Object[]) super.read(kryo, input,
Object[].class);
return Arrays.asList(array);
}
public void write(Kryo kryo, Output output, Object list) {
try {
final Object[] array = (Object[])
arrayField.get(list);
super.write(kryo, output, array);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}