Scan and update the same Cassandra table using Spark


Mahmoud Almokadem

Nov 4, 2016, 4:17:47 PM
to spark-conn...@lists.datastax.com

Hi, 

I'm using Spark to scan a huge Cassandra table (250M+ rows). How can I use Spark to update a single field of this table with a value selected from another table?

I'm using Java and spark-cassandra-connector.


Thanks,
Mahmoud

Mahmoud Almokadem

Nov 6, 2016, 4:45:42 AM
to spark-conn...@lists.datastax.com

Jim Hatcher

Nov 6, 2016, 9:11:38 AM
to spark-conn...@lists.datastax.com

Mahmoud,


Can you provide some more details about what you're doing? I'm having a hard time conceptualizing your code.


Are you searching through the huge table looking for a certain value and then when you find it, you're updating with another value?  Or are you joining two tables and updating values on one?


Generally speaking, to update a Cassandra table via Spark, you need to have an RDD (or DataFrame) containing the key fields of the table plus the field you want to update. Once you have that, you save it back to Cassandra.


Something like this:

val tbl = sc.cassandraTable("keyspace", "huge_table")

val tblFiltered = tbl.filter(item => item.getString("searchme") == "somevalue")

val tblTransformed = tblFiltered.map { item =>
  val key = item.getString("keyfield")
  val needsUpdate = "newvalue"
  (key, needsUpdate)
}

tblTransformed.saveToCassandra("keyspace", "huge_table", SomeColumns("keyfield", "needsUpdate"))
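Since Mahmoud mentioned he is working in Java: here is a plain-Java sketch of the same filter-and-transform shape, with no Spark or connector calls. The row data and column names ("keyfield", "searchme") are stand-ins; it only shows the (key, new value) pairs you would hand back to a Cassandra save.

```java
import java.util.*;
import java.util.stream.*;

public class UpdatePairsSketch {
    // Build the (key, newValue) pairs that would be written back to Cassandra.
    static List<Map.Entry<String, String>> buildUpdates(List<Map<String, String>> rows) {
        return rows.stream()
                .filter(row -> "somevalue".equals(row.get("searchme"))) // same predicate as the Scala filter
                .map(row -> Map.entry(row.get("keyfield"), "newvalue")) // (key, field to update)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, String>> rows = List.of(
                Map.of("keyfield", "a", "searchme", "somevalue"),
                Map.of("keyfield", "b", "searchme", "other"));
        // Only the row whose "searchme" column matches produces an update pair.
        System.out.println(buildUpdates(rows));
    }
}
```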


Does that help?


Jim




From: spark-conn...@lists.datastax.com <spark-conn...@lists.datastax.com> on behalf of Mahmoud Almokadem <prog.m...@gmail.com>
Sent: Sunday, November 6, 2016 2:45 AM
To: spark-conn...@lists.datastax.com
Subject: Scan and update the same Cassandra table using Spark
 
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Mahmoud Almokadem

Nov 6, 2016, 9:32:59 AM
to spark-conn...@lists.datastax.com
Thanks Jim, 

What I want to do exactly: I want to denormalize the huge table using another table.
I created my table in a partially normalized form and want to fill in some fields from another table.

For example: 

I’ve a table called huge_table(id, name, country_id)
another table country(country_id, country_name)

I want to change the huge_table to be (id, name, country_id, country_name)

So, I want to scan the whole huge_table and update it with values from the country table.

Sorry, I'm new to Spark and Scala. If the code is in Java, that would be fine.

Thanks,
Mahmoud 

siddharth verma

Nov 6, 2016, 9:45:56 AM
to spark-conn...@lists.datastax.com
Hi Mahmoud,
If it is:
1. a one-time job,
2. you can afford to consume the CPU on your Cassandra nodes, and
3. you don't have a Spark setup or familiarity with it,
then this utility of mine (linked in my signature below) is the equivalent of a select *, and with it you deal purely in Java. I used it to scan 229 million rows (with 128 threads) in approximately 2 minutes.

For your use case, you could:
load the country table into memory,
scan huge_table, compute the desired rows, and write them using SSTableWriter,
then use sstableloader to load the new data into the target table in Cassandra.

To reiterate: if your use case is, say, a periodic refresh of the table, you should use Spark. This approach is suited to a one-time job.
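The load-the-small-table-into-memory step of that approach can be sketched in plain Java as below. The column values are made up for illustration, and the actual Cassandra scan and SSTableWriter/sstableloader calls are out of scope here; this only shows the in-memory lookup that fills in country_name.

```java
import java.util.*;
import java.util.stream.*;

public class DenormalizeSketch {
    // Step 1: load the (small) country table into memory once.
    // Values are hypothetical stand-ins for the real country rows.
    static Map<Integer, String> loadCountries() {
        Map<Integer, String> countries = new HashMap<>();
        countries.put(1, "United States");
        countries.put(2, "Djibouti");
        return countries;
    }

    // Step 2: for each huge_table row {id, name, country_id}, emit
    // {id, name, country_id, country_name} via a cheap in-memory lookup.
    // In the real pipeline these output rows would go to the SSTable writer.
    static List<Object[]> denormalize(List<Object[]> hugeRows, Map<Integer, String> countries) {
        return hugeRows.stream()
                .map(r -> new Object[]{r[0], r[1], r[2], countries.get((Integer) r[2])})
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object[]> out = denormalize(
                List.of(new Object[]{1, "mahmoud", 1}, new Object[]{2, "jim", 2}),
                loadCountries());
        out.forEach(r -> System.out.println(Arrays.toString(r)));
    }
}
```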

Regards
Siddharth Verma







--
Siddharth Verma
(Visit https://github.com/siddv29/cfs for a high speed cassandra full table scan)

Jim Hatcher

Nov 6, 2016, 2:53:19 PM
to spark-conn...@lists.datastax.com

Hi Mahmoud,


Here is a sample of what you could do (based on your description below).


Assuming these tables:

cqlsh> create KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
cqlsh> use test;
cqlsh:test> create table huge_table ( id int PRIMARY KEY, name varchar, country_id int );
cqlsh:test> create table country ( country_id int PRIMARY KEY, country_name varchar );
cqlsh:test> create table huge_table_to_be ( id int PRIMARY KEY, name varchar, country_id int, country_name varchar );

And, assuming I put these records into them:
cqlsh:test> insert into huge_table ( id, name, country_id ) VALUES ( 1, 'mahmoud', 1 );
cqlsh:test> insert into huge_table ( id, name, country_id ) VALUES ( 2, 'jim', 2 );
cqlsh:test> insert into country ( country_id, country_name ) values ( 1, 'United States' );
cqlsh:test> insert into country ( country_id, country_name ) values ( 2, 'Djibouti' );


//get a reference to the source table

scala> val ht = sc.cassandraTable("test", "huge_table")
ht: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[22] at RDD at CassandraRDD.scala:15

//verify that we can see the data
scala> ht.take(10).foreach(println)
CassandraRow{id: 1, country_id: 1, name: mahmoud}
CassandraRow{id: 2, country_id: 2, name: jim}

//you can't join huge_table to country directly because it's not keyed correctly, so transform this RDD into a key/value pair whose key is country_id
scala> val ht_transformed = ht.map { row => ( row.getInt("country_id"), row ) }
ht_transformed: org.apache.spark.rdd.RDD[(Int, com.datastax.spark.connector.CassandraRow)] = MapPartitionsRDD[24] at map at <console>:65

scala> ht_transformed.take(10).foreach(println)
(1,CassandraRow{id: 1, country_id: 1, name: mahmoud})
(2,CassandraRow{id: 2, country_id: 2, name: jim})

//we can join to the country table now because we have the right keys

scala> val htWithCountries = ht_transformed.joinWithCassandraTable("test", "country")
htWithCountries: com.datastax.spark.connector.rdd.CassandraJoinRDD[(Int, com.datastax.spark.connector.CassandraRow),com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[25] at RDD at CassandraRDD.scala:15

//we have a two-item tuple with the source table on the left and the join results on the right
//in the left side of the two-item tuple, we have another two-item tuple with the country_id on the left and the huge_table row on the right
scala> htWithCountries.take(10).foreach(println)
((1,CassandraRow{id: 1, country_id: 1, name: mahmoud}),CassandraRow{country_id: 1, country_name: United States})
((2,CassandraRow{id: 2, country_id: 2, name: jim}),CassandraRow{country_id: 2, country_name: Djibouti})

//we have all the data we need now in one rdd, but it's arranged in a weird format, so put it into an rdd that has the format we need (i.e., the same structure as the C* table we want to write to)

//be aware that the joinWithCassandraTable() above is an "inner join" -- if you want a left join, there is support for that in some of the later versions of the connector

scala> var htWithCountriesTransformed = htWithCountries.map { row => ( row._1._1, row._1._2.getString("name"), row._1._2.getInt("country_id"), row._2.getString("country_name") ) }
htWithCountriesTransformed: org.apache.spark.rdd.RDD[(Int, String, Int, String)] = MapPartitionsRDD[28] at map at <console>:69

scala> htWithCountriesTransformed.take(10).foreach(println)
(1,mahmoud,1,United States)
(2,jim,2,Djibouti)


//now that it's in a structure that matches our C* destination table, we can write it out to Cassandra -- we need to specify the columns so the Spark Cassandra Connector can map the data into the right places

scala> htWithCountriesTransformed.saveToCassandra("test", "huge_table_to_be", SomeColumns("id", "name", "country_id", "country_name"))

//check to see what the records look like
scala> val ht_2b = sc.cassandraTable("test", "huge_table_to_be")
ht_2b: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[29] at RDD at CassandraRDD.scala:15

scala> ht_2b.take(10).foreach(println)
CassandraRow{id: 1, country_id: 1, country_name: United States, name: mahmoud}
CassandraRow{id: 2, country_id: 2, country_name: Djibouti, name: jim}
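One caveat worth expanding from the note above: because joinWithCassandraTable() is an inner join, any huge_table row whose country_id has no match in country silently disappears from the result. A plain-Java sketch of the difference between the two join behaviours, using hypothetical data and no connector calls:

```java
import java.util.*;
import java.util.stream.*;

public class JoinSemanticsSketch {
    // (country_id, name) pairs standing in for the keyed huge_table RDD.
    static final List<Map.Entry<Integer, String>> ROWS = List.of(
            Map.entry(1, "mahmoud"), Map.entry(2, "jim"), Map.entry(3, "no-country"));
    static final Map<Integer, String> COUNTRIES =
            Map.of(1, "United States", 2, "Djibouti");

    // Inner-join behaviour: the row with country_id 3 is silently dropped.
    static List<String> innerJoin() {
        return ROWS.stream()
                .filter(e -> COUNTRIES.containsKey(e.getKey()))
                .map(e -> e.getValue() + "/" + COUNTRIES.get(e.getKey()))
                .collect(Collectors.toList());
    }

    // Left-join behaviour: every huge_table row survives; a missing
    // country shows up as null instead of losing the row.
    static List<String> leftJoin() {
        return ROWS.stream()
                .map(e -> e.getValue() + "/" + COUNTRIES.getOrDefault(e.getKey(), null))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(innerJoin()); // two rows: mahmoud and jim only
        System.out.println(leftJoin());  // all three rows
    }
}
```

If losing unmatched rows is a problem for the denormalization, check whether your connector version supports the left-join variant mentioned above before relying on the inner join.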


Jim



From: spark-conn...@lists.datastax.com <spark-conn...@lists.datastax.com> on behalf of siddharth verma <sidd.ver...@gmail.com>
Sent: Sunday, November 6, 2016 7:45 AM
To: spark-conn...@lists.datastax.com
Subject: Re: Scan and update the same Cassandra table using Spark
 

Ankit Gadhiya

Jun 2, 2019, 12:46:44 AM
to DataStax Spark Connector for Apache Cassandra

Hi Jim,

I think I have a similar problem and need your expertise.

I am looking to batch-update a particular column through Spark Scala, for example changing CITY from 'JFK' to 'CHI'. The partition key is on the ID column. Can you suggest how I can achieve this, please?

-- Ankit