TinkerGraph 3.2.1 - Serializing Avro GenericRecord to Gryo format

Edwin Maljames

Aug 25, 2016, 4:44:22 AM
to Gremlin-users
Hi All,
I would like to store an Avro GenericRecord as a property on a graph vertex. I am serializing the graph to disk using Gryo.

graph.io(IoCore.gryo()).writer().mapper(mapper).create().writeGraph(os, graph);

When I try to read the graph back, it fails to load because the Avro GenericRecord doesn't have a default no-arg constructor.

I tried adding a custom serializer and a registry; the code is below.

Mapper<Kryo> mapper = graph.io(IoCore.gryo()).mapper()
        .addCustom(GenericData.Record.class)
        .addRegistry(new GenericRecordRegistry())
        .addCustom(TextNode.class,
                Class.forName("org.apache.avro.Schema$RecordSchema"),
                Class.forName("org.apache.avro.Schema$Field"),
                Class.forName("org.apache.avro.Schema$Field$Order"),
                Class.forName("org.apache.avro.Schema$UnionSchema"),
                Class.forName("org.apache.avro.Schema$Type"),
                Class.forName("org.apache.avro.Schema$LockableArrayList"),
                Class.forName("org.apache.avro.Schema$BooleanSchema"),
                Class.forName("org.apache.avro.Schema$NullSchema"),
                Class.forName("org.apache.avro.Schema$StringSchema"),
                Class.forName("org.apache.avro.Schema$IntSchema"),
                Class.forName("org.apache.avro.Schema$FloatSchema"),
                Class.forName("org.apache.avro.Schema$EnumSchema"),
                Class.forName("org.apache.avro.Schema$Name"),
                Class.forName("org.apache.avro.Schema$ArraySchema"),
                Class.forName("org.apache.avro.Schema$LongSchema"))
        .create();

public class GenericRecordRegistry extends AbstractIoRegistry {

    public GenericRecordRegistry() {
        register(GryoIo.class, GenericData.Record.class, new GenericDataSerializer());
    }
}

public class GenericDataSerializer extends Serializer<GenericRecord> {

    public GenericDataSerializer() {
        // TODO Auto-generated constructor stub
    }

    @Override
    public GenericRecord read(Kryo arg0, Input arg1, Class<GenericRecord> arg2) {
        // arg1.getInputStream();
        GenericRecord record = null;
        try {
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
            byte[] bytes = new byte[500];
            arg1.readBytes(bytes);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);

            record = reader.read(null, decoder);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return record;
    }

    @Override
    public void write(Kryo arg0, Output arg1, GenericRecord arg2) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(arg2.getSchema());
            Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            writer.write(arg2, encoder);
            encoder.flush();
            out.close();
            arg1.write(out.toByteArray());
        } catch (KryoException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

The above is not working.
I also tried to use Twitter's chill-avro to serialize and deserialize Avro records, but because the Kryo libraries are shaded I was not able to use it.
Has anybody faced a similar use case, or does anyone have thoughts or suggestions on how to achieve this?

Thanks for your time and thoughts.

Regards,
Edwin
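
One detail in the read method above is worth a second look: the buffer is a fixed 500 bytes, and Kryo's Input.readBytes(byte[]) fills the whole array, so the read side consumes 500 bytes no matter how many the write side produced. A common remedy is to write the payload length first and read exactly that many bytes back. The sketch below shows only that framing idea with plain java.io streams; the class and method names (LengthPrefixedFraming, frame, unframe) are hypothetical. Inside a Kryo serializer the equivalent calls would presumably be Output.writeInt / Output.writeBytes and Input.readInt / Input.readBytes(int), which is an assumption about how it would be wired in rather than something tested against Gryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: illustrates length-prefixed framing only, no Avro or Kryo involved.
final class LengthPrefixedFraming {

    /** Returns a 4-byte big-endian length followed by the payload bytes. */
    static byte[] frame(byte[] payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the length prefix, then exactly that many bytes, leaving the rest of the stream untouched. */
    static byte[] unframe(DataInputStream in) {
        try {
            int length = in.readInt();
            byte[] payload = new byte[length];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two payloads of different sizes written back to back, as two vertex properties might be.
        byte[] first = "small record".getBytes(StandardCharsets.UTF_8);
        byte[] second = new byte[700]; // larger than the fixed 500-byte buffer

        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(frame(first), 0, frame(first).length);
        stream.write(frame(second), 0, frame(second).length);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream.toByteArray()));
        System.out.println("first payload bytes read: " + unframe(in).length);
        System.out.println("second payload bytes read: " + unframe(in).length);
    }
}
```

Because each read consumes only its own frame, records of any size can follow one another in the same stream without the reader running into the next value.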

Stephen Mallette

Aug 25, 2016, 12:13:07 PM
to Gremlin-users
I would have thought you were on the right track with GenericDataSerializer, but you say that isn't working. What kind of error are you getting?

--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/2702a38d-ab5b-4968-b07f-da06e61edc92%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Edwin Maljames

Aug 26, 2016, 2:52:48 PM
to Gremlin-users
Thanks for your response. Up to the GenericDataSerializer things are fine, but I cannot use DatumWriter and DatumReader, because each vertex in the graph could potentially hold Avro GenericRecords with a different schema, so at read time I would not be able to determine which schema to use to deserialize the object. I therefore changed it to the following, which uses the Kryo serializer from Chill. I am also looking for a library that supports Avro GenericRecord out of the box, similar to how Spark supports Avro with Kryo.


public void write(Kryo kryo, Output output, GenericRecord record) {
    KryoPool kryoPool = KryoPool.withByteArrayOutputStream(10, new KryoInstantiator());
    byte[] ser = kryoPool.toBytesWithClass(record);
    output.writeBytes(ser);
}

public GenericRecord read(Kryo tinkerKryo, Input input, Class<GenericRecord> arg2) {
    KryoPool kryoPool = KryoPool.withByteArrayOutputStream(10, new KryoInstantiator());
    GenericRecord record = (GenericRecord) kryoPool.fromBytes(input.getBuffer());
    return record;
}


Even in this case, Kryo reports that the class is not registered. The exception stack trace is below:
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 101
    at org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader$VertexInputIterator.next(GryoReader.java:329)
    at org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader$VertexInputIterator.next(GryoReader.java:303)
    at org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils.iterate(IteratorUtils.java:70)
    at org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader.readGraph(GryoReader.java:94)
    at com.jpmc.tv.graph.GraphProcessor.deserilaizeGraph(GraphProcessor.java:147)
    at com.jpmc.tv.graph.VertexCreationTest.VertexCreationTest(VertexCreationTest.java:92)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

Now I would have to register the Avro classes again and end up writing my own Avro serializers, similar to what Chill has in Scala. I am trying to avoid that and reuse something that is already available.
I hope this makes sense; any suggestions or thoughts would be much appreciated.

Thanks,
Edwin
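
For the "different schema per vertex" problem described above, one option that is sometimes used is to store each record's schema next to its binary payload, so the reader never has to guess. The sketch below shows only the envelope layout (schema JSON, then payload, each with a length prefix) using java.nio; the SchemaEnvelope class is hypothetical and the payload bytes are placeholders. With Avro on the classpath, the schema text would presumably come from Schema.toString() and be turned back into a Schema with new Schema.Parser().parse(json) before constructing a GenericDatumReader for it. That wiring is an assumption and was not run against Gryo.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical envelope: pairs a schema (as JSON text) with the bytes encoded under that schema.
final class SchemaEnvelope {
    final String schemaJson;
    final byte[] payload;

    SchemaEnvelope(String schemaJson, byte[] payload) {
        this.schemaJson = schemaJson;
        this.payload = payload;
    }

    /** Layout: [schema length][schema UTF-8 bytes][payload length][payload bytes]. */
    byte[] encode() {
        byte[] schemaBytes = schemaJson.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + schemaBytes.length + 4 + payload.length)
                .putInt(schemaBytes.length)
                .put(schemaBytes)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Recovers both the schema text and the payload from a single encoded envelope. */
    static SchemaEnvelope decode(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] schemaBytes = new byte[buffer.getInt()];
        buffer.get(schemaBytes);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return new SchemaEnvelope(new String(schemaBytes, StandardCharsets.UTF_8), payload);
    }

    public static void main(String[] args) {
        // Two records with different (illustrative) schemas; the payload bytes are placeholders.
        String personSchema =
                "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        String tradeSchema =
                "{\"type\":\"record\",\"name\":\"Trade\",\"fields\":[{\"name\":\"qty\",\"type\":\"int\"}]}";

        byte[] a = new SchemaEnvelope(personSchema, new byte[] {1, 2, 3}).encode();
        byte[] b = new SchemaEnvelope(tradeSchema, new byte[] {9}).encode();

        // The reader learns which schema applies from the envelope itself.
        System.out.println(SchemaEnvelope.decode(a).schemaJson);
        System.out.println(SchemaEnvelope.decode(b).schemaJson);
    }
}
```

The trade-off is size: repeating the full schema text on every vertex is wasteful when many records share a schema, so a real implementation might store a schema identifier or fingerprint instead and look the schema up separately.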

Stephen Mallette

Aug 30, 2016, 5:41:58 PM
to Gremlin-users
I'm not sure I completely follow the situation you're facing, as I don't know the Avro libs too well. I assume your problems would be better resolved if TinkerPop didn't shade the Kryo dependencies? Is that right?
