Inner Joins on Cassandra RDDs


Priya Ch

Oct 21, 2015, 8:07:30 AM
to spark-conn...@lists.datastax.com, us...@spark.apache.org
Hello All,

   I have two Cassandra RDDs. I am using joinWithCassandraTable, which seems to be doing a Cartesian join, so we are getting unwanted rows.


How do I perform an inner join on Cassandra RDDs? If I use a normal join, I have to read the entire table, which is costly.

Are there any specific transformations available that enable inner joins?

Regards,
Padma CH

Russell Spitzer

Oct 21, 2015, 8:56:46 AM
to spark-conn...@lists.datastax.com, us...@spark.apache.org

joinWithCassandraTable is an inner join (it only returns rows where both sides of the join have a match), not a Cartesian join (where each row is joined with every row in the other table). What are you trying to do?
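
In plain Spark terms (an illustrative sketch with made-up RDDs a and b, not the connector API), the difference looks like this:

val a = sc.parallelize(Seq((1, "x"), (2, "y")))
val b = sc.parallelize(Seq((2, "z")))
a.join(b).collect()       // inner join: Array((2,(y,z))), only keys present on both sides
a.cartesian(b).collect()  // Cartesian product: every pairing, 2 x 1 = 2 rows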



Priya Ch

Oct 21, 2015, 12:13:14 PM
to spark-conn...@lists.datastax.com
Hi Russell,

  I have a scenario like this.

Table1 has row_timestamp, ticketNumber, ticket_issue_date and couponNumber as its primary key.
Table2 has ticketNumber, ticket_issue_date and couponNumber as its primary key.

Now from table1, I am doing 

val rdd1 = sc.cassandraTable("keySpace", "table1").select("ticketNumber", "ticket_issue_date", "couponNumber").where("row_timestamp = ?", "value")

Now I am joining rdd1 as
val newRdd = rdd1.joinWithCassandraTable("keySpace", "table2").select("tkt_id", "flightDate", "flightNumber", "boardingCity")

Table1 has 3 rows and table2 also has 3 rows. If an inner join happens, newRdd should return 3 rows, but it is giving 9 rows.

How is this possible?

Russell Spitzer

Oct 21, 2015, 12:23:12 PM
to spark-conn...@lists.datastax.com
Maybe if you showed us the rows we could make a determination? List out all the rows in table1, table2 and the newRdd.

Priya Ch

Oct 21, 2015, 12:50:37 PM
to spark-conn...@lists.datastax.com
Table1 has the following columns -
year, month, day, hour, minutes, second, ticketNumber, ticket_date_of_issue, couponNumber.

The primary key contains all columns, as
((year, month, day, hour), minutes, second, ticketNumber, ticket_date_of_issue, couponNumber).

Rows in Table1 are
year, month, day, hour, minutes, seconds, ticketNumber, ticket_date_of_issue, couponNumber
2015, 10, 21, 15, 17, 28, 101, 2015-10-08 00:00:00, 1
2015, 10, 21, 15, 17, 28, 101, 2015-10-08 00:00:00, 2
2015, 10, 21, 15, 17, 28, 101, 2015-10-08 00:00:00, 3


Table2 has the following columns -
ticketNumber, ticket_date_of_issue, couponNumber, tkt_id, flightDate, flightNumber, boardingCity

The primary key in Table2 is 
(ticketNumber, ticket_date_of_issue, couponNumber).

Rows in Table2 are
ticketNumber, ticket_date_of_issue, couponNumber, tkt_id, ...
101, 2015-10-08 00:00:00, 1, 11
101, 2015-10-08 00:00:00, 2, 11
101, 2015-10-08 00:00:00, 3, 11

Now the query looks as below -
val rdd1 = sc.cassandraTable("keySpace", "table1").select("ticketNumber", "ticket_issue_date", "couponNumber").where("year = ? AND month = ? AND day = ? AND hour = ?", 2015, 10, 21, 15)

val newRdd = rdd1.joinWithCassandraTable("keySpace", "table2").select("tkt_id", "flightDate", "flightNumber", "boardingCity")

The number of rows in newRdd is 9, i.e. every row in rdd1 (ticketNumber, ticket_date_of_issue, couponNumber) is combined with every row in table2. Ideally, row1 in rdd1 should join only with row1 in table2, as that is the only matching case.

But I see combinations of row1 with row2's fields as well.

How could this happen? Is it because of the partition key specified in table1?




Russell Spitzer

Oct 21, 2015, 1:02:11 PM
to spark-conn...@lists.datastax.com
The default key for joining tables is the partition key, unless you specify some other columns with on(). The partition key for table2 is "ticketNumber".

RDD1 contains the following values for ticket number
101
101
101

Table 2 contains the following values for ticket number
101
101
101

The inner join where "ticketNumber == ticketNumber" therefore matches each of the 3 rows in rdd1 against all 3 rows of partition 101 in table2, giving 3 x 3 = 9 result rows of (101, 101).
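
A plain-Spark sketch of why (hypothetical RDDs, keyed only by ticketNumber, which is effectively what the join above does):

val left  = sc.parallelize(Seq((101, 1), (101, 2), (101, 3)))  // (ticketNumber, couponNumber)
val right = sc.parallelize(Seq((101, 1), (101, 2), (101, 3)))
left.join(right).count()  // 9: each left row matches all 3 right rows keyed 101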

Quoting the docs:
Joining on any column or columns in the primary key is supported as long as it can be made into a valid CQL query. This means the entire partition key must be specified, and if any clustering key is specified, all previous clustering keys must be supplied as well.

And the ScalaDoc
/**
* Uses the data from [[org.apache.spark.rdd.RDD RDD]] to join with a Cassandra table without
* retrieving the entire table.
* Any RDD which can be used to saveToCassandra can be used to joinWithCassandra as well as any
* RDD which only specifies the partition Key of a Cassandra Table. This method executes single
* partition requests against the Cassandra Table and accepts the functional modifiers that a
* normal [[com.datastax.spark.connector.rdd.CassandraTableScanRDD]] takes.
*
* By default this method only uses the Partition Key for joining but any combination of columns
* which are acceptable to C* can be used in the join. Specify columns using joinColumns as a parameter
* or the on() method.
*
* Example With Prior Repartitioning: {{{
* val source = sc.parallelize(keys).map(x => new KVRow(x))
* val repart = source.repartitionByCassandraReplica(keyspace, tableName, 10)
* val someCass = repart.joinWithCassandraTable(keyspace, tableName)
* }}}
*
* Example Joining on Clustering Columns: {{{
* val source = sc.parallelize(keys).map(x => (x, x * 100))
* val someCass = source.joinWithCassandraTable(keyspace, wideTable).on(SomeColumns("key", "group"))
* }}}
**/
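
Applied to this thread's tables, a minimal sketch of the fix (assuming rdd1 still carries all three key columns, named as in the table2 definition above):

import com.datastax.spark.connector._

// Join on table2's full primary key instead of just its partition key,
// so each row of rdd1 matches exactly one row in table2.
val newRdd = rdd1
  .joinWithCassandraTable("keySpace", "table2")
  .on(SomeColumns("ticketNumber", "ticket_date_of_issue", "couponNumber"))
  .select("tkt_id", "flightDate", "flightNumber", "boardingCity")

// Per the docs quoted above: the full partition key ("ticketNumber") must be
// present, and clustering columns may only be added in declaration order.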
 

Priya Ch

Oct 21, 2015, 1:11:58 PM
to spark-conn...@lists.datastax.com
Thank you Russell :) So specifying on() should resolve the problem.

Priya Ch

Oct 21, 2015, 1:13:34 PM
to spark-conn...@lists.datastax.com
One more question I have: whenever I call rdd.saveToCassandra, does this action bring the RDD to the driver?

Russell Spitzer

Oct 21, 2015, 1:29:07 PM
to spark-conn...@lists.datastax.com
No. While saveToCassandra is an Action, it is performed in a distributed manner on the executors. 
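
For example, a minimal sketch (the RDD and its contents here are hypothetical; the call shape is the connector's standard saveToCassandra):

import com.datastax.spark.connector._

// Hypothetical rows matching table2's key columns.
val rows = sc.parallelize(Seq((101, new java.util.Date(), 4)))

// Each executor writes the partitions it holds directly to Cassandra;
// no data is collected back to the driver first.
rows.saveToCassandra("keySpace", "table2",
  SomeColumns("ticketNumber", "ticket_date_of_issue", "couponNumber"))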

Priya Ch

Oct 21, 2015, 1:31:17 PM
to spark-conn...@lists.datastax.com
OK. So any RDD action like count or first would happen in a distributed manner on the executors, except collect?

Russell Spitzer

Oct 21, 2015, 1:37:35 PM
to spark-conn...@lists.datastax.com
Depends on the implementation of the particular action, but in general most actions take place distributed, with a final reduce back to the driver if necessary, i.e. count per task, then reduce the per-task counts back to the driver.
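
As an illustrative sketch (not the literal Spark source), count behaves roughly like a per-partition count followed by a reduce on the driver:

// Behaviourally equivalent to rdd.count(): each task counts its own
// partition, and only the small per-partition counts travel to the driver.
val total = rdd.mapPartitions(it => Iterator(it.size.toLong)).reduce(_ + _)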

Priya Ch

Oct 21, 2015, 1:42:07 PM
to spark-conn...@lists.datastax.com
Thank you Russell :) It's clear now.

Priya Ch

Oct 21, 2015, 1:46:43 PM
to spark-conn...@lists.datastax.com
Russell, how does the storm-cassandra connector work? What is the best site/blog to get information on this?

Russell Spitzer

Oct 21, 2015, 1:53:19 PM
to spark-conn...@lists.datastax.com
I'm not involved with that project, so I don't really know. Their GitHub hasn't been modified in over a year ...