Fastest way to get a single value from Cassandra table


Jasper Mackenzie

Aug 24, 2015, 3:37:16 PM
to DataStax Spark Connector for Apache Cassandra
Good day All,
I am using Cassandra to do a lookup and return the value from a single cell. Is there a more direct way to get at it than going from a `CassandraTableScanRDD` to a `com.datastax.spark.connector.CassandraRow` to an Array and then to a String?

This may not be the most efficient mechanism (I am open to suggestions), but it is faster than using DataFrames directly. Currently it takes ~0.01 s on my local machine.
(Using connector 1.4, Spark 1.4.1 (Scala), Cassandra 2.1.6.)

Example code:
```scala
// Create the keyspace and table if they do not already exist.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS test.dhcp( timestamp bigint, mac text, ip text, hostname text, PRIMARY KEY (ip, timestamp) ) WITH CLUSTERING ORDER BY (timestamp DESC)")
}

// Write the DataFrame out to the new table.
dhcpDF.write.mode("Overwrite").format("org.apache.spark.sql.cassandra").options(Map("table" -> "dhcp", "keyspace" -> "test")).save()

// Look up the newest MAC for an IP at or before the given timestamp.
def tsIPtoMACcassandra(timestamp: Long, IP: String, keyspace: String, table: String): String = {
  val _array = sc.cassandraTable(keyspace, table).select("mac").where("ip = ?", IP).where("timestamp <= ?", timestamp).limit(1).toArray
  _array(0).getString("mac")
}

val result = tsIPtoMACcassandra(14382457950010L, "192.168.1.150", "test", "dhcp")
```

Cheers,

Jasper

Russell Spitzer

Aug 24, 2015, 4:35:36 PM
to DataStax Spark Connector for Apache Cassandra
The fastest way to retrieve a single row would be to avoid Spark altogether, especially since it looks from your code like you are returning the value to the driver anyway.

I would just do:

```scala
val mac = CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("SELECT mac FROM keyspace.table ... ").head.getString("mac") // Not actually tested but you get the idea :)
}
```


Jasper Mackenzie

Aug 24, 2015, 5:45:32 PM
to DataStax Spark Connector for Apache Cassandra
Thanks Russell,
`.head` is not available, but `.one` is, as:

```scala
val mac = CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("select mac from test.dhcp WHERE ip='192.168.1.243' and timestamp<14382454600000 limit 1;")
}.one.getString("mac")
```
Works and seems a lot faster!!

I need to do this query gazillions of times for different IPs and timestamps, and I have read that _prepared statements_ are the way to go for speeding up such things. Is there any way of doing this with the spark-cassandra-connector?

Cheers,

Jasper

Russell Spitzer

Aug 25, 2015, 11:29:41 AM
to DataStax Spark Connector for Apache Cassandra
You can use prepared statements just like you would in a normal C* application: `ps = session.prepare(...)`. That works best if you want to handle the results on the driver. If you want to work with them on the cluster, `joinWithCassandraTable` lets you take an RDD of IPs and retrieve them all from C*.
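For the cluster-side path, here is a minimal sketch of `joinWithCassandraTable` (untested; it assumes the `test.dhcp` table from earlier in this thread and a small hypothetical list of IPs):

```scala
import com.datastax.spark.connector._

// Hypothetical RDD of partition keys to look up.
val ips = sc.parallelize(Seq("192.168.1.150", "192.168.1.243"))

// The join happens by partition key on the executors, so no full table scan.
// Note: this yields one entry per matching row, i.e. per (ip, timestamp).
val macs = ips
  .map(Tuple1(_))                          // join keys must be tuples or case classes
  .joinWithCassandraTable("test", "dhcp")
  .select("mac")
  .on(SomeColumns("ip"))
  .map { case (_, row) => row.getString("mac") }
```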


Jasper Mackenzie

Aug 26, 2015, 8:02:19 PM
to DataStax Spark Connector for Apache Cassandra
> You can use prepared statements just like you would in a normal C* application: `ps = session.prepare(...)`. That works best if you want to handle the results on the driver. If you want to work with them on the cluster, `joinWithCassandraTable` lets you take an RDD of IPs and retrieve them all from C*.

Please excuse my deep ignorance. I have tried to RTFM but could not find a reference for how session.prepare() can be used for arbitrary lookups, especially across sessions; that is, where the prepared statement is used by a function that does the Cassandra lookups and can be called as a single query from arbitrary points in the code.

Can you give me more info? No doubt this is simple and I have been chasing my tail for far too long. 8(

Denis Makarskiy

Aug 27, 2015, 3:17:44 AM
to spark-conn...@lists.datastax.com

Jasper Mackenzie

Aug 27, 2015, 9:31:33 PM
to DataStax Spark Connector for Apache Cassandra
Thanks Denis,
That article put me in the right direction, but my solution does not work in Spark as it is not serializable! 8( Oh, the wasted effort 8)
In case anyone else tries something similar, this is what I did:

```scala
val cc = CassandraConnector(sc.getConf)
val session = cc.openSession
val prepared = session.prepare(s"select mac from $keyspace.$table where ip=? and timestamp<=? limit 1")
val prepCount = session.prepare(s"select count(*) from $keyspace.$table WHERE ip=?")

def IPtsMAC(IP: String, timestamp: Long): String = {
  val row = session.execute(prepared.bind(IP, timestamp: java.lang.Long)).one

  if (row == null) {
    // No match: tag the IP depending on whether it exists in the table at all.
    val another = session.execute(prepCount.bind(IP)).one.getLong("count")
    if (another == 0) "s" + IP else "b" + IP
  } else {
    row.getString("mac")
  }
}

// This fails: `session` and `prepared` are captured by the closure and are not serializable.
val macs = IPtsRDD.map(CRow => IPtsMAC(CRow.getString("ip"), CRow.getLong("timestamp")))
```

Is it even possible to make this serializable? I assume the problem is the `prepared` bit, as a similar query without preparation works fine.

Etienne Couritas

Aug 28, 2015, 11:59:00 AM
to DataStax Spark Connector for Apache Cassandra
If you find a solution, I'm interested.
I had the same issue when I had to serialize sc.conf.

Russell Spitzer

Aug 28, 2015, 12:43:12 PM
to DataStax Spark Connector for Apache Cassandra
The prepared statement can't be serializable, since it belongs to a particular driver session, which is not serializable either. BUT the SCC has had the same issue internally, so we have a prepared statement cache which user programs can access transparently.

This means you don't have to prepare outside of the serialized closure: you can call prepare repeatedly, and as long as the query string is the same the cache will be used, provided your connection.keep_alive_ms is long enough.
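A minimal sketch of that pattern (untested; `ipTsRDD` is a hypothetical RDD[(String, Long)] of (ip, timestamp) pairs against the `test.dhcp` table from this thread):

```scala
import com.datastax.spark.connector.cql.CassandraConnector

// CassandraConnector is serializable, unlike a Session, so it can be
// captured by the closure; withSessionDo hands back a cached session proxy.
val connector = CassandraConnector(sc.getConf)

val macs = ipTsRDD.map { case (ip, ts) =>
  connector.withSessionDo { session =>
    // Re-preparing the same string hits the connector's statement cache.
    val prepared = session.prepare(
      "select mac from test.dhcp where ip=? and timestamp<=? limit 1")
    session.execute(prepared.bind(ip, ts: java.lang.Long)).one.getString("mac") // assumes a match exists
  }
}
```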




Russell Spitzer

Aug 28, 2015, 12:44:42 PM
to DataStax Spark Connector for Apache Cassandra
@Etienne, I'm not sure why you are having issues serializing the conf object, but the Spark context itself is not serializable and can't be used inside distributed lambdas.

Jasper Mackenzie

Aug 30, 2015, 4:03:35 PM
to spark-conn...@lists.datastax.com
Thank you Russell,
I increased connection.keep_alive_ms to 2000 ms and switched to this function:

```scala
def cachedIPtsMAC(
    IP: String,
    timestamp: Long,
    cc: CassandraConnector,
    keyspace: String,
    table: String): String = {

  // openSession returns a cached session, and prepare() goes through the
  // connector's prepared-statement cache (per Russell's note above), so
  // re-preparing on every call is cheap.
  val session = cc.openSession
  val prepared = session.prepare(s"select mac from $keyspace.$table where ip=? and timestamp<=? limit 1")
  val prepCount = session.prepare(s"select count(*) from $keyspace.$table WHERE ip=?")
  val row = session.execute(prepared.bind(IP, timestamp: java.lang.Long)).one

  if (row == null) {
    // No match: tag the IP depending on whether it exists in the table at all.
    val another = session.execute(prepCount.bind(IP)).one.getLong("count")
    if (another == 0) "s" + IP else "b" + IP
  } else {
    row.getString("mac")
  }
}
```

That achieved a 30% speed increase. Sweet!
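(For anyone following along: the keep-alive is set on the Spark configuration; the property below is the connector's `spark.cassandra.connection.keep_alive_ms` setting, assuming connector 1.4 as used in this thread.)

```scala
import org.apache.spark.SparkConf

// Raise the keep-alive so the cached session (and its prepared statements)
// survives the gaps between individual lookups.
val conf = new SparkConf()
  .set("spark.cassandra.connection.keep_alive_ms", "2000")
```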

Etienne Couritas

Aug 31, 2015, 8:16:19 AM
to DataStax Spark Connector for Apache Cassandra
I have followed your previous example with the "CassandraConnector(sc.getConf).withSessionDo".

Russell, if I understand right, we do not have to do a mapPartitions with a prepare per partition, but can use just a simple map?

Russell Spitzer

Aug 31, 2015, 12:30:47 PM
to DataStax Spark Connector for Apache Cassandra
@Jasper That's great news! Please let us know if we can do anything to make that easier, or if you notice any bottlenecks.

@Etienne
I'm not sure I follow your question, but if you are asking whether the cache will be used in a map as well as a mapPartitions, the answer is yes. You can repeatedly prepare the statement and still keep hitting the cache. You are still probably better off with mapPartitions, because then you avoid multiple cache lookups.
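To make the contrast concrete, a sketch of the mapPartitions version (untested; the same hypothetical `ipTsRDD` as the earlier sketch), where prepare runs once per partition instead of once per element:

```scala
import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(sc.getConf) // serializable handle

val macs = ipTsRDD.mapPartitions { rows =>
  connector.withSessionDo { session =>
    // One prepare (and one statement-cache lookup) per partition.
    val prepared = session.prepare(
      "select mac from test.dhcp where ip=? and timestamp<=? limit 1")
    // Materialize before withSessionDo releases the session.
    rows.map { case (ip, ts) =>
      session.execute(prepared.bind(ip, ts: java.lang.Long)).one.getString("mac") // assumes a match exists
    }.toList.iterator
  }
}
```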
