spark-cassandra-connector interpreter


Todd Nist

Jan 23, 2015, 3:03:09 PM
to zeppelin-...@googlegroups.com
I have a question on the creation of an interpreter for the spark-cassandra-connector from DataStax.  I have included the spark-cassandra-connector and its dependent jars in the $ZEPPELIN_HOME/interpreter/spark directory for a quick test.  I then created the following notebook (I know this is not correct, but I just wanted to see if it would work, as a point of reference):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.cassandra.CassandraSQLContext

  sc.stop // need to get rid of this

  val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
  val sc = new SparkContext("spark://127.0.0.1:7077", "Cassandra Connector", conf)
  val cc = new CassandraSQLContext(sc)

  cc.setKeyspace("test")
  val rdd = cc.cassandraSql("SELECT key, value FROM sql_demo")

  rdd.collect().foreach(println)

  rdd.registerTempTable("tmp")


which results in this:

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@3c3f21eb
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@31000efc
cc: org.apache.spark.sql.cassandra.CassandraSQLContext = org.apache.spark.sql.cassandra.CassandraSQLContext@48f80b01
rdd: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:103

== Query Plan ==
== Physical Plan ==
CassandraTableScan [key#30,value#31], (CassandraRelation TableDef(test,sql_demo,ArrayBuffer(ColumnDef(test,sql_demo,key,PartitionKeyColumn,IntType,false)),ArrayBuffer(),ArrayBuffer(ColumnDef(test,sql_demo,value,RegularColumn,DecimalType,false))), None), []
[4,4.25]
[2,2.55]
[5,2.82]
[1,1.0]
[3,3.66]

Then, when I issue the following, it results in the error below.

%sql select key, value from tmp order by value
Table Not Found: tmp

So, a couple of questions (with two illustrative sketches after them):

  1. In my spark-defaults.conf, I am setting this property:

     spark.cassandra.connection.host localhost

     However, it does not appear to be making it into the SparkContext by default, which is why I issued the sc.stop.  Will it only make it in if I create my own interpreter?  I would have thought any of the spark.* properties set in spark-defaults.conf would be available; perhaps I need to place this value somewhere else.  Any guidance / thoughts?

  2. By issuing the sc.stop, am I losing some context which Zeppelin is using / referencing for the temp table I created?  It's not clear to me why I get the "Table Not Found" error otherwise.
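Two sketches that illustrate what I mean (added for illustration, not runs from the note above).  First, for question 1, the properties that actually reached the running context can be dumped from a notebook paragraph:

  // print every property the running SparkContext picked up;
  // spark.cassandra.connection.host should appear here if spark-defaults.conf was read
  sc.getConf.getAll.sorted.foreach { case (k, v) => println(s"$k = $v") }

Second, on the "Table Not Found" question: temp tables in Spark are registered per SQLContext, so a table registered on a context I create myself would be invisible to whatever context Zeppelin's %sql runs against.  The effect is easy to reproduce with two plain SQLContexts:

  import org.apache.spark.sql.SQLContext

  case class KV(key: Int, value: Double)

  val ctxA = new SQLContext(sc)
  val ctxB = new SQLContext(sc)

  import ctxA.createSchemaRDD // implicit RDD -> SchemaRDD conversion (Spark 1.1/1.2)
  sc.parallelize(Seq(KV(1, 1.0))).registerTempTable("t")

  ctxA.sql("select * from t").collect() // works
  ctxB.sql("select * from t").collect() // fails: Table Not Found: t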

I would like to go ahead and create the spark-cassandra-connector interpreter; is there any guidance on this?  It seems like there would be a fair amount of overlap with the existing Spark interpreter, so before I get started I thought I would see if there were some ideas on how best to approach this in your project.

TIA for the assistance.

- Todd



Todd Nist

Jan 29, 2015, 3:50:54 PM
to zeppelin-...@googlegroups.com
FWIW, I got it working, and it did not require a new interpreter.  I noticed the bind/unbind interpreters to a note change that was just recently committed to master.  Reading through that, it was fairly clear how to do it.  It could have been done without this, but reading through it made it clear that the only thing I really needed to do was leverage the JAVA_OPTS in the conf/zeppelin-env.sh file.

Here is what I did; there may be a better way, but this works (open to suggestions on a better approach).

1.  Modified $ZEPPELIN_HOME/conf/zeppelin-env.sh and set ZEPPELIN_JAVA_OPTS to include the additional jars required for the DataStax spark-cassandra-connector:

export ZEPPELIN_JAVA_OPTS="-Dspark.jars=./spark-datastax-connector-lib/cassandra-clientutil-2.1.2.jar,./spark-datastax-connector-lib/cassandra-driver-core-2.1.3.jar,./spark-datastax-connector-lib/cassandra-thrift-2.1.2.jar,./spark-datastax-connector-lib/joda-convert-1.7.jar,./spark-datastax-connector-lib/joda-time-2.4.jar,./spark-datastax-connector-lib/spark-cassandra-connector_2.10-1.1.1.jar"
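After restarting the Zeppelin daemon, one quick way to confirm the jar list was picked up (a check I'm adding here for completeness, not one of the original steps) is to read the property back from a notebook paragraph:

  // should echo the comma-separated jar list passed via -Dspark.jars
  println(sc.getConf.get("spark.jars", "<not set>"))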

2.  Opted to create a new interpreter and bind it to the notebook with the properties I needed for connecting to Cassandra (screenshot below); the only real difference from the stock Spark interpreter for now was the addition of the spark.cassandra.connection.host parameter.  One could also add this to the JAVA_OPTS instead, i.e. -Dspark.cassandra.connection.host=localhost, as sketched next.
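A minimal sketch of that alternative (illustrative only; it just appends the host setting to the opts from step 1):

# assumes the -Dspark.jars export above has already run in this file
export ZEPPELIN_JAVA_OPTS="${ZEPPELIN_JAVA_OPTS} -Dspark.cassandra.connection.host=localhost"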

[screenshot of the new interpreter's settings, not preserved]

3.  Created a very simple notebook for the following column family in Cassandra:

CREATE TABLE sql_demo (
  key int,
  value decimal,
  PRIMARY KEY ((key))
)
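For reference, the sample rows seen in the first post can be loaded with inserts along these lines (values taken from the earlier output):

INSERT INTO test.sql_demo (key, value) VALUES (1, 1.0);
INSERT INTO test.sql_demo (key, value) VALUES (2, 2.55);
INSERT INTO test.sql_demo (key, value) VALUES (3, 3.66);
INSERT INTO test.sql_demo (key, value) VALUES (4, 4.25);
INSERT INTO test.sql_demo (key, value) VALUES (5, 2.82);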

4.  Notebook to query the above table / column family:

import org.apache.spark.sql.cassandra.CassandraSQLContext

case class Demo(key: Int, value: Double)

val cc = new CassandraSQLContext(sc)

cc.setKeyspace("test")
val rdd = cc.cassandraSql("SELECT key, value FROM sql_demo")

rdd.collect

// map the raw rows into the Demo case class so the temp table gets a typed schema
val rddRows = rdd.map(r => Demo(r.getInt(0), r.getDouble(1)))

rddRows.registerTempTable("demo")

5. Finally, query the temp table "demo":
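The query itself was a paragraph along the lines of the one from the first post (reconstructed; the original is only visible in the screenshot below):

%sql select key, value from demo order by value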

[screenshot of the query result, not preserved]

Works great.  Still need to do some additional testing, but so far so good.  Hopefully this may help someone else out in the future.

- Todd

Sejun Ra

Feb 3, 2015, 4:32:43 AM
to zeppelin-...@googlegroups.com
That's awesome, Todd. Thanks!