Cassandra Composite Type <=> Hive Complex Type

aaron morton

Aug 9, 2011, 8:16:49 PM
to Brisk Users
Before I waste too much time, thought I'd check if anyone else is thinking about or working on this.

I have a Cassandra 0.8 (static) CompositeType defined as (IntegerType, UTF8Type) that I would like to make available in Hive as a STRUCT<BIGINT, STRING>. I've tried the basics…

create column family foo
with comparator = 'CompositeType(IntegerType, AsciiType)'
and key_validation_class = UTF8Type
and default_validation_class = UTF8Type;

[default@rftest] set foo['bar']['123:monkeys'] = 'baz';
[default@wiki] get foo['bar'];
=> (column=123:monkeys, value=baz, timestamp=1312896013229000)
Returned 1 results.

CREATE EXTERNAL TABLE foo
(row_key string, foo STRUCT<count:BigInt, article:string>, col_value string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value",
"cassandra.cf.name" = "foo");

hive> select * from foo;
WARN 11:48:48,569 Missing fields! Expected 2 fields but only got 1! Ignoring similar problems.
bar {"count":null,"article":null} baz
Time taken: 0.323 seconds

I'll need it for read and write. Somewhere in my little brain I have the idea that Hive does not support writing its complex types via script, though I cannot find where I read that now. However, in our case we map through the org.apache.hadoop.hive.cassandra.output package into Cassandra types, so the actual conversion to a Cassandra composite type will (obviously) happen on the Cassandra server side.
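
For illustration, the kind of write I'm after would look something like this (a sketch only; src is a placeholder table, and whether the storage handler accepts a STRUCT built in the SELECT is exactly what I'm asking):

INSERT OVERWRITE TABLE foo
SELECT 'bar', struct(CAST(123 AS BIGINT), 'monkeys'), 'baz'
FROM src LIMIT 1;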

Any thoughts ?

Cheers


-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

aaron morton

Aug 24, 2011, 1:11:46 AM
to brisk...@googlegroups.com
I have a version of this that works, but (read on)….

First the good part, created this CF….

create column family foo 
 with comparator = 'CompositeType(LongType(reversed=true), UTF8Type)'
 and key_validation_class = UTF8Type
 and default_validation_class = UTF8Type;

Then created this Hive Table

CREATE EXTERNAL TABLE foo
     (time_bucket string, score_article STRUCT<col1:BIGINT, col2:STRING>, article string)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,:column,:value",
                        "cassandra.cf.name" = "foo"); 

Then used this Hive statement which sums page hits and writes them out....

insert overwrite table foo
SELECT 
    CONCAT(tsm.bucket_start, '/', tsm.bucket_name), STRUCT(SUM(hts.hits), hts.article), hts.article 
FROM 
    <snip>

This creates a string, a struct, and a string.

Checked the data in the CLI

[default@wiki] get foo['20110625010000/15n'] limit 5;
=> (column=168:"Weird_Al"_Yankovic, value="Weird_Al"_Yankovic, timestamp=1314159252433)
=> (column=98:.357_Magnum, value=.357_Magnum, timestamp=1314159252644)
=> (column=87:.45_ACP, value=.45_ACP, timestamp=1314159252686)
=> (column=84:127_Hours, value=127_Hours, timestamp=1314159253144)
=> (column=61:'N_Sync, value='N_Sync, timestamp=1314159252468)
Returned 5 results.

Read the data in Hive:
select * from foo limit 5;
20110625010000/15n {"col1":168,"col2":"\"Weird_Al\"_Yankovic"} "Weird_Al"_Yankovic
20110625010000/15n {"col1":98,"col2":".357_Magnum"} .357_Magnum
20110625010000/15n {"col1":87,"col2":".45_ACP"} .45_ACP
20110625010000/15n {"col1":84,"col2":"127_Hours"} 127_Hours
20110625010000/15n {"col1":61,"col2":"'N_Sync"} 'N_Sync
 

There is a commit here with the *unfinished* changes: https://github.com/amorton/hive/commit/e8fbcf7a9d101d2b51d96878a8a74ff27f11e7a0

Notes / Questions:
1 - I need to add a Cassandra composite type definition SerDe property to the table; I've worked out how to access it in both the read and write paths. It's currently hard-coded in. How about:
cassandra.columns.composite = "<column_name>:<type_def>;…"
   e.g. "chart_entry:CompositeType(LongType(reversed=true), AsciiType);foo:CompositeType(<LongType;LongType>)"

2 - It's a string-based mapping at the moment. LazyCassandraRow uses the Cassandra type to make a string, then replaces the Cassandra delimiter with the Hive one when reading a Cassandra value. TableMapping creates a string the way the CLI does and uses the Cassandra type to make the byte array to send back to Cassandra.

This could become a binary mapping, and use the existing CassandraLazy* implementations:
* Need to subclass the Hive LazyStruct, just enough to override where it creates the lazy primitive fields so we can create Cassandra ones. That's not possible in the current Hive code; we would need to override all of uncheckedGetField().
* Modify CassandraLazyFactory to return CassandraLazyStruct.
* Work out where we can encode Hive values to the expected Cassandra bytes (not sure if it's needed; integer?).
* Perhaps create an easy way on the CompositeType to get the byte[] of the component values. Originally I tried to do things in binary and had to parse the entire CompositeType byte[] myself, which is not the best going forward. We would need this when reading Cassandra data, to set the byte[] for the lazy components of the struct (see the layout sketch below).
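
For reference, this is the layout I ended up parsing: in a (static) CompositeType each component is serialized as a 2-byte big-endian length, the raw component bytes, then a single end-of-component byte (0). A standalone sketch of encoding and decoding that layout with plain java.nio (no Cassandra or Hive classes; the class and method names are mine):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class CompositeBytes {

    // Encode component values into one composite byte[]:
    // <2-byte length><component bytes><1-byte end-of-component marker>
    public static byte[] encode(List<byte[]> components) {
        int size = 0;
        for (byte[] c : components)
            size += 2 + c.length + 1;
        ByteBuffer out = ByteBuffer.allocate(size);
        for (byte[] c : components) {
            out.putShort((short) c.length); // big-endian length
            out.put(c);
            out.put((byte) 0);              // end-of-component marker
        }
        return out.array();
    }

    // Decode a composite byte[] back into its component byte[]s,
    // e.g. to set the byte[] for the lazy components of the struct.
    public static List<byte[]> decode(byte[] composite) {
        List<byte[]> components = new ArrayList<byte[]>();
        ByteBuffer in = ByteBuffer.wrap(composite);
        while (in.remaining() > 0) {
            int length = in.getShort() & 0xFFFF; // unsigned 2-byte length
            byte[] component = new byte[length];
            in.get(component);
            in.get(); // skip the end-of-component marker
            components.add(component);
        }
        return components;
    }
}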
  
3 - Anyone know what's happening with the composite delimiter? I'm just replacing the ":" in any string with "_" for now.

4 - Would need to auto-map Cassandra composite types to Hive structs in the auto-generated tables.

5 - In LazyCassandraRow, when reading Cassandra data I've not worked out how to properly get the separator for the struct I'm putting a value into. I've hard-coded this for now, so it only works with the defaults. The value I want is on the object inspector of the target object, but I cannot work out how to get the object inspector at that point (see LazySimpleStructObjectInspector).


Thoughts? I'd like to get an idea of how much of this would need to be implemented to get it into the next release. The main issue is probably doing it all in binary.

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton

Nate McCall

Aug 24, 2011, 11:36:54 AM
to brisk...@googlegroups.com
I'll take a closer look at this later today, but do take a look at the
composite wrapper classes we have in Hector. They may make noodling
some of the type information significantly easier.

aaron morton

Aug 24, 2011, 4:06:43 PM
to brisk...@googlegroups.com
Thanks for the tip, will take a look.

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

Todd Gruben

Jun 23, 2012, 10:45:11 AM
to brisk...@googlegroups.com
Have you made any progress with this idea? I could use a form of this feature if there was something I could work with.

---
Todd Gruben
@tgruben

Dipesh Chheda

Jan 23, 2013, 6:30:26 AM
to brisk...@googlegroups.com
Hi Aaron,

I am getting an exception while trying out the exact same use case you have mentioned here.

Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-01-23 16:51:50,468 null map = 0%,  reduce = 0%
2013-01-23 16:51:53,471 null map = 100%,  reduce = 0%
[2013-01-23 16:51:55,604] FATAL {ExecReducer} -  org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"171.21.133.178:8282","_col1":1350467700,"_col2":"FATAL"},"value":{"_col0":1},"alias":0}
    at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:256)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:518)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:419)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:256)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: InvalidRequestException(why:Not enough bytes to read value of component 0)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:603)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:762)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:959)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processAggr(GroupByOperator.java:798)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.processOp(GroupByOperator.java:724)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:471)
    at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:247)
    ... 3 more
Caused by: java.io.IOException: InvalidRequestException(why:Not enough bytes to read value of component 0)
    at org.apache.hadoop.hive.cassandra.output.CassandraAbstractPut.commitChanges(CassandraAbstractPut.java:69)
    at org.apache.hadoop.hive.cassandra.output.CassandraPut.write(CassandraPut.java:139)
    at org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat$1.write(HiveCassandraOutputFormat.java:69)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:589)
    ... 16 more
Caused by: InvalidRequestException(why:Not enough bytes to read value of component 0)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:20253)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:922)
    at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:908)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hive.cassandra.CassandraProxyClient.invoke(CassandraProxyClient.java:341)
    at $Proxy10.batch_mutate(Unknown Source)
    at org.apache.hadoop.hive.cassandra.output.CassandraAbstractPut.commitChanges(CassandraAbstractPut.java:67)
    ... 19 more

Ended Job = job_local_0001 with errors

I could share more details, but I was wondering: have you seen this exception before?

Thanks,
Dipesh

rachana gupta

Jul 22, 2013, 6:14:54 PM
to brisk...@googlegroups.com
Hi Aaron,

I just picked up your example and loaded some data into Cassandra, but when I import the data from Cassandra to Hive I get:

hive> select * from foo;                                                                      
OK
1374170825754    {"col1":null,"col2":null}    VIEW
1374170825754    {"col1":null,"col2":null}    VIEW
Time taken: 0.052 seconds, Fetched: 2 row(s)


I am not sure why my struct is getting null values. I am using the same structure and CREATE TABLE in Hive as yours.

Rachana