#object
val o1 = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/O1.txt").map(line => line.split(","))
val waveIntensityPairsO1 = o1.map(line => (line(0).toInt,line(1).toDouble))
val waveErrorPairsO1 = o1.map(line => (line(0).toInt,1/line(2).toDouble))
#metadata
val m1 = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/M1.txt").map(line => line.split(","))
val waveIntensityPairsM1 = m1.map(line => (line(0).toInt,line(1).toDouble))
#finding alpha
val numerator1 = waveIntensityPairsM1.union(waveIntensityPairsO1).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
val denominator1 = m_1.map(line => (line(0).toInt,line(1).toDouble*line(1).toDouble)).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
val alpha1 = numerator1/denominator1
#finding chi^2
val num1Temp = m_1.map(line => (line(0).toInt, alpha1*line(1).toDouble))
val num1Temp2 = num1Temp.union(waveIntensityPairsO1).reduceByKey(_ - _)
val chiSquared1 = num1Temp2.union(num1Temp2).union(waveErrorPairsO1).reduceByKey(_ * _).map(l => l._2).reduce(_ + _)
#NOW WITH MULTIPLE METADATA
val data = sc.textFile("/global/project/projectdirs/paralleldb/BDAS/jesup/spark-0.7.3/flux_ins.1000.csv").map(l => l.split(",")).map(l => (l(0).toInt, (l(1).toInt, l(2).toDouble))).groupByKey
data consists of pairs of the form (ID, Seq[(wavelength, intensity)])
val sub_data = data.map(l => l._2)
val sub_data = data.map(l=> l._2).map(l=>l.toArray.map(l=> (l._1,l._2)))
data.map(l=> (l._1, l._2.map(l=> sc.parallelize(l))))