This may not be the most efficient mechanism (open to suggestions), but it is faster than using DataFrames directly. A lookup currently takes ~0.01s on my local machine.
(Using connector 1.4, Spark 1.4.1 (Scala), Cassandra 2.1.6)
Example code:
```scala
import com.datastax.spark.connector._ // adds sc.cassandraTable
import com.datastax.spark.connector.cql.CassandraConnector

// Create the keyspace and table if they do not exist yet.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS test.dhcp( timestamp bigint, mac text, ip text, hostname text, PRIMARY KEY (ip, timestamp) ) WITH CLUSTERING ORDER BY (timestamp DESC)")
}

// Write the DHCP DataFrame into the Cassandra table.
dhcpDF.write.mode("Overwrite").format("org.apache.spark.sql.cassandra").options(Map("table" -> "dhcp", "keyspace" -> "test")).save()

// Return the MAC recorded for IP at or before timestamp (rows are clustered newest first).
def tsIPtoMACcassandra(timestamp: Long, IP: String, keyspace: String, table: String): String = {
  val rows = sc.cassandraTable(keyspace, table).select("mac").where("ip = ?", IP).where("timestamp <= ?", timestamp).limit(1).collect()
  rows(0).getString("mac") // throws if no row matches
}

val result = tsIPtoMACcassandra(14382457950010L, "192.168.1.150", "test", "dhcp")
```
Cheers,
Jasper
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
```
val mac = CassandraConnector(sc.getConf).withSessionDo {session => session.execute("select mac from test.dhcp WHERE ip='192.168.1.243' and timestamp<14382454600000 limit 1;")}.one.getString("mac")
```
Works and seems a lot faster!!
I need to run this query gazillions of times for different IPs and times, and I have read that _prepared queries_ are the way to go for speeding up such things. Is there any way of doing this with spark-cassandra-connector?
Cheers,
Jasper
Please excuse my deep ignorance. I have tried to RTFM, but could not find a reference on how session.prepare() could be used for arbitrary lookups, especially across sessions. That is, the prepared statement would be used by a function that does the Cassandra lookups and is called from arbitrary points within the code as a single query.
Can you give more info? No doubt this is simple and I have been chasing my tail for far too long 8(
```
val cc = CassandraConnector(sc.getConf)
val session = cc.openSession
val prepared = session.prepare(s"select mac from $keyspace.$table where ip=? and timestamp<=? limit 1")
val prepCount = session.prepare(s"select count(*) from $keyspace.$table WHERE ip=?")
def IPtsMAC ( IP : String, timestamp : Long) : String = {
val row = session.execute(prepared.bind( IP, timestamp: java.lang.Long)).one
if (row == null) {
val another = session.execute( prepCount.bind(IP) ).one.getLong("count")
if (another == 0 ) "s" + IP else "b" + IP
} else {
row.getString("mac")
}
}
val macs = IPtsRDD.map( CRow => IPtsMAC( CRow.getString("ip"), CRow.getLong("timestamp")) )
```
Is it even possible to make this serializable? I assume the `prepared` statement is the culprit, since a similar query that is not prepared works fine.
If you find a solution I'm interested.
I had the same issue when I had to serialize sc.conf.
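For what it's worth, one common way around this kind of serialization problem is to capture only the `CassandraConnector` in the closure (it is serializable) and to open the session and prepare the statement on the executor, inside `mapPartitions`. Below is a minimal sketch, assuming `sc` and `IPtsRDD` are as in the post above and the table is `test.dhcp`. The helper name `lookupMacs` is made up for illustration, and this has not been run against connector 1.4.

```scala
import com.datastax.spark.connector.CassandraRow
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD

// Hypothetical helper: for each (ip, timestamp) row, look up the matching MAC.
def lookupMacs(rows: RDD[CassandraRow], connector: CassandraConnector): RDD[Option[String]] =
  rows.mapPartitions { partition =>
    // This body runs on the executor. Only `connector` travels with the closure;
    // the Session and PreparedStatement are created here and never serialized.
    connector.withSessionDo { session =>
      val prepared = session.prepare(
        "SELECT mac FROM test.dhcp WHERE ip = ? AND timestamp <= ? LIMIT 1")
      partition.map { r =>
        val bound = prepared.bind(r.getString("ip"), r.getLong("timestamp"): java.lang.Long)
        // one() returns null when no row matches, so wrap it in Option.
        Option(session.execute(bound).one()).map(_.getString("mac"))
      }.toVector.iterator // force evaluation while the session is still open
    }
  }

val macs = lookupMacs(IPtsRDD, CassandraConnector(sc.getConf))
```

The results of each partition are materialized with `toVector` before `withSessionDo` returns, because a lazy iterator could otherwise be consumed after the session has been released; the cost is memory proportional to the partition size. The connector may also cache prepared statements per session, in which case preparing inside a plain `map` could be cheap as well, but that is an assumption worth checking against the connector version in use.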
Russel, if I understand correctly, we do not need a mapPartitions with one prepared statement per partition, just a simple map?