I am new to Hadoop and Scala. I am getting the following error:
java.io.IOException: Pass a Delete or a Put
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)
when I run the code below. The job is supposed to copy every row of the HBase table raw_stream_1.0.0 into the table rawstream:
package com.example
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Get
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._
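
// Helper so column family / qualifier / table names can be used both as String and as Array[Byte]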
case class HString(name: String) {
  lazy val bytes = name.getBytes
  override def toString = name
}

object HString {
  import scala.language.implicitConversions
  implicit def hstring2String(src: HString): String = src.name
  implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}
object Families {
  val stream = HString("stream")
  val identity = HString("identity")
}

object Qualifiers {
  val title = HString("title")
  val url = HString("url")
  val media = HString("media")
  val media_source = HString("media_source")
  val content = HString("content")
  val nolimitid_timestamp = HString("nolimitid.timestamp")
  val original_id = HString("original_id")
  val timestamp = HString("timestamp")
  val date_created = HString("date_created")
  val count = HString("count")
}

object Tables {
  val rawstream100 = HString("raw_stream_1.0.0")
  val rawstream = HString("rawstream")
}
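
// Mapper: copies every KeyValue of the input row into a Put keyed by the same row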
class tmapper extends TableMapper[ImmutableBytesWritable, Put] {
  def map(row: ImmutableBytesWritable, value: Result, context: Context) {
    val put = new Put(row.get())
    for (kv <- value.raw()) {
      put.add(kv)
    }
    context.write(row, put)
  }
}
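
// Driver: configures HBase and runs the "CopyTable" job from raw_stream_1.0.0 to rawstream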
object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeper = "127.0.0.1"

  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)
    val hbaseAdmin = new HBaseAdmin(conf)

    val job = Job.getInstance(conf, "CopyTable")
    job.setJarByClass(classOf[Hello])
    job.setMapperClass(classOf[tmapper])
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[Result])

    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    val scan = new Scan()
    scan.setCaching(500)       // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false) // don't set to true for MR jobs

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.bytes, // input HBase table name
      scan,                      // Scan instance to control CF and attribute selection
      classOf[tmapper],          // mapper class
      null,                      // mapper output key class
      null,                      // mapper output value class
      job
    )
    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream, // Table name
      null,             // Reducer class
      job
    )

    val b = job.waitForCompletion(true)
    if (!b) {
      throw new IOException("error with job!")
    }
  }
}
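
// Empty class next to object Hello so that classOf[Hello] can be passed to setJarByClass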
class Hello {}
I have no clue where it goes wrong. Could someone enlighten me?