/* * GPL v.3 * Main class, implements mean temperature calculation and cross-correlation */
package co.cask.cdap.examples.bigchildren
import breeze.linalg.DenseVectorimport breeze.linalg.Vectorimport breeze.stats.meanAndVarianceimport breeze.stats.meanimport breeze.stats.variance
import co.cask.cdap.api.spark.ScalaSparkProgramimport co.cask.cdap.api.spark.SparkContext
import org.apache.spark.SparkContext._//import org.apache.spark.mllib.stat.corr
import org.apache.spark.rdd._ //NewHadoopRDD
import org.slf4j.Loggerimport org.slf4j.LoggerFactoryimport scala.math
import org.apache.commons.math3._import org.apache.commons.math3.stat.correlation.PearsonsCorrelation
//import java.io.PrintWriter//import java.io.FileWriter
/** * main class */class BigChildrenProgram extends ScalaSparkProgram { //var status: PrintWriter = new PrintWriter(new FileWriter("/home/psteger/logger", true)) private final val LOG: Logger = LoggerFactory.getLogger(classOf[BigChildrenProgram])
final val interval = 7 // [days] def parseTemp(fahrenheit: Double): Double = { (fahrenheit + 459.67)*5/9 }
def parsePrec(inches: Double): Double = { return inches*25.4 }
// we split a weather CSV file line by single whitespaces here, and convert parts to SI def parsewVector(line: String): Tuple3[Double,Double,Double] = { var dv = DenseVector(line.split(" ", -1)) val timestamp: Double = (dv(2)).toDouble val temperature: Double = parseTemp( (dv(3)).toDouble ) val precip: Double = parsePrec( (dv(13)).substring(0, math.max(2, (dv(13)).length()-1)).toDouble) return (timestamp, temperature, precip) }
// we split a CSV file line by single whitespaces here, no further conversion def parsemVector(line: String): Tuple2[Double,Double] = { val dv = DenseVector(line.split(" ", -1)) val year: Double = (dv(0)).toDouble // year val mort: Double = (dv(4)).toDouble // mortality per 1000 children return (year, mort) }
// find which interval a given date YYYY-MM-DD lies in // by counting all days since start of the year, and projecting into 52 bins def findInterval(dateID: Int, w: Double): Int = { val Y: Int = dateID/10000 val M: Int = (dateID - Y*10000)/100 println(M) val Day: Int = (dateID - Y*10000)%100 //LOG.info(Y, M, Day) var MonthLength = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31) if ((Y%4 == 0) && (Y%100 != 0 || Y%400 == 0)) { MonthLength(1) += 1 // add Feb 29 if necessary } // add up all completed months var dayofyear: Int = 0 for(i <- 1 until M) { println(i) dayofyear = dayofyear+MonthLength(i-1) } // add days since beginning of current month dayofyear += Day // map into week number (each of which is 7 days long) val out: Int = ((dayofyear-1)/w).toInt return out }
def weight(y: Int, yi: Int, sig: Double, w: Double): Double = { val out: Double = math.exp(-math.pow(y-yi,2)/(2*math.pow(sig,2)))/(math.sqrt(2*math.Pi)*sig) //*365.25/w return out }
override def run(sc: SparkContext) { val args: Array[String] = sc.getRuntimeArguments("args") //LOG.info("Running with arguments {}", args) // country to run for, if at all specified val country = if (args.length > 0) args(0) else "MI"
//LOG.info("Reading in weather data") // new API Hadoop RDD according to apache.org website, not directly instantiating // HadoopRDD val linesDataset: NewHadoopRDD[Array[Byte], String] = sc.readFromDataset("Weather", classOf[Array[Byte]], classOf[String]) // split by whitespaces, convert to Vector[Double], convert to SI units //LOG.info("parsing vectors") val wdata = linesDataset.map(kv => parsewVector(kv._2)).cache()
//LOG.info("Calculating Mean") // cycle through all datasets // get datestamp, and corresponding period (say, week, interval=7days) of interest // get temperature from col (0,1,2,)3, calc new mean // get precipitation from col 13, calc new mean
// do this via a Map-Reduce scheme // first get key value as period to hash to, // then extract temperature and precipitation
//status.println("reading and parsing data") // floor(p(2)/10000) gives entry year val tdat = wdata.map(p => (findInterval((p._1).toInt,interval), (math.floor(p._1/10000), p._2))) .filter( kv => ((kv._2._2) > 0.0 )) //status.println("pdat") val pdat = wdata.map(p => (findInterval((p._1).toInt,interval), (math.floor(p._1/10000), p._3))) .filter( kv => ((kv._2._2) >= 0.0 )) //status.println("filtered data")
//LOG.info("tred") // use the values in tred twice, thus we store it separately val tred = tdat.map(kv => (kv._1,kv._2._2)).groupByKey() // TODO: use meanAndVariance, and then extract the important pieces //val tmav = tred.map(v => (v._1, meanAndVariance(v._2))) //LOG.info("tMean") val tMean = tred.map(kv => (kv._1, mean(kv._2))) //LOG.info("tSig") val tSig = tred.map(kv => (kv._1, math.max(0.00001, math.sqrt(variance(kv._2))))) //LOG.info("pred") val pred = pdat.map(kv => (kv._1, kv._2._2)).groupByKey() //LOG.info("pMean") val pMean = pred.map(kv => (kv._1, mean(kv._2))) //LOG.info("pSig") val pSig = pred.map(kv => (kv._1, math.max(0.00001, math.sqrt(variance(kv._2)))))
// run through all values again, calculate distance from mean // d = data(this year) - mean(all years)/standard_deviation(all years) // do the join for each time interval // and store a new key, value pair, where key is the year //LOG.info("tDiff") val tDiff = tdat.join(tMean).map(kv => (kv._1, math.abs(kv._2._1._2-kv._2._2))) //LOG.info("pDiff") val pDiff = pdat.join(pMean).map(kv => (kv._1, math.abs(kv._2._1._2-kv._2._2))) // has key #interval, values (key: #year, diff from mean)
//LOG.info("tOut") val tOut = tDiff.join(tSig).map(kv => (kv._2._1, kv._2._1/kv._2._2)) //LOG.info("pOut") val pOut = pDiff.join(pSig).map(kv => (kv._2._1, kv._2._1/kv._2._2)) // has new key #year, values (diff from mean / sig)
// determine outlierness of a given year, summing over all time intervals this time //LOG.info("tweather_out") val tweather_out = tOut.groupByKey().map(kv => (kv._1, kv._2.sum)) //LOG.info("pweather_out") val pweather_out = pOut.groupByKey().map(kv => (kv._1, kv._2.sum))
//LOG.info("toutsort") val toutsort = tweather_out.sortByKey(true).filter(kv => ((kv._1) >= 1990 )).map(kv => kv._2).collect() //LOG.info("poutsort") val poutsort = pweather_out.sortByKey(true).filter(kv => ((kv._1) >= 1990 )).map(kv => kv._2).collect()
// read in mortality data //LOG.info("read in mortality") val mortality: NewHadoopRDD[Array[Byte], String] = sc.readFromDataset("ChildMortality", classOf[Array[Byte]], classOf[String]) // split by whitespaces, convert to Vector[Double] //LOG.info("parse mdata") val mdata = linesDataset.map(kv => parsemVector(kv._2)).cache() //LOG.info("m_out") val m_out = mdata.map(p => ((p._1).toInt, p._2)) //LOG.info("moutsort") val moutsort = m_out.sortByKey(true).map(kv => kv._2).collect()
// determine direct correlation as in Pearson; time-shift assumed small, no ARIMA needed //LOG.info("tcorr") val tcorr = (new PearsonsCorrelation()).correlation(toutsort, moutsort) //LOG.info("pcorr") val pcorr = (new PearsonsCorrelation()).correlation(poutsort, moutsort) m_out.map(kv => (pcorr, tcorr))
//LOG.info("Done!") //status.println("tcorr = "+ tcorr+ ", pcorr = "+ pcorr) //status.flush()
// write to file, for output later //tweather_out.saveAsTextFile("/home/psteger/tweather.out.dat") //pweather_out.saveAsTextFile("/home/psteger/pweather.out.dat") m_out.saveAsTextFile("/home/psteger/mortality.out.dat")
//val originalContext: org.apache.spark.SparkContext = sc.getOriginalSparkContext() //sc.writeToDataset(originalContext.parallelize(m_out), "correlations", classOf[Array[Byte]], classOf[String])
}
}
--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/3de608fa-7b58-4e2a-a59c-31bbcbc44499%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
<childmortality.txt>
<stations_MI.txt>
<alldata.txt>
{
"response": null,
"error": "internal server error",
"message": "Fail to get procedure."
}
// determine direct correlation as in Pearson; time-shift assumed small, no ARIMA needed LOG.info("tcorr") val tcorr = (new PearsonsCorrelation()).correlation(toutsort, moutsort) LOG.info("pcorr") val pcorr = (new PearsonsCorrelation()).correlation(poutsort, moutsort)
LOG.info("Done!")
// write to DataSet, for output later val output_array = Array(pcorr, tcorr) var centers = new Array[(Array[Byte], String)](1) centers(0) = Tuple2(country.getBytes, output_array.mkString(",")) val originalContext: org.apache.spark.SparkContext = sc.getOriginalSparkContext()
var outputrdd = originalContext.makeRDD[(Array[Byte], String)](centers)
sc.writeToDataset(outputrdd, "correlations", classOf[Array[Byte]], classOf[String])
/* * Copyright © 2014 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */
package co.cask.cdap.examples.bigchildren;
import co.cask.cdap.api.annotation.Handle;import co.cask.cdap.api.annotation.ProcessInput;import co.cask.cdap.api.annotation.UseDataSet;import co.cask.cdap.api.app.AbstractApplication;import co.cask.cdap.api.data.stream.Stream;import co.cask.cdap.api.dataset.lib.ObjectStore;import co.cask.cdap.api.dataset.lib.ObjectStores;import co.cask.cdap.api.flow.Flow;import co.cask.cdap.api.flow.FlowSpecification;import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;import co.cask.cdap.api.flow.flowlet.StreamEvent;import co.cask.cdap.api.procedure.AbstractProcedure;import co.cask.cdap.api.procedure.ProcedureRequest;import co.cask.cdap.api.procedure.ProcedureResponder;import co.cask.cdap.api.procedure.ProcedureResponse;import co.cask.cdap.api.spark.AbstractSpark;import co.cask.cdap.api.spark.SparkSpecification;import co.cask.cdap.internal.io.UnsupportedTypeException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import java.io.IOException;import java.nio.ByteBuffer;import java.nio.charset.Charset;import java.util.UUID;
/** * Application to find correlation between child mortality and outlierness of weather data */public class BigChildrenApp extends AbstractApplication { public static final Charset UTF8 = Charset.forName("UTF-8");
@Override public void configure() { setName("BigChildren"); setDescription("BigChildren app"); addStream(new Stream("weatherStream")); addStream(new Stream("childmortalityStream")); addFlow(new WeatherFlow()); addFlow(new ChildMortalityFlow()); addSpark(new BigChildrenSpecification()); addProcedure(new CorrelationProcedure());
try { ObjectStores.createObjectStore(getConfigurer(), "Weather", String.class); ObjectStores.createObjectStore(getConfigurer(), "ChildMortality", String.class); ObjectStores.createObjectStore(getConfigurer(), "Correlations", String.class); } catch (UnsupportedTypeException e) { // This exception is thrown by ObjectStore if its parameter type cannot be // (de)serialized (for example, if it is an interface and not a class, then there is // no auto-magic way deserialize an object.) In this case that will not happen // because String is an actual class. throw new RuntimeException(e); } }
/** * A Spark Program that uses BigData algorithms to find correlations. */ public static class BigChildrenSpecification extends AbstractSpark { @Override public SparkSpecification configure() { return SparkSpecification.Builder.with() .setName("BigChildrenProgram") .setDescription("BigChildren Program") .setMainClassName(BigChildrenProgram.class.getName()) .build(); } }
/** * This Flowlet reads events from a Stream and saves them to a datastore. */ public static class WeatherReader extends AbstractFlowlet {
private static final Logger LOG = LoggerFactory.getLogger(WeatherReader.class);
@UseDataSet("Weather") private ObjectStore<String> pointsStore;
@ProcessInput public void process(StreamEvent event) { String body = new String(event.getBody().array()); LOG.trace("Weather info: {}", body); pointsStore.write(getIdAsByte(UUID.randomUUID()), body); }
private static byte[] getIdAsByte(UUID uuid) { ByteBuffer bb = ByteBuffer.wrap(new byte[16]); bb.putLong(uuid.getMostSignificantBits()); bb.putLong(uuid.getLeastSignificantBits()); return bb.array(); } }
/** * This Flowlet reads events from a Stream and saves them to a datastore. */ public static class ChildMortalityReader extends AbstractFlowlet {
private static final Logger LOG = LoggerFactory.getLogger(WeatherReader.class);
@UseDataSet("ChildMortality") private ObjectStore<String> pointsStore;
@ProcessInput public void process(StreamEvent event) { String body = new String(event.getBody().array()); LOG.trace("Weather info: {}", body); pointsStore.write(getIdAsByte(UUID.randomUUID()), body); }
private static byte[] getIdAsByte(UUID uuid) { ByteBuffer bb = ByteBuffer.wrap(new byte[16]); bb.putLong(uuid.getMostSignificantBits()); bb.putLong(uuid.getLeastSignificantBits()); return bb.array(); } }
/** * Flow: consumes weather data from a Stream, stores them in a dataset */ public static class WeatherFlow implements Flow { @Override public FlowSpecification configure() { return FlowSpecification.Builder.with() .setName("WeatherFlow") .setDescription("Reads weather information for a given weather station and stores it in dataset") .withFlowlets() .add("reader", new WeatherReader()) .connect() .fromStream("weatherStream").to("reader") .build(); } }
/** * Flow: consumes childmortality data from a Stream, stores them in a dataset */ public static class ChildMortalityFlow implements Flow { @Override public FlowSpecification configure() { return FlowSpecification.Builder.with() .setName("ChildMortalityFlow") .setDescription("Reads child mortality information for a given weather station and stores it in dataset") .withFlowlets() .add("reader", new ChildMortalityReader()) .connect() .fromStream("childmortalityStream").to("reader") .build(); } }
/** * returns calculated correlation based on index parameter. */ public static class CorrelationProcedure extends AbstractProcedure {
private static final Logger LOG = LoggerFactory.getLogger(CorrelationProcedure.class);
// Annotation indicates that correlations dataset is used in the procedure. @UseDataSet("correlation") private ObjectStore<String> correlations;
@Handle("correlation") public void getCorrelations(ProcedureRequest request, ProcedureResponder responder) throws IOException, InterruptedException { String index = request.getArgument("index"); if (index == null) { responder.error(ProcedureResponse.Code.CLIENT_ERROR, "Index must be given as argument"); return; } LOG.debug("get correlation for index {}", index); // Send response with JSON format. responder.sendJson(correlations.read(index.getBytes())); } }}
--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/1187844f-2b63-47fc-9c77-2b7086cbe45f%40googlegroups.com.
{
"response": null,
"error": "Service Unavailable",
"message": "Service Unavailable"
}
2014-11-28 12:00:37,308 -
ERROR [procedure-executor-3:c.c.c.i.a.r.p.ReflectionHandlerMethod@64] - Exception in calling procedure handler: public void co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(co.cask.cdap.api.procedure.ProcedureRequest,co.cask.cdap.api.procedure.ProcedureResponder) throws java.io.IOException,java.lang.InterruptedException
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
at co.cask.cdap.internal.app.runtime.procedure.ReflectionHandlerMethod.handle(ReflectionHandlerMethod.java:62) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureHandlerMethod.handle(ProcedureHandlerMethod.java:137) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.handleRequest(ProcedureDispatcher.java:154) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.messageReceived(ProcedureDispatcher.java:104) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: java.lang.NullPointerException
at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:101) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.TransactionResponder.sendJson(TransactionResponder.java:77) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:35) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:30) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(BigChildrenApp.java:194) ~[CorrelationProcedure.204c322c-337d-4c05-ae43-5a3a932040d6/:na]
... 17 common frames omitted
Caused by: java.lang.NullPointerException: null
at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:91) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
... 21 common frames omitted
2014-11-28 12:00:37,310 - ERROR [procedure-executor-3:c.c.c.i.a.r.p.ReflectionHandlerMethod@70] - Fail to close response on error.
java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
at co.cask.cdap.internal.app.runtime.procedure.ReflectionHandlerMethod.handle(ReflectionHandlerMethod.java:62) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureHandlerMethod.handle(ProcedureHandlerMethod.java:137) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.handleRequest(ProcedureDispatcher.java:154) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.messageReceived(ProcedureDispatcher.java:104) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: java.lang.NullPointerException
at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:101) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.TransactionResponder.sendJson(TransactionResponder.java:77) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:35) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:30) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
at co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(BigChildrenApp.java:194) ~[CorrelationProcedure.204c322c-337d-4c05-ae43-5a3a932040d6/:na]
... 17 common frames omitted
Caused by: java.lang.NullPointerException: null
at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:91) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
... 21 common frames omitted
20
--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/26d2e095-c9de-4ce5-8c96-e29daecb4f61%40googlegroups.com.
The scan was a good tip. We will continue to work on it.As for the logs, yes it is only the logs from the Spark program that are missing. Everything else is shown in the console.
--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/674a7fd9-f5b4-41f4-96a7-289fcd8e9f99%40googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/CAOyYuzpMwkHWFJ9hMOKhQEbbP-_FzZoSwp9D%3D0mrEr_Zypyu7A%40mail.gmail.com.
2014-12-05 12:04:01,523 - INFO [Spark - BigChildrenProgram:c.c.c.e.b.BigChildrenProgram@191] - before tcorr2014-12-05 12:04:01,537 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Got cleaning task CleanBroadcast(22)2014-12-05 12:04:01,537 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Cleaning broadcast 222014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$be]2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$ce]2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.226218 ms) RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$be]2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - removing broadcast 222014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.036131 ms) RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$ce]2014-12-05 12:04:01,538 - INFO [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Removing broadcast 222014-12-05 12:04:01,539 - INFO [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Removing block broadcast_222014-12-05 12:04:01,539 - INFO [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Block broadcast_22 of size 2688 dropped from memory (free 550647652)2014-12-05 12:04:01,539 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.Logging$class@63] - Done removing broadcast 22, response is 12014-12-05 12:04:01,539 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.Logging$class@63] - Sent response: 1 to Actor[akka://sparkDriver/temp/$ce]2014-12-05 12:04:01,540 - INFO [Spark Context Cleaner:o.a.s.Logging$class@59] - Cleaned broadcast 222014-12-05 12:04:01,540 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Got cleaning task CleanShuffle(14)2014-12-05 12:04:01,540 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Cleaning shuffle 142014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$de]2014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$ee]2014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.187261 ms) RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$de]2014-12-05 12:04:01,540 - INFO [Spark Context Cleaner:o.a.s.Logging$class@59] - Cleaned shuffle 142014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.03294 ms) RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$ee]2014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - removing shuffle 142014-12-05 12:04:01,541 - INFO [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Deleted all files for shuffle 142014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - Done removing shuffle 14, response is true2014-12-05 12:04:01,542 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - Sent response: true to Actor[akka://sparkDriver/temp/$ee]2014-12-05 12:04:01,542 - WARN [Spark - BigChildrenProgram:c.c.c.i.a.r.s.SparkProgramWrapper@160] - Program class run method threw an exceptionjava.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:152) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.instantiateUserProgramClass(SparkProgramWrapper.java:116) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.main(SparkProgramWrapper.java:89) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71] at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:169) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService$1$1.run(SparkRuntimeService.java:243) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]Caused by: org.apache.commons.math3.exception.DimensionMismatchException: 0 != 64,472 at org.apache.commons.math3.stat.correlation.PearsonsCorrelation.correlation(PearsonsCorrelation.java:230) ~[org.apache.commons.commons-math3-3.1.1.jar:3.1.1] at co.cask.cdap.examples.bigchildren.BigChildrenProgram.run(BigChildrenProgram.scala:192) ~[BigChildrenProgram.cbf350dc-bfa4-4f34-aea0-6b53fc1c5470/:na] ... 18 common frames omitted2014-12-05 12:04:01,543 - ERROR [Spark - BigChildrenProgram:c.c.c.i.a.r.s.SparkRuntimeService@171] - Failed to submit Spark program Job=spark: BigChildrenProgram, accountId=developer, applicationId=BigChildren, program=BigChildrenProgram, runid=6a786683-997d-47d2-b29a-475c4f4e88d4java.lang.RuntimeException: java.lang.reflect.InvocationTargetException at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:161) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.instantiateUserProgramClass(SparkProgramWrapper.java:116) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.main(SparkProgramWrapper.java:89) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71] at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0] at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:169) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na] at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService$1$1.run(SparkRuntimeService.java:243) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]Caused by: java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71] at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71] at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:152) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na] ... 13 common frames omittedCaused by: org.apache.commons.math3.exception.DimensionMismatchException: 0 != 64,472 at org.apache.commons.math3.stat.correlation.PearsonsCorrelation.correlation(PearsonsCorrelation.java:230) ~[org.apache.commons.commons-math3-3.1.1.jar:3.1.1] at co.cask.cdap.examples.bigchildren.BigChildrenProgram.run(BigChildrenProgram.scala:192) ~[BigChildrenProgram.cbf350dc-bfa4-4f34-aea0-6b53fc1c5470/:na] ... 18 common frames omitted