Handling output data in CDAP


Endre Elvestad

Nov 27, 2014, 1:29:47 PM
to cdap...@googlegroups.com
Hi, 

We are a student group at ETH Zürich, currently trying out your product as part of a project in our Big Data class.
For our project we are exploring whether there is a correlation between changes in weather and child mortality in developing countries.

This is the first time we have been exposed to Big Data techniques, and after reading up on the different solutions available we went with CDAP.
So far we have cleaned the data, set it up as a flow, and have a running process utilizing Spark.

Our problem is that we can find no way of saving the output data. Having tried several different approaches seen in the examples, we have now come to you for help.


Our current code: 
/*
 * GPL v.3 
 * Main class, implements mean temperature calculation and cross-correlation
 */

package co.cask.cdap.examples.bigchildren

import breeze.linalg.DenseVector
import breeze.linalg.Vector
import breeze.stats.meanAndVariance
import breeze.stats.mean
import breeze.stats.variance

import co.cask.cdap.api.spark.ScalaSparkProgram
import co.cask.cdap.api.spark.SparkContext

import org.apache.spark.SparkContext._
//import org.apache.spark.mllib.stat.corr

import org.apache.spark.rdd._ //NewHadoopRDD

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.math

import org.apache.commons.math3._
import org.apache.commons.math3.stat.correlation.PearsonsCorrelation

//import java.io.PrintWriter
//import java.io.FileWriter

/**
  * main class
  */
class BigChildrenProgram extends ScalaSparkProgram {
  //var status: PrintWriter = new PrintWriter(new FileWriter("/home/psteger/logger", true))
  private final val LOG: Logger = LoggerFactory.getLogger(classOf[BigChildrenProgram])

  final val interval = 7 // [days]

  // convert degrees Fahrenheit to Kelvin
  def parseTemp(fahrenheit: Double): Double = {
    (fahrenheit + 459.67) * 5 / 9
  }

  // convert inches to millimetres
  def parsePrec(inches: Double): Double = {
    inches * 25.4
  }

  // split a whitespace-separated weather record and convert the parts to SI units
  def parsewVector(line: String): (Double, Double, Double) = {
    val dv = DenseVector(line.split(" ", -1))
    val timestamp: Double = dv(2).toDouble
    val temperature: Double = parseTemp(dv(3).toDouble)
    // drop the trailing character of the precipitation field before parsing
    val precip: Double = parsePrec(dv(13).substring(0, math.max(2, dv(13).length() - 1)).toDouble)
    (timestamp, temperature, precip)
  }

  // split a CSV file line by single whitespaces, no further conversion
  def parsemVector(line: String): (Double, Double) = {
    val dv = DenseVector(line.split(" ", -1))
    val year: Double = dv(0).toDouble // year
    val mort: Double = dv(4).toDouble // mortality per 1000 children
    (year, mort)
  }

  // find which interval a date encoded as YYYYMMDD lies in,
  // by counting the days since the start of the year and projecting into bins of w days
  def findInterval(dateID: Int, w: Double): Int = {
    val Y: Int = dateID / 10000
    val M: Int = (dateID - Y * 10000) / 100
    val Day: Int = (dateID - Y * 10000) % 100
    val MonthLength = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
    if ((Y % 4 == 0) && (Y % 100 != 0 || Y % 400 == 0)) {
      MonthLength(1) += 1 // add Feb 29 in leap years
    }
    // add up all completed months
    var dayofyear: Int = 0
    for (i <- 1 until M) {
      dayofyear += MonthLength(i - 1)
    }
    // add the days since the beginning of the current month
    dayofyear += Day
    // map into an interval number (each interval is w days long)
    ((dayofyear - 1) / w).toInt
  }
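
  // worked example (added for illustration): findInterval(20140301, 7)
  //   2014 is not a leap year; completed months Jan+Feb = 31+28 = 59 days,
  //   plus Day 1 gives dayofyear = 60, so the interval is ((60-1)/7.0).toInt = 8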

  // Gaussian weight of year y relative to reference year yi, with width sig
  def weight(y: Int, yi: Int, sig: Double, w: Double): Double = {
    math.exp(-math.pow(y - yi, 2) / (2 * math.pow(sig, 2))) / (math.sqrt(2 * math.Pi) * sig) //*365.25/w
  }

  override def run(sc: SparkContext) {
    val args: Array[String] = sc.getRuntimeArguments("args")
    //LOG.info("Running with arguments {}", args)
    // country to run for, if at all specified
    val country = if (args.length > 0) args(0) else "MI"

    //LOG.info("Reading in weather data")
    // use the new-API Hadoop RDD (as recommended on apache.org) rather than
    // instantiating HadoopRDD directly
    val linesDataset: NewHadoopRDD[Array[Byte], String] =
      sc.readFromDataset("Weather", classOf[Array[Byte]], classOf[String])
    // split by whitespaces, convert to Vector[Double], convert to SI units
    //LOG.info("parsing vectors")
    val wdata = linesDataset.map(kv => parsewVector(kv._2)).cache()

    //LOG.info("Calculating Mean")
    // cycle through all datasets
    // get datestamp, and corresponding period (say, week, interval=7days) of interest
    // get temperature from col (0,1,2,)3, calc new mean
    // get precipitation from col 13, calc new mean

    // do this via a Map-Reduce scheme
    // first get key value as period to hash to,
    // then extract temperature and precipitation

    //status.println("reading and parsing data")
    // floor(p._1/10000) gives the entry year
    val tdat = wdata.map(p => (findInterval((p._1).toInt,interval), (math.floor(p._1/10000), p._2)))
      .filter( kv => ((kv._2._2) > 0.0 ))
    //status.println("pdat")
    val pdat = wdata.map(p => (findInterval((p._1).toInt,interval), (math.floor(p._1/10000), p._3)))
      .filter( kv => ((kv._2._2) >= 0.0 ))
    //status.println("filtered data")

    //LOG.info("tred")
    // we use the values in tred twice, so we store it separately
    val tred = tdat.map(kv => (kv._1,kv._2._2)).groupByKey()
    // TODO: use meanAndVariance, and then extract the important pieces
    //val tmav = tred.map(v => (v._1, meanAndVariance(v._2)))
    //LOG.info("tMean")
    val tMean = tred.map(kv => (kv._1, mean(kv._2)))
    //LOG.info("tSig")
    val tSig  = tred.map(kv => (kv._1, math.max(0.00001, math.sqrt(variance(kv._2)))))
    //LOG.info("pred")
    val pred = pdat.map(kv => (kv._1,  kv._2._2)).groupByKey()
    //LOG.info("pMean")
    val pMean = pred.map(kv => (kv._1, mean(kv._2)))
    //LOG.info("pSig")
    val pSig  = pred.map(kv => (kv._1, math.max(0.00001, math.sqrt(variance(kv._2)))))

    // run through all values again and calculate the distance from the mean:
    // d = |data(this year) - mean(all years)| / standard_deviation(all years)
    // do the join for each time interval,
    // and store a new key, value pair where the key is the year
    //LOG.info("tDiff")
    val tDiff = tdat.join(tMean).map(kv => (kv._1, math.abs(kv._2._1._2 - kv._2._2)))
    //LOG.info("pDiff")
    val pDiff = pdat.join(pMean).map(kv => (kv._1, math.abs(kv._2._1._2 - kv._2._2)))
    // has key #interval, value |diff from mean| (note: the year is dropped here)

    //LOG.info("tOut")
    val tOut  = tDiff.join(tSig).map(kv => (kv._2._1, kv._2._1 / kv._2._2))
    //LOG.info("pOut")
    val pOut  = pDiff.join(pSig).map(kv => (kv._2._1, kv._2._1 / kv._2._2))
    // intended new key: #year, value (diff from mean / sig); note that as written,
    // kv._2._1 is the diff itself, so these pairs end up keyed by the diff value

    // determine outlierness of a given year, summing over all time intervals this time
    //LOG.info("tweather_out")
    val tweather_out = tOut.groupByKey().map(kv => (kv._1, kv._2.sum))
    //LOG.info("pweather_out")
    val pweather_out = pOut.groupByKey().map(kv => (kv._1, kv._2.sum))

    //LOG.info("toutsort")
    val toutsort = tweather_out.sortByKey(true).filter(kv => ((kv._1) >= 1990 )).map(kv => kv._2).collect()
    //LOG.info("poutsort")
    val poutsort = pweather_out.sortByKey(true).filter(kv => ((kv._1) >= 1990 )).map(kv => kv._2).collect()

    // read in mortality data
    //LOG.info("read in mortality")
    val mortality: NewHadoopRDD[Array[Byte], String] =
      sc.readFromDataset("ChildMortality", classOf[Array[Byte]], classOf[String])
    // split by whitespaces, convert to Vector[Double]
    //LOG.info("parse mdata")
    val mdata = mortality.map(kv => parsemVector(kv._2)).cache()
    //LOG.info("m_out")
    val m_out = mdata.map(p => ((p._1).toInt, p._2))
    //LOG.info("moutsort")
    val moutsort = m_out.sortByKey(true).map(kv => kv._2).collect()

    // determine direct correlation as in Pearson; time-shift assumed small, no ARIMA needed
    //LOG.info("tcorr")
    val tcorr = (new PearsonsCorrelation()).correlation(toutsort, moutsort)
    //LOG.info("pcorr")
    val pcorr = (new PearsonsCorrelation()).correlation(poutsort, moutsort)
    // note: this map is lazy and its result is never stored or collected, so it has no effect
    m_out.map(kv => (pcorr, tcorr))

    //LOG.info("Done!")
    //status.println("tcorr = "+ tcorr+ ", pcorr = "+ pcorr)
    //status.flush()

    // write to file, for output later
    //tweather_out.saveAsTextFile("/home/psteger/tweather.out.dat")
    //pweather_out.saveAsTextFile("/home/psteger/pweather.out.dat")
    m_out.saveAsTextFile("/home/psteger/mortality.out.dat")


    //val originalContext: org.apache.spark.SparkContext = sc.getOriginalSparkContext()
    //sc.writeToDataset(originalContext.parallelize(m_out), "correlations", classOf[Array[Byte]], classOf[String])

  }

}



Any tips would be greatly appreciated. Attached is also a small subset of the data we are using. 


childmortality.txt
stations_MI.txt
alldata.txt

Nitin Motgi

Nov 27, 2014, 2:16:59 PM
to Endre Elvestad, cdap...@googlegroups.com
Hi Endre,

I saw that you are attempting to write to a dataset at the end, but the line is commented out. Are you having any problem saving it? Is there an exception you are seeing?

Thanks,
Nitin

###
Random auto-corrects and typos are my special gift to you. When I forward they are from others. 

Endre Elvestad

Nov 28, 2014, 5:04:54 AM
to cdap...@googlegroups.com, endre.e...@gmail.com
Thanks for your response.

We have now made some changes to the output code.
Everything compiles fine at this point using CDAP 2.5.0 and its dependencies.

We are experiencing three problems: 

1)  No data seems to be reaching the datastore "correlations" 


2) Running the "Correlation procedure" we get the error below, possibly because there is no data in the store.

{
  "response": null,
  "error": "internal server error",
  "message": "Fail to get procedure."
}



3) No log messages are shown when we run the main process.

Changes from the first post in our program:

// determine direct correlation as in Pearson; time-shift assumed small, no ARIMA needed
LOG.info("tcorr")
val tcorr = (new PearsonsCorrelation()).correlation(toutsort, moutsort)
LOG.info("pcorr")
val pcorr = (new PearsonsCorrelation()).correlation(poutsort, moutsort)

LOG.info("Done!")

// write to DataSet, for output later
val output_array = Array(pcorr, tcorr)
val centers = new Array[(Array[Byte], String)](1)
centers(0) = (country.getBytes, output_array.mkString(","))
val originalContext: org.apache.spark.SparkContext = sc.getOriginalSparkContext()
val outputrdd = originalContext.makeRDD[(Array[Byte], String)](centers)
sc.writeToDataset(outputrdd, "correlations", classOf[Array[Byte]], classOf[String])

Our App code: 
/*
 * Copyright © 2014 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package co.cask.cdap.examples.bigchildren;

import co.cask.cdap.api.annotation.Handle;
import co.cask.cdap.api.annotation.ProcessInput;
import co.cask.cdap.api.annotation.UseDataSet;
import co.cask.cdap.api.app.AbstractApplication;
import co.cask.cdap.api.data.stream.Stream;
import co.cask.cdap.api.dataset.lib.ObjectStore;
import co.cask.cdap.api.dataset.lib.ObjectStores;
import co.cask.cdap.api.flow.Flow;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.procedure.AbstractProcedure;
import co.cask.cdap.api.procedure.ProcedureRequest;
import co.cask.cdap.api.procedure.ProcedureResponder;
import co.cask.cdap.api.procedure.ProcedureResponse;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.internal.io.UnsupportedTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.UUID;

/**
 * Application to find correlation between child mortality and outlierness of weather data
 */
public class BigChildrenApp extends AbstractApplication {
  public static final Charset UTF8 = Charset.forName("UTF-8");

  @Override
  public void configure() {
    setName("BigChildren");
    setDescription("BigChildren app");
    addStream(new Stream("weatherStream"));
    addStream(new Stream("childmortalityStream"));
    addFlow(new WeatherFlow());
    addFlow(new ChildMortalityFlow());
    addSpark(new BigChildrenSpecification());
    addProcedure(new CorrelationProcedure());

    try {
      ObjectStores.createObjectStore(getConfigurer(), "Weather", String.class);
      ObjectStores.createObjectStore(getConfigurer(), "ChildMortality", String.class);
      ObjectStores.createObjectStore(getConfigurer(), "Correlations", String.class);
    } catch (UnsupportedTypeException e) {
      // This exception is thrown by ObjectStore if its parameter type cannot be
      // (de)serialized (for example, if it is an interface and not a class, then there is
      // no auto-magic way deserialize an object.) In this case that will not happen
      // because String is an actual class.
      throw new RuntimeException(e);
    }
  }

  /**
   * A Spark Program that uses BigData algorithms to find correlations.
   */
  public static class BigChildrenSpecification extends AbstractSpark {
    @Override
    public SparkSpecification configure() {
      return SparkSpecification.Builder.with()
        .setName("BigChildrenProgram")
        .setDescription("BigChildren Program")
        .setMainClassName(BigChildrenProgram.class.getName())
        .build();
    }
  }

  /**
   * This Flowlet reads events from a Stream and saves them to a datastore.
   */
  public static class WeatherReader extends AbstractFlowlet {

    private static final Logger LOG = LoggerFactory.getLogger(WeatherReader.class);

    @UseDataSet("Weather")
    private ObjectStore<String> pointsStore;

    @ProcessInput
    public void process(StreamEvent event) {
      String body = new String(event.getBody().array());
      LOG.trace("Weather info: {}", body);
      pointsStore.write(getIdAsByte(UUID.randomUUID()), body);
    }

    private static byte[] getIdAsByte(UUID uuid) {
      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
      bb.putLong(uuid.getMostSignificantBits());
      bb.putLong(uuid.getLeastSignificantBits());
      return bb.array();
    }
  }

  /**
   * This Flowlet reads events from a Stream and saves them to a datastore.
   */
  public static class ChildMortalityReader extends AbstractFlowlet {

    private static final Logger LOG = LoggerFactory.getLogger(ChildMortalityReader.class);

    @UseDataSet("ChildMortality")
    private ObjectStore<String> pointsStore;

    @ProcessInput
    public void process(StreamEvent event) {
      String body = new String(event.getBody().array());
      LOG.trace("Weather info: {}", body);
      pointsStore.write(getIdAsByte(UUID.randomUUID()), body);
    }

    private static byte[] getIdAsByte(UUID uuid) {
      ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
      bb.putLong(uuid.getMostSignificantBits());
      bb.putLong(uuid.getLeastSignificantBits());
      return bb.array();
    }
  }

  /**
   * Flow: consumes weather data from a Stream, stores them in a dataset
   */
  public static class WeatherFlow implements Flow {
    @Override
    public FlowSpecification configure() {
      return FlowSpecification.Builder.with()
        .setName("WeatherFlow")
        .setDescription("Reads weather information for a given weather station and stores it in dataset")
        .withFlowlets()
        .add("reader", new WeatherReader())
        .connect()
        .fromStream("weatherStream").to("reader")
        .build();
    }
  }

  /**
   * Flow: consumes childmortality data from a Stream, stores them in a dataset
   */
  public static class ChildMortalityFlow implements Flow {
    @Override
    public FlowSpecification configure() {
      return FlowSpecification.Builder.with()
        .setName("ChildMortalityFlow")
        .setDescription("Reads child mortality information for a given weather station and stores it in dataset")
        .withFlowlets()
        .add("reader", new ChildMortalityReader())
        .connect()
        .fromStream("childmortalityStream").to("reader")
        .build();
    }
  }

  /**
   * returns calculated correlation based on index parameter.
   */
  public static class CorrelationProcedure extends AbstractProcedure {

    private static final Logger LOG = LoggerFactory.getLogger(CorrelationProcedure.class);

    // Annotation indicates that correlations dataset is used in the procedure.
    @UseDataSet("correlation")
    private ObjectStore<String> correlations;

    @Handle("correlation")
    public void getCorrelations(ProcedureRequest request, ProcedureResponder responder)
      throws IOException, InterruptedException {
      String index = request.getArgument("index");
      if (index == null) {
        responder.error(ProcedureResponse.Code.CLIENT_ERROR, "Index must be given as argument");
        return;
      }
      LOG.debug("get correlation for index {}", index);
      // Send response with JSON format.
      responder.sendJson(correlations.read(index.getBytes()));
    }
  }
}

Ali Anwar

Nov 28, 2014, 5:47:05 AM
to Endre Elvestad, cdap...@googlegroups.com
Hi Endre Elvestad.

In CorrelationProcedure, there is the following annotation:
@UseDataSet("correlation")
private ObjectStore<String> correlations;
This seems to be a mismatch: the procedure is looking for a dataset named "correlation", whereas the actual Dataset name defined by the BigChildrenApp is "Correlations".
Note that the variable can be named anything - it is only important that the annotation value matches the Dataset name.
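For reference, the corrected annotation would simply be:
@UseDataSet("Correlations")
private ObjectStore<String> correlations;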
I hope that fixes some of your problems.

Also, can you explain a little more what you mean by "no log messages show up"? Are you looking for them in the web interface or in a log file?

Looking forward to your response,

Ali Anwar


Endre Elvestad

Nov 28, 2014, 6:26:09 AM
to cdap...@googlegroups.com, endre.e...@gmail.com
Thanks Ali. That was indeed the issue!
All our names are now changed to "Correlations", capital C. :)

We are now one step further and can see that data is flowing into the Correlations dataset.
The error message we now get is an exception when calling the handler.

Our call: 
Method: Correlations
Parameters:
{"Index" : "1"}
{"Index" : "0"}
{"Index" : "MI"}  - MI is the country code for Mali 


In all three cases we get the response below; the full error log follows.
{
  "response": null,
  "error": "Service Unavailable",
  "message": "Service Unavailable"
}


Regarding the logs:
The LOG.info messages do not seem to show up in the CDAP console log.


We are sorry for bringing such novice problems to such a high-level community.
At the moment we don't know of anywhere else to ask, and we have spent weeks trying to get this right.


2014-11-28 12:00:37,308 - ERROR [procedure-executor-3:c.c.c.i.a.r.p.ReflectionHandlerMethod@64] - Exception in calling procedure handler: public void co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(co.cask.cdap.api.procedure.ProcedureRequest,co.cask.cdap.api.procedure.ProcedureResponder) throws java.io.IOException,java.lang.InterruptedException
java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at co.cask.cdap.internal.app.runtime.procedure.ReflectionHandlerMethod.handle(ReflectionHandlerMethod.java:62) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureHandlerMethod.handle(ProcedureHandlerMethod.java:137) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.handleRequest(ProcedureDispatcher.java:154) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.messageReceived(ProcedureDispatcher.java:104) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: java.lang.NullPointerException
        at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:101) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.TransactionResponder.sendJson(TransactionResponder.java:77) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:35) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:30) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(BigChildrenApp.java:194) ~[CorrelationProcedure.204c322c-337d-4c05-ae43-5a3a932040d6/:na]
        ... 17 common frames omitted
Caused by: java.lang.NullPointerException: null
        at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:91) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        ... 21 common frames omitted
2014-11-28 12:00:37,310 - ERROR [procedure-executor-3:c.c.c.i.a.r.p.ReflectionHandlerMethod@70] - Fail to close response on error.
java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at co.cask.cdap.internal.app.runtime.procedure.ReflectionHandlerMethod.handle(ReflectionHandlerMethod.java:62) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureHandlerMethod.handle(ProcedureHandlerMethod.java:137) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.handleRequest(ProcedureDispatcher.java:154) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.ProcedureDispatcher.messageReceived(ProcedureDispatcher.java:104) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.ChannelUpstreamEventRunnable.doRun(ChannelUpstreamEventRunnable.java:43) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.ChannelEventRunnable.run(ChannelEventRunnable.java:67) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor$ChildExecutor.run(OrderedMemoryAwareThreadPoolExecutor.java:314) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: java.lang.NullPointerException
        at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:101) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.TransactionResponder.sendJson(TransactionResponder.java:77) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:35) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.procedure.AbstractProcedureResponder.sendJson(AbstractProcedureResponder.java:30) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.examples.bigchildren.BigChildrenApp$CorrelationProcedure.getCorrelations(BigChildrenApp.java:194) ~[CorrelationProcedure.204c322c-337d-4c05-ae43-5a3a932040d6/:na]
        ... 17 common frames omitted
Caused by: java.lang.NullPointerException: null
        at co.cask.cdap.internal.app.runtime.procedure.HttpProcedureResponder.sendJson(HttpProcedureResponder.java:91) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        ... 21 common frames omitted


Ali Anwar

Nov 28, 2014, 7:01:03 AM
to Endre Elvestad, cdap...@googlegroups.com
Hi once again.

Thanks for attaching your logs.
What's happening is that ObjectStore.read(key) is returning null (probably because the given key isn't found in the Dataset), so responder.sendJson(null) throws a NullPointerException at the following line:
responder.sendJson(correlations.read(index.getBytes()));

Instead, do something like:
String result = correlations.read(index.getBytes());
if (result == null) {
    responder.sendJson(ProcedureResponse.Code.NOT_FOUND, "No result");
} else {
    responder.sendJson(ProcedureResponse.Code.SUCCESS, result);
}

To debug this (figure out why your key wasn't found), you could write a procedure endpoint to scan over the entire dataset and return the keys in the Dataset.
http://docs.cdap.io/cdap/2.5.2/en/reference-manual/javadocs/co/cask/cdap/api/dataset/lib/ObjectStore.html#scan(byte[], byte[])
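
A rough sketch of such an endpoint (untested, and the handler name is just illustrative; it assumes scan(null, null) iterates over the whole store per the ObjectStore Javadoc, and it needs imports for java.util.List/ArrayList and CDAP's CloseableIterator/KeyValue):
@Handle("keys")
public void getKeys(ProcedureRequest request, ProcedureResponder responder) throws IOException {
  // collect every key currently stored in the Correlations ObjectStore
  List<String> keys = new ArrayList<String>();
  CloseableIterator<KeyValue<byte[], String>> iter = correlations.scan(null, null);
  try {
    while (iter.hasNext()) {
      keys.add(new String(iter.next().getKey(), UTF8));
    }
  } finally {
    iter.close();
  }
  responder.sendJson(ProcedureResponse.Code.SUCCESS, keys);
}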

Regarding the LOG.info messages not showing up in the CDAP console log: is this the case only for the Spark program?

Good luck building your application!

Ali Anwar



Endre Elvestad

Nov 28, 2014, 9:13:34 AM
to cdap...@googlegroups.com, endre.e...@gmail.com

The scan was a good tip. We will continue to work on it. 

As for the logs, yes it is only the logs from the Spark program that are missing. Everything else is shown in the console. 

Nitin Motgi

Nov 28, 2014, 11:47:38 AM
to Endre Elvestad, cdap...@googlegroups.com
Are you seeing any logs for Spark in <cdap-install-directory>/logs/cdap.log?

Don't worry about feeling like you are asking novice questions. Every question helps us improve CDAP, so please keep them coming.

Thanks,
Nitin




Rohit Sinha

Nov 28, 2014, 12:08:58 PM
to endre.e...@gmail.com, cdap...@googlegroups.com, Nitin Motgi
Hello Endre,
Where exactly are you looking for the Spark logs?
Also, can you see them in the cdap.log file, as Nitin mentioned?


Sent from my iPhone

Endre Elvestad

Dec 5, 2014, 7:45:34 AM
to cdap...@googlegroups.com, endre.e...@gmail.com, ni...@cask.co
Hi again,

So we have continued working on your suggestions.
Our problem as of now: it seems like one of our datastores is empty, and it returns: "output no correlation found"

Do you agree with our interpretation of the error log?
How would you go about finding out which RDD is empty?
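
One idea we had (an untested sketch) is to log the sizes of the collected arrays just before the correlation call, since PearsonsCorrelation.correlation requires two non-empty arrays of equal length:
// untested: log the collected array sizes before correlating
LOG.info("toutsort: " + toutsort.length + ", poutsort: " + poutsort.length + ", moutsort: " + moutsort.length)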



2014-12-05 12:04:01,523 - INFO  [Spark - BigChildrenProgram:c.c.c.e.b.BigChildrenProgram@191] - before tcorr
2014-12-05 12:04:01,537 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Got cleaning task CleanBroadcast(22)
2014-12-05 12:04:01,537 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Cleaning broadcast 22
2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$be]
2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$ce]
2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.226218 ms) RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$be]
2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - removing broadcast 22
2014-12-05 12:04:01,538 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.036131 ms) RemoveBroadcast(22,true) from Actor[akka://sparkDriver/temp/$ce]
2014-12-05 12:04:01,538 - INFO  [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Removing broadcast 22
2014-12-05 12:04:01,539 - INFO  [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Removing block broadcast_22
2014-12-05 12:04:01,539 - INFO  [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Block broadcast_22 of size 2688 dropped from memory (free 550647652)
2014-12-05 12:04:01,539 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.Logging$class@63] - Done removing broadcast 22, response is 1
2014-12-05 12:04:01,539 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.Logging$class@63] - Sent response: 1 to Actor[akka://sparkDriver/temp/$ce]
2014-12-05 12:04:01,540 - INFO  [Spark Context Cleaner:o.a.s.Logging$class@59] - Cleaned broadcast 22
2014-12-05 12:04:01,540 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Got cleaning task CleanShuffle(14)
2014-12-05 12:04:01,540 - DEBUG [Spark Context Cleaner:o.a.s.Logging$class@63] - Cleaning shuffle 14
2014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$de]
2014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@50] - [actor] received message RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$ee]
2014-12-05 12:04:01,540 - DEBUG [sparkDriver-akka.actor.default-dispatcher-3:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.187261 ms) RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$de]
2014-12-05 12:04:01,540 - INFO  [Spark Context Cleaner:o.a.s.Logging$class@59] - Cleaned shuffle 14
2014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-2:o.a.s.u.ActorLogReceive$$anon$1@56] - [actor] handled message (0.03294 ms) RemoveShuffle(14) from Actor[akka://sparkDriver/temp/$ee]
2014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - removing shuffle 14
2014-12-05 12:04:01,541 - INFO  [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@59] - Deleted all files for shuffle 14
2014-12-05 12:04:01,541 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - Done removing shuffle 14, response is true
2014-12-05 12:04:01,542 - DEBUG [sparkDriver-akka.actor.default-dispatcher-13:o.a.s.Logging$class@63] - Sent response: true to Actor[akka://sparkDriver/temp/$ee]
2014-12-05 12:04:01,542 - WARN  [Spark - BigChildrenProgram:c.c.c.i.a.r.s.SparkProgramWrapper@160] - Program class run method threw an exception
java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:152) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.instantiateUserProgramClass(SparkProgramWrapper.java:116) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.main(SparkProgramWrapper.java:89) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) [org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:169) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService$1$1.run(SparkRuntimeService.java:243) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: org.apache.commons.math3.exception.DimensionMismatchException: 0 != 64,472
        at org.apache.commons.math3.stat.correlation.PearsonsCorrelation.correlation(PearsonsCorrelation.java:230) ~[org.apache.commons.commons-math3-3.1.1.jar:3.1.1]
        at co.cask.cdap.examples.bigchildren.BigChildrenProgram.run(BigChildrenProgram.scala:192) ~[BigChildrenProgram.cbf350dc-bfa4-4f34-aea0-6b53fc1c5470/:na]
        ... 18 common frames omitted
2014-12-05 12:04:01,543 - ERROR [Spark - BigChildrenProgram:c.c.c.i.a.r.s.SparkRuntimeService@171] - Failed to submit Spark program Job=spark: BigChildrenProgram, accountId=developer, applicationId=BigChildren, program=BigChildrenProgram, runid=6a786683-997d-47d2-b29a-475c4f4e88d4
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:161) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.instantiateUserProgramClass(SparkProgramWrapper.java:116) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.main(SparkProgramWrapper.java:89) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]
        at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:169) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) [co.cask.cdap.cdap-explore-jdbc-2.5.0.jar:na]
        at co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService$1$1.run(SparkRuntimeService.java:243) [co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.reflect.InvocationTargetException: null
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
        at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
        at co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.runUserProgram(SparkProgramWrapper.java:152) ~[co.cask.cdap.cdap-app-fabric-2.5.0.jar:na]
        ... 13 common frames omitted
Caused by: org.apache.commons.math3.exception.DimensionMismatchException: 0 != 64,472
        at org.apache.commons.math3.stat.correlation.PearsonsCorrelation.correlation(PearsonsCorrelation.java:230) ~[org.apache.commons.commons-math3-3.1.1.jar:3.1.1]
        at co.cask.cdap.examples.bigchildren.BigChildrenProgram.run(BigChildrenProgram.scala:192) ~[BigChildrenProgram.cbf350dc-bfa4-4f34-aea0-6b53fc1c5470/:na]
        ... 18 common frames omitted



 
  
Regarding the logs:
We are creating them as you can see from the code in earlier posts. We were looking for the Spark log messages to appear together with the others in the console, but Mr. Motgi was right:
the logs did not appear in the console, but were written to <cdap-install-directory>/logs/cdap.log.

Thanks for the tip, it has helped us a lot.
 
 
 
 
 
 
 
 