Cannot cast String to ImmutableBytesWritable in Cascading Maple


Saad Rashid

Mar 1, 2013, 4:35:52 AM3/1/13
to cascadi...@googlegroups.com
Following the fix made in Maple four months ago for ImmutableBytesWritable, I am getting class cast exceptions from HBaseSource in my Scalding job.

Oscar Boykin

Mar 1, 2013, 1:25:45 PM3/1/13
to cascadi...@googlegroups.com
Can you post the stack trace and errors? I'm not really sure I understand your question/comment.

On Fri, Mar 1, 2013 at 1:35 AM, Saad Rashid <saa...@gmail.com> wrote:
Following the fix made in Maple four months ago for ImmutableBytesWritable, I am getting class cast exceptions from HBaseSource in my Scalding job.

--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

Saad Rashid

Mar 2, 2013, 2:01:18 PM3/2/13
to cascadi...@googlegroups.com
Hi Oscar,

I am getting the following exception in my Scalding job. I am using maple-0.2.5 with Scalding 0.8.3; the exception does not occur with the older maple 0.2.0 and Scalding 0.8.0.

2013-03-02 19:00:07,878 INFO  [Thread-14] zookeeper.RecoverableZooKeeper (RecoverableZooKeeper.java:<init>(89)) - The identifier of this process is 31...@qiqstanb057.qiq.local
2013-03-02 19:00:07,879 INFO  [Thread-14-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(958)) - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (Unable to locate a login configuration)
2013-03-02 19:00:07,879 INFO  [Thread-14-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(850)) - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
2013-03-02 19:00:07,880 INFO  [Thread-14-SendThread(localhost:2181)] zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1187)) - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x13d2c787d3d0007, negotiated timeout = 40000
2013-03-02 19:00:07,887 INFO  [Thread-14] zookeeper.ZooKeeper (ZooKeeper.java:close(679)) - Session: 0x13d2c787d3d0007 closed
2013-03-02 19:00:07,887 INFO  [Thread-14-EventThread] zookeeper.ClientCnxn (ClientCnxn.java:run(511)) - EventThread shut down
2013-03-02 19:00:07,887 INFO  [Thread-14] hbase.HBaseTapCollector (?:initialize(?)) - Output format class is: class org.apache.hadoop.hbase.mapred.TableOutputFormat
2013-03-02 19:00:07,912 INFO  [Thread-14] hadoop.TupleSerialization (TupleSerialization.java:getDefaultComparator(196)) - using default comparator: com.twitter.scalding.IntegralComparator
2013-03-02 19:00:07,944 ERROR [Thread-14] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: unable to sink into output identifier: 'unknown'
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:93)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:71)
	at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
	at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
	at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
	at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:93)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:86)
	at com.twitter.scalding.MRMAggregator.complete(Operations.scala:200)
	at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:139)
	at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
	at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
	at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
	at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:90)
	at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:448)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:442)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
	at com.twitter.maple.hbase.HBaseScheme.sink(Unknown Source)
	at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
	... 19 more

2013-03-02 19:00:07,945 ERROR [Thread-14] stream.TrapHandler (TrapHandler.java:handleReThrowableException(103)) - caught Throwable, no trap available, rethrowing
	(same TupleException/ClassCastException stack trace as above)
2013-03-02 19:00:07,947 WARN  [Thread-14] mapred.LocalJobRunner (LocalJobRunner.java:run(479)) - job_local_0001
	(same stack trace as above)


Regards,

Saad Rashid.

Saad Rashid

Mar 4, 2013, 10:45:20 AM3/4/13
to cascadi...@googlegroups.com
Hi,

I found that HBaseScheme casts everything coming off the Scalding pipe to ImmutableBytesWritable, so I have to convert all of my fields to ImmutableBytesWritable myself. This is problematic for me because I am generating more than 80 fields, and even more when working with Mahout, which produces very large tuples. I have not been able to find a way to map Fields.ALL -> (a List or Array of values) and convert everything to ImmutableBytesWritable in one go. Converting a handful of fields as a tuple makes sense, but not with the large number of columns that is common in HBase schemas. Any suggestions? Below is the Scalding code.


val HBASE_SCHEMA =
  List('field1, 'field2, 'field3, 'field4, 'field5, 'field6, 'field7, 'field8, 'field9, 'field10, 'field11, 'field12, 'field13, 'field14, 'field15, 'field16, 'field17) ++
  List('field18, 'field19, 'field20, 'field21, 'field22, 'field23, 'field24, 'field25, 'field26, 'field27, 'field28, 'field29, 'field30, 'field31, 'field32, 'field33, 'field34) ++
  List('hod0, 'hod1, 'hod2, 'hod3, 'hod4, 'hod5, 'hod6, 'hod7, 'hod8, 'hod9, 'hod10, 'hod11) ++
  List('hod12, 'hod13, 'hod14, 'hod15, 'hod16, 'hod17, 'hod18, 'hod19, 'hod20, 'hod21, 'hod22, 'hod23) ++
  List('dow7, 'dow1, 'dow2, 'dow3, 'dow4, 'dow5, 'dow6) ++
  List('dom1, 'dom2, 'dom3, 'dom4, 'dom5, 'dom6, 'dom7, 'dom8, 'dom9, 'dom10, 'dom11, 'dom12, 'dom13, 'dom14, 'dom15) ++
  List('dom16, 'dom17, 'dom18, 'dom19, 'dom20, 'dom21, 'dom22, 'dom23, 'dom24, 'dom25, 'dom26, 'dom27, 'dom28, 'dom29, 'dom30, 'dom31) ++
  List('average_count, 'average_duration, 'average_variance, 'average_mean, 'average_dispersion)


Tsv(output)
  .mapTo(Fields.ALL -> HBASE_SCHEMA) { fields: TupleEntry =>
    // Wrap every incoming field value as an ImmutableBytesWritable for the HBase sink.
    var hbaseTupleArray: Array[ImmutableBytesWritable] = Array()
    for (i <- 0 until fields.size) {
      hbaseTupleArray = hbaseTupleArray :+ new ImmutableBytesWritable(fields.get(i).toString.getBytes)
    }
    hbaseTupleArray
  }

Oscar Boykin

Mar 4, 2013, 1:09:35 PM3/4/13
to cascadi...@googlegroups.com
You could write a function to convert fields to ImmutableBytesWritable:
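Something along these lines (a sketch, not a definitive implementation; it assumes the Scalding fields DSL from com.twitter.scalding.Dsl, which provides asList, the field conversions, and the field-based map syntax):

import cascading.pipe.Pipe
import cascading.tuple.Fields
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import com.twitter.scalding.Dsl._ // asList, field conversions, RichPipe.map

// Fold over the given fields, re-mapping each one onto itself so that its
// String value is wrapped as ImmutableBytesWritable before sinking to HBase.
def toBytesWritable(pipe: Pipe, f: Fields): Pipe =
  asList(f).foldLeft(pipe) { (p, fld) =>
    p.map(fld.toString -> fld.toString) { s: String =>
      new ImmutableBytesWritable(s.getBytes)
    }
  }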


Can you add this to the how-to section of the Scalding wiki?


Saad Rashid

Mar 7, 2013, 5:40:07 AM3/7/13
to cascadi...@googlegroups.com
Thanks Oscar.

I am using these functions to convert back and forth between HBase source and sink pipes in Scalding.

def toBytesWritable(p: Pipe, f: Fields): Pipe = {
  import com.twitter.scalding.Dsl._ // make sure the Scalding fields DSL is in scope
  import cascading.tuple.Fields._

  asList(f).foldLeft(p) { (oldPipe, fld) =>
    // Re-map each field onto itself, wrapping its String value for the HBase sink.
    oldPipe.map(fld.toString -> fld.toString) { str: String =>
      new ImmutableBytesWritable(str.getBytes)
    }
  }
}

def fromBytesWritable(p: Pipe, f: Fields): Pipe = {
  import com.twitter.scalding.Dsl._
  import cascading.tuple.Fields._

  asList(f).foldLeft(p) { (oldPipe, fld) =>
    // Re-map each field onto itself, decoding the bytes back into a String.
    oldPipe.map(fld.toString -> fld.toString) { ibw: ImmutableBytesWritable =>
      Bytes.toString(ibw.get)
    }
  }
}
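For example (illustrative only: pipe, hbaseSource, and the field names below are placeholders for whatever your job already defines), the helpers slot in just before the write and just after the read:

// Illustrative usage; 'pipe' and 'hbaseSource' are hypothetical placeholders.
val prepared = toBytesWritable(pipe, new Fields("key", "field1", "field2"))
prepared.write(hbaseSource) // sink: every value is already an ImmutableBytesWritable

val decoded = fromBytesWritable(hbaseSource.read, new Fields("key", "field1", "field2"))
// source: values converted back to Strings for downstream operations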

Regards,
Saad.