
Re: Problem with Scalding HBase Join


Kenny Zhuo Ming LU Jan 18, 2013 5:58 PM
Posted in group: cascading-user
No problem and it is alright!

As promised, here is the updated template for anyone who wants to start an hbase-scalding project.


Regards,
Kenny

On Saturday, 19 January 2013 01:57:17 UTC+8, Oscar Boykin wrote:
Thanks for posting back.

Sorry I didn't know the answer.


On Fri, Jan 18, 2013 at 12:58 AM, Kenny Zhuo Ming LU <luzh...@gmail.com> wrote:
I've figured out the problem myself. 

1) I was using an older version of scalding and maple

2) The hbase column and row id values now arrive as "ImmutableBytesWritable" instead of Array[Byte], hence a conversion is required.
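The unwrap step can be sketched as follows. This is a sketch under assumptions, not code from the thread: ImmutableBytesWritable.get() returns the underlying Array[Byte], and Bytes.toString decodes those bytes as UTF-8, which is equivalent to the standard-library decode shown here.

```scala
// Sketch (assumption): with the newer maple, HBaseSource fields arrive as
// ImmutableBytesWritable, so a map would first call ibw.get() to recover the
// Array[Byte], then decode it, e.g. Bytes.toString(ibw.get()).
// The decode itself is equivalent to this standard-library version:
object BytesConversion {
  // Stand-in for Bytes.toString(bytes): decode the bytes as UTF-8
  def bytesToString(b: Array[Byte]): String = new String(b, "UTF-8")

  def main(args: Array[String]): Unit = {
    // e.g. the row id "1" inserted into t1 in the hbase shell
    val raw: Array[Byte] = "1".getBytes("UTF-8")
    println(bytesToString(raw)) // prints "1"
  }
}
```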

I will post the sample code here once I have cleaned up mine.

Regards,
Kenny


On Wednesday, 16 January 2013 17:58:44 UTC+8, Kenny Zhuo Ming LU wrote:
Hi, 

I have started playing with Scalding lately and I find it really nice. My primary goal is to use it with HBase. After a few Google searches I landed on this


All other pipes work so far except for the join. To set up the hbase tables, I ran the following commands in the hbase shell:


  create 't1', 'cf'
  create 't2', 'cf'
  create 't3', 'cf'
  put 't1','1','cf:c1', '1'
  put 't2','1','cf:c2', '1'


I have the following little code to demonstrate the problem.
(I just git-cloned the above project and added this file into the 'src/main/scala/jobs' folder.)

package jobs

import com.twitter.scalding._
import com.twitter.scalding.Args
import cascading.tuple.Fields
import org.apache.hadoop.hbase.util.Bytes
// HBaseSource is provided by the maple project
import com.twitter.maple.hbase.HBaseSource

class SimpleJoin(args: Args) extends Job(args) {
  def s(x:Array[Byte]):String = {
    Bytes.toString(x)
  }
  val t1 = new HBaseSource("t1", "localhost", 'rid1, Array("cf"), Array('c1))
  val t2 = new HBaseSource("t2", "localhost", 'rid2, Array("cf"), Array('c2))
  val t3 = new HBaseSource("t3", "localhost", 'rid1, Array("cf"), Array('c1, 'c2))


  val p1 = t1.read
    .map('rid1 -> 'rid1) { x: Array[Byte] => s(x) }
    .map('c1 -> 'c1) { x: Array[Byte] => s(x) }
    .map('c1 -> 'log1) { x: String =>
      print("c1" + x)
      x
    }

  val p2 = t2.read
    .map('rid2 -> 'rid2) { x: Array[Byte] => s(x) }
    .map('c2 -> 'c2) { x: Array[Byte] => s(x) }
    .map('c2 -> 'log2) { x: String =>
      print("c2" + x)
      x
    }

  p2.joinWithSmaller('c2 -> 'c1, p1)
    .project('rid1, 'c1, 'c2)
    .write(t3)
}



After compiling with sbt assembly, I run the job with:

  hadoop jar target/hbase-scalding-assembly-0.1.0.jar jobs.SimpleJoin --hdfs --host localhost


After execution, t3 is empty, which is not expected. Furthermore, I also tried to print out c1 and c2 in the two pipes. However, from the stdout logs we can only observe that "c11" is printed; "c21" is not, which implies that p2 is not executed.

Replacing 'joinWithSmaller' with 'joinWithTiny' seems to work, but this is not what I want, because in the full project I need left joins and outer joins.

I just wonder if anyone has some insights.

Regards,
Kenny


--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/-KfQKXdgUJoJ.




--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco