Hadoop problem: java.io.IOException: Pass a Delete or a Put


Ahmad Anshorimuslim Syuhada

Jan 6, 2015, 2:24:36 AM
to hadoop-learner-tutori...@googlegroups.com
FYI,
I am new to Hadoop and Scala.

I got this error:
java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:586)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:398)

when I run this code:

package com.example

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Get
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce._
import scala.collection.JavaConversions._

case class HString(name: String) {
        lazy val bytes = name.getBytes
        override def toString = name
}
object HString {
        import scala.language.implicitConversions
        implicit def hstring2String(src: HString): String = src.name
        implicit def hstring2Bytes(src: HString): Array[Byte] = src.bytes
}

object Families {
        val stream = HString("stream")
        val identity = HString("identity")
}
object Qualifiers {
        val title = HString("title")
        val url = HString("url")
        val media = HString("media")
        val media_source = HString("media_source")
        val content = HString("content")
        val nolimitid_timestamp = HString("nolimitid.timestamp")
        val original_id = HString("original_id")
        val timestamp = HString("timestamp")
        val date_created = HString("date_created")
        val count = HString("count")
}
object Tables {
        val rawstream100 = HString("raw_stream_1.0.0")
        val rawstream = HString("rawstream")
}

class tmapper extends TableMapper[ImmutableBytesWritable, Put]{
  def map (row: ImmutableBytesWritable, value: Result, context: Context) {
    val put = new Put(row.get())
    for (kv <- value.raw()) {
        put.add(kv)
    }
    context.write(row, put)
  }
}

object Hello {
  val hbaseMaster = "127.0.0.1:60000"
  val hbaseZookeper = "127.0.0.1"
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.master", hbaseMaster)
    conf.set("hbase.zookeeper.quorum", hbaseZookeper)
    val hbaseAdmin = new HBaseAdmin(conf)

    val job = Job.getInstance(conf, "CopyTable")
    job.setJarByClass(classOf[Hello])
    job.setMapperClass(classOf[tmapper])
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[Result])
    //
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    val scan = new Scan()
    scan.setCaching(500)        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false)  // don't set to true for MR jobs

    TableMapReduceUtil.initTableMapperJob(
      Tables.rawstream100.bytes, // input HBase table name
      scan,                      // Scan instance to control CF and attribute selection
      classOf[tmapper],          // mapper class
      null,                      // mapper output key class
      null,                      // mapper output value class
      job
    )

    TableMapReduceUtil.initTableReducerJob(
      Tables.rawstream,          // Table name
      null,                      // Reducer class
      job
    )

    val b = job.waitForCompletion(true)
    if (!b) {
      throw new IOException("error with job!")
    }
  }
}

class Hello {}

I don't have a clue where it goes wrong. Could you enlighten me?
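One note for anyone hitting the same exception: TableOutputFormat only accepts Put or Delete values, and the stack trace shows the plain identity Reducer.reduce writing the value straight through, so what reaches the output format is most likely still a Result. A probable cause is that the map method in tmapper never actually overrides TableMapper's map (its Context parameter does not match the Java signature), so the inherited identity map forwards each (row, Result) pair unchanged. Below is a minimal sketch of a mapper that declares the override explicitly; it reuses the Put-building logic from the code above and names Hadoop's inner Context class through a type projection, which is the usual way to refer to it from Scala.

class tmapper extends TableMapper[ImmutableBytesWritable, Put] {
  // Spell out the inner Mapper.Context type so the signature matches the Java
  // method; with 'override' the compiler verifies that TableMapper.map is replaced.
  override def map(row: ImmutableBytesWritable, value: Result,
      context: Mapper[ImmutableBytesWritable, Result, ImmutableBytesWritable, Put]#Context): Unit = {
    val put = new Put(row.get())
    for (kv <- value.raw()) {  // copy every KeyValue of the source row
      put.add(kv)
    }
    context.write(row, put)    // emit a Put, which TableOutputFormat accepts
  }
}

It may also be worth passing classOf[ImmutableBytesWritable] and classOf[Put] to initTableMapperJob instead of the two nulls (or changing setMapOutputValueClass to classOf[Put]), so the map output value class is no longer declared as Result.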
