How should I serialize a JSON file in Scala?


Animesh Pandey

May 28, 2015, 5:08:23 PM
to scala-l...@googlegroups.com
I am trying to read a bunch of text files, each of which contains documents. I convert each document into a JSON object, and I want to save all of the JSON to a single file. How can I do that?

I tried the following code:
package helpers

import java.io.{FileOutputStream, ObjectOutputStream, PrintWriter, File}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import play.api.libs.json._
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._
import org.elasticsearch.hadoop.mr.EsOutputFormat
import org.elasticsearch.hadoop.mr.EsInputFormat
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{MapWritable, Text, NullWritable}
import org.elasticsearch.spark._

class esData(var id: String, var text: String) extends Serializable {
  var es_json: JsValue = Json.obj()
  es_json = Json.obj(
    "_index" -> "ES_SPARK_AP",
    "_type" -> "document",
    "_id" -> id,
    "_source" -> Json.obj(
      "text" -> text
    )
  )
  val oos = new ObjectOutputStream(new FileOutputStream("/home/test.json"))
  oos.writeObject(es_json)
  oos.close()
}

class trySerialize {
  def bar() {
    var es_json: JsValue = Json.obj()
    es_json = Json.obj(
      "_index" -> "ES_SPARK_AP",
      "_type" -> "document",
      "_id" -> "12345",
      "_source" -> Json.obj(
        "text" -> "Eureka!"
      )
    )
    println(es_json)
  }

  def foo() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
    val sc = new SparkContext(conf)

    val writer = new PrintWriter(new File("/home/test.json"))
    for (i <- 1 to 10) {
      val es_json = new esData(i.toString(), "Eureka!")
      println(es_json)
      //writer.write(es_json.toString() + "\n")
    }
    writer.close()
  }
}

class jsonSerialize() {
  def readDocumentData() {
    val conf = new SparkConf()
      .setAppName("linkin_spark")
      .setMaster("local[2]")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
      .set("es.index.auto.create", "true")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val temp = sc.wholeTextFiles("/home/ap890101")
    val docStartRegex = """<DOC>""".r
    val docEndRegex = """</DOC>""".r
    val docTextStartRegex = """<TEXT>""".r
    val docTextEndRegex = """</TEXT>""".r
    val docnoRegex = """<DOCNO>(.*?)</DOCNO>""".r
    val writer = new PrintWriter(new File("/home/test.json"))

    for (fileData <- temp) {
      val filename = fileData._1
      val content: String = fileData._2
      println(s"For $filename, the data is:")
      var startDoc = false  // these four flags track where we are
      var endDoc = false    // within the whole file
      var startText = false
      var endText = false
      var textChunk = new ListBuffer[String]()
      var docID: String = ""
      var es_json: JsValue = Json.obj()

      //val results: Iterator[JsValue] =
      for (current_line <- content.lines) {
        current_line match {
          case docStartRegex(_*) => {
            startDoc = true
            endText = false
            endDoc = false
          }
          case docnoRegex(group) => {
            docID = group.trim
          }
          case docTextStartRegex(_*) => {
            startText = true
          }
          case docTextEndRegex(_*) => {
            endText = true
            startText = false
          }
          case docEndRegex(_*) => {
            endDoc = true
            startDoc = false
            es_json = Json.obj(
              "_id" -> docID,
              "_source" -> Json.obj(
                "text" -> textChunk.mkString(" ")
              )
            )
            //val es_json = new esData(docID, textChunk.mkString(" "))
            //writer.write(s"json") // throws error
            //sc.makeRDD(Seq(Json.stringify(es_json))).saveToEs("ES_SPARK_AP/document") // throws error
            //val input = jsonFile(Json.stringify(es_json)) // throws error
            //writer.write(Json.stringify(es_json)) // throws error
            println(es_json)
            //input.printSchema()
            //println(input.schema)
            textChunk.clear()
          }
          case _ => {
            if (startDoc && !endDoc && startText) {
              textChunk += current_line.trim
            }
          }
        }
      }
    }
    writer.close()
  }
}

object Main2 {
  def main(args: Array[String]) {
    val obj = new trySerialize()
    val obj2 = new jsonSerialize()
    //obj.foo()
    obj2.readDocumentData()
  }
}

I tried serializing it this way, but I cannot make the JSON go into a file. I even created a new class, esData, that extends Serializable, but I still get the same error. The function foo() works with or without Serializable, yet readDocumentData() does not. I cannot work out what to do. The document I am using for testing is here: https://www.dropbox.com/s/gtgxhqqy1ngi7ok/ap890101?dl=0
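
For reference, this is the kind of write I would expect to work outside of Spark (a minimal sketch; the output path /tmp/plain_test.json is just a placeholder I picked for testing). Json.stringify renders the JsValue as a plain JSON string, so no Java object serialization should be involved at all:

import java.io.{File, PrintWriter}
import play.api.libs.json._

object PlainWrite {
  def main(args: Array[String]): Unit = {
    val es_json: JsValue = Json.obj(
      "_id" -> "12345",
      "_source" -> Json.obj("text" -> "Eureka!")
    )
    // Render the JsValue as a compact JSON string and write it with an
    // ordinary PrintWriter -- no ObjectOutputStream needed.
    val writer = new PrintWriter(new File("/tmp/plain_test.json"))
    writer.write(Json.stringify(es_json) + "\n")
    writer.close()
  }
}

If a plain write like this is correct, I do not understand why the equivalent writer.write(Json.stringify(es_json)) call throws inside readDocumentData().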

Please advise.

Seth Tisue

Jun 1, 2015, 1:00:45 PM
to scala-l...@googlegroups.com
This question would be more appropriate for Stack Overflow or the scala-user group, not scala-language. (http://www.scala-lang.org/community/ explains what the different lists are for.)

Seth
