package helpers

import java.io.{FileOutputStream, ObjectOutputStream, PrintWriter, File}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import play.api.libs.json._
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._
import org.elasticsearch.hadoop.mr.EsOutputFormat
import org.elasticsearch.hadoop.mr.EsInputFormat
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{MapWritable, Text, NullWritable}
import org.elasticsearch.spark._

class esData(var id: String, var text: String) extends Serializable {

  // Bulk-style envelope for this document, built once per instance.
  val es_json: JsValue = Json.obj(
    "_index" -> "ES_SPARK_AP",
    "_type" -> "document",
    "_id" -> id,
    "_source" -> Json.obj(
      "text" -> text
    )
  )

  // Append the JSON as text: the original ObjectOutputStream wrote Java-serialization
  // bytes (not valid JSON) and truncated the file on every new instance. The stream is
  // kept local so it does not become a field, which would make esData itself
  // non-serializable despite extending Serializable.
  locally {
    val writer = new PrintWriter(new FileOutputStream("/home/test.json", true))
    writer.println(Json.stringify(es_json))
    writer.close()
  }

  override def toString = Json.stringify(es_json)
}
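
// A small illustrative helper, not part of the original code: Spark needs every
// object captured by a closure to survive Java serialization, so round-tripping a
// value through an in-memory ObjectOutputStream/ObjectInputStream is a quick way
// to check an esData instance (or a raw JsValue) before using it inside an RDD
// operation. The object name "serializationCheck" is made up for this sketch.
object serializationCheck {
  def roundTrip[T](value: T): T = {
    val buffer = new java.io.ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value) // throws NotSerializableException here if value cannot be serialized
    out.close()
    val in = new java.io.ObjectInputStream(new java.io.ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  // Example: serializationCheck.roundTrip(new esData("1", "Eureka!"))
}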

class trySerialize {

  def bar() {
    // Build and print a single document envelope.
    val es_json: JsValue = Json.obj(
      "_index" -> "ES_SPARK_AP",
      "_type" -> "document",
      "_id" -> "12345",
      "_source" -> Json.obj(
        "text" -> "Eureka!"
      )
    )
    println(es_json)
  }

  def foo() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)
    val writer = new PrintWriter(new File("/home/test.json"))
    for (i <- 1 to 10) {
      val es_json = new esData(i.toString, "Eureka!")
      println(es_json)
      //writer.write(es_json.toString() + "\n")
    }
    writer.close()
    sc.stop()
  }
}
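
// A minimal sketch, not part of the original code: the documents built in foo()
// above can instead be turned into an RDD[String] of plain JSON bodies and indexed
// in one call with saveJsonToEs (brought in by the org.elasticsearch.spark._ import).
// Assumptions in this sketch: a reachable cluster at the connector's default
// (localhost:9200) and the lowercase resource name "es_spark_ap/document", since
// Elasticsearch index names must be lowercase. Note that saveJsonToEs expects the
// document body itself, not the "_index"/"_type"/"_id"/"_source" bulk envelope built
// elsewhere in this file; "es.mapping.id" tells the connector which field to use as
// the document id.
class esJsonWriter {
  def save(sc: SparkContext) {
    val docs = (1 to 10).map { i =>
      Json.stringify(Json.obj("docno" -> i.toString, "text" -> "Eureka!"))
    }
    sc.makeRDD(docs)
      .saveJsonToEs("es_spark_ap/document", Map("es.mapping.id" -> "docno"))
  }
}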

class jsonSerialize() {

  def readDocumentData() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
      .set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val temp = sc.wholeTextFiles("/home/ap890101")

    val docStartRegex = """<DOC>""".r
    val docEndRegex = """</DOC>""".r
    val docTextStartRegex = """<TEXT>""".r
    val docTextEndRegex = """</TEXT>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r

    val writer = new PrintWriter(new File("/home/test.json"))

    // Iterate on the driver: looping over the RDD itself ("for (fileData <- temp)")
    // desugars to temp.foreach, so the body would run on the executors, where neither
    // the PrintWriter nor the SparkContext referenced below can be captured -- that is
    // what produced the "throws error" attempts kept in the comments further down.
    for (fileData <- temp.collect()) {
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")

      // State flags for the current file: inside a <DOC> block and inside its <TEXT> block.
      var startDoc = false
      var endDoc = false
      var startText = false
      var endText = false
      val textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()

      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) =>
            startDoc = true
            endText = false
            endDoc = false
          case docnoRegex(group) =>
            docID = group.trim
          case docTextStartRegex(_*) =>
            startText = true
          case docTextEndRegex(_*) =>
            endText = true
            startText = false
          case docEndRegex(_*) =>
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            writer.write(Json.stringify(es_json) + "\n")
            println(es_json)
            // Earlier attempts, which fail when run inside an RDD closure:
            //val es_json = new esData(docID, textChunk.mkString(" "))
            //sc.makeRDD(Seq(Json.stringify(es_json))).saveToEs("ES_SPARK_AP/document")
            //val input = jsonFile(Json.stringify(es_json)) // jsonFile expects a path, not a JSON string
            //input.printSchema()
            //println(input.schema)
            textChunk.clear()
          case _ =>
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
        }
      }
    }
    writer.close()
    sc.stop()
  }
}
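
// A minimal sketch, not part of the original code, of the same parse done on the
// executors instead of the driver: each whole file is split into <DOC> blocks with
// regexes, reduced to a plain JSON body per document, and indexed with saveJsonToEs.
// Assumptions: the lowercase resource name "ap_docs/document", a cluster at the
// connector's default localhost:9200, and the "docno" field doubling as the
// document id via "es.mapping.id".
class esDistributedIndexer {
  def run() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true")
      .set("es.mapping.id", "docno")
    val sc = new SparkContext(conf)

    // (?s) lets "." match newlines so a block can span multiple lines.
    val docRegex = """(?s)<DOC>(.*?)</DOC>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
    val textRegex = """(?s)<TEXT>(.*?)</TEXT>""".r

    val docs = sc.wholeTextFiles("/home/ap890101").flatMap { case (_, content) =>
      docRegex.findAllMatchIn(content).map(_.group(1)).flatMap { doc =>
        for {
          docno <- docnoRegex.findFirstMatchIn(doc).map(_.group(1).trim)
          text  <- textRegex.findFirstMatchIn(doc).map(_.group(1).trim)
        } yield Json.stringify(Json.obj("docno" -> docno, "text" -> text))
      }
    }

    docs.saveJsonToEs("ap_docs/document") // from org.elasticsearch.spark._
    sc.stop()
  }
}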

object Main2 {

  def main(args: Array[String]) {
    val obj = new trySerialize()
    val obj2 = new jsonSerialize()
    //obj.foo()
    obj2.readDocumentData()
  }
}