Hi,
I have been playing with Scalding lately and I find it really nice. My primary goal is to use it with HBase. After a few Google searches I landed on this project.
All the other pipes work so far, except for the join. To set up the HBase tables, I run the following commands in the hbase shell:
create 't1', 'cf'
create 't2', 'cf'
create 't3', 'cf'
put 't1','1','cf:c1', '1'
put 't2','1','cf:c2', '1'
I have the following small piece of code to demonstrate the problem.
(I just git cloned the above project and added this file under the 'src/main/scala/jobs' folder.)
package jobs

import com.twitter.scalding._
import cascading.tuple.Fields
import com.twitter.scalding.Args
import org.apache.hadoop.hbase.util.Bytes

class SimpleJoin(args: Args) extends Job(args) {
  def s(x: Array[Byte]): String = Bytes.toString(x)

  val t1 = new HBaseSource("t1", "localhost", 'rid1, Array("cf"), Array('c1))
  val t2 = new HBaseSource("t2", "localhost", 'rid2, Array("cf"), Array('c2))
  val t3 = new HBaseSource("t3", "localhost", 'rid1, Array("cf"), Array('c1, 'c2))

  val p1 = t1.read
    .map('rid1 -> 'rid1) { x: Array[Byte] => s(x) }
    .map('c1 -> 'c1) { x: Array[Byte] => s(x) }
    .map('c1 -> 'log1) { x: String =>
      print("c1" + x)
      x
    }

  val p2 = t2.read
    .map('rid2 -> 'rid2) { x: Array[Byte] => s(x) }
    .map('c2 -> 'c2) { x: Array[Byte] => s(x) }
    .map('c2 -> 'log2) { x: String =>
      print("c2" + x)
      x
    }

  p2.joinWithSmaller('c2 -> 'c1, p1)
    .project('rid1, 'c1, 'c2)
    .write(t3)
}
After compiling with sbt assembly, I run the job with:
hadoop jar target/hbase-scalding-assembly-0.1.0.jar jobs.SimpleJoin --hdfs --host localhost
After the run, t3 is empty, which is not expected. Furthermore, I also tried to print out c1 and c2 in the two pipes, but in the stdout logs only "c11" appears; "c21" is never printed, which suggests that p2 is not executed at all.
Replacing 'joinWithSmaller' with 'joinWithTiny' seems to work, but that is not what I want, because in the full project I also need left and outer joins.
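For reference, here is the left-join behaviour I am after, sketched with plain Scala collections (no Scalding) over the demo rows above, plus one hypothetical unmatched row so the difference is visible: an inner join drops keys missing on one side, a left join keeps every row from the larger side. (In Scalding itself I would expect to get this via the joiner argument of joinWithSmaller or via leftJoinWithSmaller, if my reading of the API is right.)

```scala
object JoinDemo extends App {
  // (key, value) rows; keys play the role of the 'c1/'c2 values in the job.
  val smallMap = Map("1" -> "t1-row")                 // the p1 side
  val large = Seq(("1", "t2-row"), ("2", "t2-only"))  // the p2 side, "2" is hypothetical

  // Inner join: keep only keys present on both sides.
  val inner = large.collect { case (k, v) if smallMap.contains(k) => (k, v, smallMap(k)) }

  // Left join: keep every row of the large side, Option on the small side.
  val left = large.map { case (k, v) => (k, v, smallMap.get(k)) }

  println(inner) // the matched row only
  println(left)  // both rows, unmatched one paired with None
}
```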
I am wondering if anyone has some insight.
Regards,
Kenny