val streams = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
streams.foreachRDD { rdd =>
  // collect() pulls each micro-batch back to the driver before printing
  for (item <- rdd.collect()) {
    println(item)
  }
}
kafkaStream.foreachRDD { rdd =>
  sqlContext.jsonRDD(rdd).registerTempTable("mytable")
}
Hi Helena, your example is very helpful to me; I will test this tomorrow. Do you know of any API for retrieving the column names from a JsonRDD? I have hundreds of columns, so hardcoding each one in the SQL is not feasible.
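For reference, a minimal sketch of reading the column names out of the inferred schema instead of hardcoding them, assuming Spark 1.x where sqlContext.jsonRDD returns a SchemaRDD, and reusing the kafkaStream and sqlContext names from the snippet above:

kafkaStream.foreachRDD { rdd =>
  val table = sqlContext.jsonRDD(rdd)               // schema is inferred from the JSON documents
  val columnNames = table.schema.fields.map(_.name) // all inferred column names, no hardcoding
  println(columnNames.mkString(", "))
  table.registerTempTable("mytable")
}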
I really wanted to do this sort of thing:
stream.map{ case (_,v) => JsonParser.parse(v).extract[MonthlyCommits]}
But I ran into this problem:
http://stackoverflow.com/questions/24786377/notserializableexception-with-json4s-on-spark
DefaultFormats appears to be non-serializable in json4s 3.2.10, which seems to be the version Spark uses (I get incompatibilities when I compile against 3.3.0).
You would really help me with advice on this.
Best,
Kees Jan
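A minimal sketch of one commonly used workaround for that NotSerializableException, assuming the stream is a DStream of (key, value) pairs whose values are JSON strings and that MonthlyCommits is a plain case class (its fields below are only illustrative): build the implicit Formats inside the task, so DefaultFormats never has to be serialized with the closure.

import org.json4s._
import org.json4s.native.JsonParser

case class MonthlyCommits(user: String, commits: Int)  // illustrative fields only

stream.mapPartitions { iter =>
  // Created per partition on the executor, so it is never shipped from the driver.
  implicit val formats: Formats = DefaultFormats
  iter.map { case (_, v) => JsonParser.parse(v).extract[MonthlyCommits] }
}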
I am running the following code and I have a problem converting the RDD into a DataFrame, because the RDD takes its schema from the first row when it is converted into a DataFrame.
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json

conf = pyspark.SparkConf()
conf.setMaster('mesos://local_host:5050')
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, 2)  # 2-second batch interval

kafkaParams = {'metadata.broker.list': 'local_host:9092', 'auto.offset.reset': 'smallest'}
topics = ['new3']
kafka_stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

# Each Kafka record is a (key, value) pair; the value carries the JSON payload.
parsed = kafka_stream.map(lambda kv: json.loads(kv[1]))

# toDF() infers the schema from the first row of each micro-batch,
# which is where the schema problem described above shows up.
parsed.foreachRDD(lambda rdd: rdd.toDF())

ssc.start()
ssc.awaitTermination()  # keep the streaming job running
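One way around the schema being taken from the first row (a sketch only; the field names and types below are placeholders, not your actual columns) is to build an explicit StructType and pass it to createDataFrame instead of relying on toDF()'s inference:

# Placeholder schema: replace the field names/types with your real columns.
schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True),
])

def to_df(rdd):
    if not rdd.isEmpty():                                  # skip empty micro-batches
        rows = rdd.map(lambda d: (d.get("field1"), d.get("field2")))
        df = sqlContext.createDataFrame(rows, schema)      # explicit schema, nothing inferred
        df.show()

parsed.foreachRDD(to_df)

The foreachRDD registration has to happen before ssc.start(), so this would replace the rdd.toDF() line above rather than follow it.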