Connecting Spark with two Cassandra setups


Ashay Tamhane

Aug 18, 2015, 3:57:29 AM
to DataStax Spark Connector for Apache Cassandra
I have two different Cassandra setups on two different machines. I am trying to read data from one machine, process it with Spark, and then write the result into the second setup. I am using spark-cassandra-connector-java_2.10. When I use javaFunctions(...).writerBuilder, it lets me specify the keyspace and table name, but the Cassandra host is taken from the Spark context. Is there a way to write data into a Cassandra setup other than the one configured in the Spark context? How do we override this default setting?

varun sharma

Aug 21, 2015, 4:50:49 PM
to spark-conn...@lists.datastax.com
+1, I'm looking for something similar: I want a Spark job to read from one cluster and write to another.

Can anyone help here?
Thanks


--
VARUN SHARMA
Flipkart
Bangalore

Russell Spitzer

Aug 21, 2015, 5:04:18 PM
to spark-conn...@lists.datastax.com
In Java you just have to pass a CassandraConnector configured for the other cluster to the save-to-Cassandra function:
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java

https://github.com/datastax/spark-cassandra-connector/blob/77da249fc5682823152f99cafde7f736210af0a0/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/RDDJavaFunctions.java#L51-L58

For Scala you can do something similar, but with implicits:

val rddFromFirstCluster = {
  implicit val firstCluster = CassandraConnector(/* conf for first cluster */)
  sc.cassandraTable(ks, table) // will use the implicit CassandraConnector in this block
}

{
  implicit val secondCluster = CassandraConnector(/* conf for second cluster */)
  rddFromFirstCluster.saveToCassandra(ks, table) // will use the implicit CassandraConnector in this block
}

Russell Spitzer

Aug 21, 2015, 5:11:30 PM
to spark-conn...@lists.datastax.com
Ah, excuse me, there is a better way in Java; I forgot Jacek had added this nifty method to the writer builder:
https://github.com/datastax/spark-cassandra-connector/blob/77da249fc5682823152f99cafde7f736210af0a0/spark-cassandra-connector-java/src/main/java/com/datastax/spark/connector/japi/RDDAndDStreamCommonJavaFunctions.java#L124-L129
You can just add a .withConnector and pass a different CassandraConnector object

varun sharma

Aug 22, 2015, 3:59:52 AM
to spark-conn...@lists.datastax.com
Hi Russell,

Thanks for sharing the references.
The question here is: if I use the two blocks of code as you described, how is the value of rddFromFirstCluster available in the second block?

This program worked for me:
def main(args: Array[String]): Unit = {
    val sparkConf1 = new SparkConf().setAppName("OldDC").set("spark.cassandra.connection.host", "DC1_ips")
    val sc = new SparkContext(sparkConf1)
    val rddFromFirstCluster = sc.cassandraTable(CASSANDRA_KEY_SPACE, CASSANDRA_TABLE)
    val sparkConf2 = new SparkConf().setAppName("NewDC").set("spark.cassandra.connection.host", "DC2_ips")
    implicit val secondCluster = CassandraConnector.apply(sparkConf2)
    rddFromFirstCluster.saveToCassandra(CASSANDRA_KEY_SPACE, CASSANDRA_TABLE_NEW) //will use the implicit cassandra connector in this block

  }

Thanks
Varun


Hafiz Mujadid

Aug 22, 2015, 5:53:48 AM
to spark-conn...@lists.datastax.com
Hi all!

What if I need to join two tables from two different clusters in Scala?

Thanks
Regards: HAFIZ MUJADID

Russell Spitzer

Aug 23, 2015, 1:34:50 AM
to DataStax Spark Connector for Apache Cassandra
Here is a full code sample in case my first explanation was a little confusing. This reads from a cluster at 127.0.0.1 and writes to a cluster at 127.0.0.2


https://gist.github.com/RussellSpitzer/437f57dae4fd4bc4f32d

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample(sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as the default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks", "tab")
  }

  {
    // Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks", "tab")
  }
}
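
The same two-block pattern also covers the earlier question about joining tables from two different clusters: read each table under its own implicit connector, key both RDDs, and join them. A rough sketch, using the same imports as above (the id column, keyspace, and table names here are only placeholders):

def twoClusterJoinExample(sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks", "tab_a").keyBy(row => row.getInt("id"))
  }

  val rddFromClusterTwo = {
    implicit val c = connectorToClusterTwo
    sc.cassandraTable("ks", "tab_b").keyBy(row => row.getInt("id"))
  }

  // Ordinary RDD join on the shared key; each side keeps the connector it was created with
  rddFromClusterOne.join(rddFromClusterTwo)
}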

Hafiz Mujadid

Aug 25, 2015, 5:01:53 AM
to spark-conn...@lists.datastax.com
Hi Russell, thanks for the code. It's working fine. Will it work with DataFrames?



Russell Spitzer

Aug 25, 2015, 11:32:53 AM
to spark-conn...@lists.datastax.com
I believe that code sample will work with DataFrames. You also have one other option with DataFrames: since they allow you to pass in any connector option, you can do something like

sqlContext.read.format(...).options( ... "spark.cassandra.connection.host" -> "ip").load().write.format(...).options( ... "spark.cassandra.connection.host" -> "otherip").save()
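
Spelled out a bit more, that chain might look like the following (a sketch only; the hosts, keyspace, and table names are placeholders, and per-read/per-write connection options need a connector version that honors them):

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}

def copyBetweenClusters(sc: SparkContext): Unit = {
  val sqlContext = new SQLContext(sc)

  // Read the table from the first cluster
  val df = sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map(
      "spark.cassandra.connection.host" -> "127.0.0.1",
      "keyspace" -> "ks",
      "table" -> "tab"))
    .load()

  // Write the same rows into the second cluster
  df.write
    .format("org.apache.spark.sql.cassandra")
    .mode(SaveMode.Append) // the target table is assumed to exist already
    .options(Map(
      "spark.cassandra.connection.host" -> "127.0.0.2",
      "keyspace" -> "ks",
      "table" -> "tab"))
    .save()
}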

Hafiz Mujadid

Aug 26, 2015, 10:19:57 AM
to spark-conn...@lists.datastax.com
Hi Russell!

The following piece of code did not work:

val conf = new SparkConf(true)
  .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext("local", "test", conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "spark.cassandra.connection.host" -> "192.168.23.102",
    "table" -> "t1",
    "keyspace" -> "k1",
    "spark.cassandra.input.split.size" -> "500"))
  .load()
df.show

It tries to connect to my localhost instead.

Alex Liu

Aug 26, 2015, 11:22:00 AM
to DataStax Spark Connector for Apache Cassandra

Russell Spitzer

Aug 26, 2015, 11:45:31 AM
to DataStax Spark Connector for Apache Cassandra
Also, don't use "allowMultipleContexts"; it is almost always a bad idea :(

Hurray Cassandra!

Hafiz Mujadid

Aug 26, 2015, 12:09:13 PM
to spark-conn...@lists.datastax.com
Hi Alex!

This code is not working:
val conf = new SparkConf(true)
val sc = new SparkContext("local", "test", conf)
val cc = new CassandraSQLContext(sc)
cc.setConf("spark.cassandra.connection.host", "192.168.23.102")
val df= cc.sql("select * from k1.t1 ")
df.show

Alex Liu

Aug 26, 2015, 12:29:26 PM
to DataStax Spark Connector for Apache Cassandra

Hafiz Mujadid

Aug 26, 2015, 12:40:14 PM
to spark-conn...@lists.datastax.com
Alex!

This code is not working either:
val conf = new SparkConf(true)
val sc = new SparkContext("local", "test", conf)
val cc = new CassandraSQLContext(sc)
cc.setConf("cluster1/spark.cassandra.connection.host", "192.168.23.102")
cc.setConf("cluster1/spark.cassandra.connection.port", "9042")
val rdd = cc.sql("select * from cluster1.k1.t1")
rdd.show

What is the issue?
How can I change it so that it works?

thanks

Alex Liu

Aug 26, 2015, 12:42:29 PM
to DataStax Spark Connector for Apache Cassandra
Can you show the error log?

Alex Liu

Aug 26, 2015, 12:47:09 PM
to DataStax Spark Connector for Apache Cassandra

Hafiz Mujadid

Aug 26, 2015, 12:55:09 PM
to spark-conn...@lists.datastax.com
Hi Alex!

I get the following error:

Exception in thread "main" com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: Missing cluster cluster1 Cassandra connection conf
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2263)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)


I think the issue is with the cluster setting.
I am using spark-cassandra-connector 1.4, and my Cassandra has the default cluster name "Test Cluster".

Alex Liu

Aug 26, 2015, 1:04:23 PM
to DataStax Spark Connector for Apache Cassandra
It looks like the cluster-level setting is not in effect. Can you post a full log (turn the log level to INFO/DEBUG)? Another poster said it worked for him for a similar issue.

Alex

Hafiz Mujadid

Aug 26, 2015, 1:13:15 PM
to spark-conn...@lists.datastax.com
This is the full stack trace:
Exception in thread "main" com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: Missing cluster cluster1 Cassandra connection conf
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2263)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:30)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:218)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:165)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookupRelation(CassandraSQLContext.scala:218)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:233)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:229)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:227)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:212)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:229)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:922)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:922)
at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:920)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:211)
at org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:214)
at com.platalytics.connectors.mysqltest.CassandraTest.fetch(CassandraTest.scala:18)
at com.platalytics.connectors.mysqltest.test$.main(test.scala:13)
at com.platalytics.connectors.mysqltest.test.main(test.scala)
Caused by: java.lang.RuntimeException: Missing cluster cluster1 Cassandra connection conf
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anonfun$getCassandraConnConf$1.apply(CassandraSQLContext.scala:187)
at org.apache.spark.sql.cassandra.CassandraSQLContext$$anonfun$getCassandraConnConf$1.apply(CassandraSQLContext.scala:187)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.cassandra.CassandraSQLContext.getCassandraConnConf(CassandraSQLContext.scala:187)
at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:24)
at org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:21)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
--
Regards: HAFIZ MUJADID

Alex Liu

Aug 26, 2015, 1:19:28 PM
to DataStax Spark Connector for Apache Cassandra
From your log, you are using connector 1.2.x, which is different from 1.3.x/1.4.x.

Please upgrade your connector.

Hafiz Mujadid

Aug 26, 2015, 1:25:16 PM
to spark-conn...@lists.datastax.com
I am using the following Maven dependencies:
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.10</artifactId>
  <version>1.4.0-M1</version>
</dependency>
<dependency>
  <groupId>org.apache.cassandra</groupId>
  <artifactId>cassandra-all</artifactId>
  <version>2.2.0-rc1</version>
</dependency>




--
Regards: HAFIZ MUJADID

Alex Liu

Aug 26, 2015, 1:32:55 PM
to DataStax Spark Connector for Apache Cassandra
Check the jars in your classpath, which tell you the version actually used at runtime (you may have multiple versions of the connector jar in the classpath). The error logs show the code is the 1.2.x version, which sets the cluster-level setting differently.

Alex