KeyValTailAssembly throws ClassCastException as it tries to convert String to byte[]


Tushar Deshpande

Apr 9, 2013, 5:59:58 PM
to elephan...@googlegroups.com
Hi,

I am new to ElephantDB.  I read the blog post introducing
ElephantDB at http://backtype.posterous.com/introducing-elephantdb-a-distributed-database
and tried to run the example from that post.  The
example uses elephantdb-cascading to read key/value pairs
from a file and store them in ElephantDB.

First, using a JCascalog query, I stored some data in a
Hadoop sequence file.  In this query I defined the sink as
Api.hfsSeqfile("/tmp/gendercount").  The query
executed correctly and the results were written to the sink.  The
output data contains two fields: gender and count, where gender
is a String and count is an int.  The file itself is in a binary format.

Then I decided to store the contents of /tmp/gendercount in
ElephantDB, so I wrote the following code:

Tap source = new Hfs(new SequenceFile(new Fields("key", "value")), "/tmp/gendercount");
DomainSpec spec = new DomainSpec(new JavaBerkDB(), new HashModScheme(), 32);
ElephantDBTap sink = new ElephantDBTap("/tmp/elephantdb/gendercount", spec, new ElephantDBTap.Args(), TapMode.SINK);
Pipe p = new Pipe("pipe");
p = new KeyValTailAssembly(p, sink);
FlowConnector flowConnector = new HadoopFlowConnector();
flowConnector.connect(source, sink, p).complete();


The source tap reads the key/value pairs, where key is String and the
values are ints. 

When I ran this code, the KeyValTailAssembly threw a ClassCastException.
It complained that it was unable to convert the key of type String
to byte[].  Here is the relevant portion of the stack trace:

java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at elephantdb.cascading.KeyValTailAssembly$Shardize.operate(KeyValTailAssembly.java:42)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)

Line 42 of the KeyValTailAssembly indeed tries to cast an Object
to byte[].  Here is the relevant code from the KeyValTailAssembly.
The exception is encountered while executing (byte[])key.

public void operate(FlowProcess process, FunctionCall call) {
      Object key = call.getArguments().getObject(0);

      int shard = shardIndex((byte[])key);
      call.getOutputCollector().add(new Tuple(shard));
}
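
The failure can be reproduced outside ElephantDB. The following is a minimal sketch in plain Java, not ElephantDB code: CastDemo and shardIndexStub are hypothetical names standing in for a method that, like shardIndex above, accepts only byte[]. It shows that a downcast from Object to byte[] succeeds only when the runtime value really is a byte array.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration; none of these names come from ElephantDB.
final class CastDemo {
    // Stand-in for a method that only accepts a byte[] key.
    static int shardIndexStub(byte[] key, int shardCount) {
        return Math.floorMod(Arrays.hashCode(key), shardCount);
    }

    // Returns true if casting the given key to byte[] throws ClassCastException.
    static boolean castFails(Object key) {
        try {
            shardIndexStub((byte[]) key, 32);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A String key cannot be cast to byte[].
        System.out.println(castFails("male")); // true
        // The same key, encoded to bytes first, passes the cast.
        System.out.println(castFails("male".getBytes(StandardCharsets.UTF_8))); // false
    }
}
```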


I wonder why I am getting this exception, especially since both
the key and value are simple data types and they are being
read from a Hadoop sequence file.  Would someone please help
me understand what I am missing?


Best regards,

Tushar Deshpande

Soren Macbeth

Apr 9, 2013, 6:10:27 PM
to elephan...@googlegroups.com
Hi,

Sorry, that tutorial is very very out of date. I'm intending to write a new one because many of the important details have changed. As the class cast exception hints at, both keys and values must be serialized or otherwise converted to byte[] before they can be written to elephantdb. 

This is a good place to post any other questions or problems, though, so keep them coming as you run into them.

Best,
Soren




--
You received this message because you are subscribed to the Google Groups "elephantdb-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elephantdb-us...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
http://about.me/soren

Tushar Deshpande

Apr 9, 2013, 6:26:57 PM
to elephan...@googlegroups.com
Thanks Soren. Yes, I would eagerly await the updated
tutorial.

So, my code should work if the source file /tmp/gendercount
contained the key/value pairs in serialized format?

The /tmp/gendercount file is a Hadoop sequence file and is
therefore in a binary format. So, I thought that
its contents were already serialized. Am I right?


Thanks,

Tushar Deshpande
Soren Macbeth

Apr 9, 2013, 6:31:24 PM
to elephan...@googlegroups.com
Tushar,

The 'key' and 'value' fields coming out of your source tap must be converted to byte[]s. You said earlier that the key was a String and the value was an int; each of these will need to be converted to a byte array before being passed to the KeyValTailAssembly.

-Soren

Tushar Deshpande

Apr 9, 2013, 6:51:18 PM
to elephan...@googlegroups.com
Hi Soren,

Could it be that my source tap is deserializing
the data as it is read? I am trying to figure it out.


Best regards,

Tushar Deshpande

Soren Macbeth

Apr 9, 2013, 7:25:36 PM
to elephan...@googlegroups.com
Tushar, yes, your source tap is deserializing the data as it reads it out.
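
As a rough analogy only (this is plain Java, not how Hadoop's SequenceFile or the Cascading tap is actually implemented), the sketch below writes a record to bytes and reads it back. The stored form is binary, but the reader hands back a String and an Integer, not byte[]. ReadBackDemo and the sample record ("male", 42) are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical analogy; not Hadoop or Cascading code.
final class ReadBackDemo {
    // Encodes one record as raw bytes, the way a binary file stores it.
    static byte[] writeRecord(String key, int value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(key);
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Decodes the bytes back into typed objects, as a deserializing reader does.
    static Object[] readRecord(byte[] stored) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored))) {
            String key = in.readUTF();
            int value = in.readInt();
            return new Object[] {key, value};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] fields = readRecord(writeRecord("male", 42));
        System.out.println(fields[0].getClass().getSimpleName()); // String
        System.out.println(fields[1].getClass().getSimpleName()); // Integer
    }
}
```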

Tushar Deshpande

Apr 9, 2013, 7:30:07 PM
to elephan...@googlegroups.com
Hi Soren,

Here is my source tap definition.

Tap source = new Hfs(new SequenceFile(new Fields("key", "value")),
"/tmp/gendercount");

Is there some specific config that I should use to ask
the tap not to deserialize the data? I am searching for
a solution, but I would appreciate it if you could give me
some hints.



Best regards,

Tushar Deshpande

Soren Macbeth

Apr 9, 2013, 7:37:44 PM
to elephan...@googlegroups.com
You are either going to have to create a new sequence file with byte[]s written into it, or write some Cascading operations to convert your keys and values to byte[]s before sinking them to ElephantDB. I use Cascalog to interact with ElephantDB, so the specifics of how to do this with Java Cascading escape me, but an Each operator that does something like key.getBytes("UTF-8") will convert your String key to a byte[].

The cascading docs are pretty good, I'd point you to those for the details. 
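
For the conversion step itself, here is a minimal sketch in plain Java, independent of Cascading. ByteEncoding and its method names are hypothetical, and the fixed 4-byte big-endian encoding for the int is just one possible choice, not something ElephantDB requires. It uses StandardCharsets.UTF_8 rather than the "UTF-8" string so that no checked UnsupportedEncodingException has to be handled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the encoding choices here are assumptions.
final class ByteEncoding {
    // String key -> UTF-8 bytes.
    static byte[] encodeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // UTF-8 bytes -> String key.
    static String decodeKey(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // int value -> 4 bytes, big-endian (ByteBuffer's default order).
    static byte[] encodeCount(int count) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(count).array();
    }

    // 4 bytes -> int value.
    static int decodeCount(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] key = encodeKey("male");
        byte[] value = encodeCount(42);
        System.out.println(decodeKey(key));     // male
        System.out.println(decodeCount(value)); // 42
    }
}
```

Whatever encoding is used when writing has to be reversed with the matching decoder when the data is read back.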

Tushar Deshpande

Apr 9, 2013, 7:51:47 PM
to elephan...@googlegroups.com
Thank you Soren. Yes, I will try to use the Each operator
as you have suggested. Btw, I would be very happy if it were
possible to also use JCascalog to interact with ElephantDB.
My source file is created by a JCascalog query, so
I thought that everything would work fine. But it looks like
I will have to try your approach. I am eagerly awaiting
the next update of Nathan Marz's Big Data book. He is going
to explain ElephantDB in the seventh chapter.


Best regards,

Tushar Deshpande

Tushar Deshpande

Apr 11, 2013, 5:21:26 PM
to elephan...@googlegroups.com
Hi Soren,

As you suggested, I tried to write a custom Cascading function to convert
each data tuple into byte[].  I used the getBytes("UTF-8") method to convert
string fields to byte[].  But I got an exception because Cascading failed
to load the serializer for byte[].  This clearly indicates that Cascading tried
to serialize the byte[].  I believe this happened because Cascading
serializes output tuples.  I then tried to register BytesSerialization by
calling TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName()).
But somehow I kept getting the same exception.

This prompted me to take another approach.  I noticed that KeyValTailAssembly
and ElephantScheme cast the keys and values to byte[] in several places.  I
replaced all these casts with a call to Utils.serializeObject(obj).  Then I
rebuilt ElephantDB.  After this, everything worked fine and I was able to
successfully load data of type (String, Integer) into ElephantDB.  Is this
workaround really justified?


Thanks,

Tushar Deshpande

Soren Macbeth

Apr 11, 2013, 5:24:00 PM
to elephan...@googlegroups.com
No, you shouldn't have to do that. There is something off in your cascading flow, but I don't know cascading well enough to tell you what it is. I suggest that you post to the cascading-users group. They will most likely be able to help you there.



Tushar Deshpande

Apr 11, 2013, 5:30:40 PM
to elephan...@googlegroups.com
Yes Soren, I will post a query on the cascading-user forum, since
I do not know Cascading well enough either.  I think that I may be missing
some configuration.

Tushar Deshpande

Apr 15, 2013, 5:18:15 PM
to elephan...@googlegroups.com
Hi Soren,

I was able to push my data into ElephantDB without making any changes
to the KeyValTailAssembly and ElephantScheme.  I wrote a custom
cascading function to explicitly convert both the fields of each tuple to
byte[].

Here is my code:

public static void pushGenderCountToDB() throws IOException {
    Properties properties = new Properties();
    TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName());

    Tap source = new Hfs(new SequenceFile(new Fields("key", "value")), "/tmp/gendercount");

    DomainSpec spec = new DomainSpec(new JavaBerkDB(), new HashModScheme(), 32);
    ElephantDBTap sink = new ElephantDBTap("/tmp/elephantdb/gendercount", spec, new ElephantDBTap.Args(), TapMode.SINK);

    Pipe p = new Pipe("pipe");

    Function serializer = new SerializeKey();
    p = new Each(p, new Fields("key", "value"), serializer);

    p = new KeyValTailAssembly(p, sink);

    FlowConnector flowConnector = new HadoopFlowConnector(properties);

    Flow flow = flowConnector.connect(source, sink, p);
    flow.complete();
}

public static class SerializeKey extends BaseOperation implements Function {
    @Override
    public void operate(FlowProcess process, FunctionCall call) {
        String gender = (String) call.getArguments().getObject(0);
        Long count = (Long) call.getArguments().getObject(1);
        call.getOutputCollector().add(new Tuple(Utils.serializeObject(gender), Utils.serializeObject(count)));
    }
}


Now, I can move to the next step, i.e., reading the data from the ElephantDB.
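
One point to keep in mind for the read side: values written through a generic object serializer have to be decoded with the matching deserializer. The sketch below assumes, and this is only an assumption about Utils.serializeObject, that it behaves like standard Java object serialization. JavaSerializationDemo and its methods are hypothetical stand-ins, not ElephantDB APIs. It shows the round trip, and that the serialized form of a String is not the same as its raw UTF-8 bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in using standard Java serialization.
final class JavaSerializationDemo {
    // Object -> bytes via ObjectOutputStream.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Bytes -> object via ObjectInputStream; the caller casts to the expected type.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] key = serialize("male");
        byte[] value = serialize(Long.valueOf(42L));
        System.out.println(deserialize(key));   // male
        System.out.println(deserialize(value)); // 42
        // The serialized key carries stream metadata, so it differs from plain UTF-8 bytes.
        System.out.println(Arrays.equals(key, "male".getBytes(StandardCharsets.UTF_8))); // false
    }
}
```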


Best regards,

Tushar Deshpande

Soren Macbeth

Apr 15, 2013, 5:19:34 PM
to elephan...@googlegroups.com
Great!

