Broadcasting CassandraConnector inside Spark


Bartłomiej Alberski

Jul 18, 2015, 1:32:55 PM
to spark-conn...@lists.datastax.com
I already described the problem on Stack Overflow, so please check it out:
http://stackoverflow.com/questions/31490583/spark-broadcasting-cassandra-connector
In case of any doubts please let me know.

Russell Spitzer

Jul 18, 2015, 4:56:12 PM
to spark-conn...@lists.datastax.com
I answered on SO as well, but you basically should not be broadcasting the CassandraConnector. Just use it in parallelized code and it will automatically use an executor-side connection pool or make a new connection using a serialized configuration.

To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Bartłomiej Alberski

Jul 18, 2015, 5:30:04 PM
to spark-conn...@lists.datastax.com
OK, I read your answer and technically I agree with you. Let me explain why I was trying to do something like this. I was trying to broadcast CassandraConnector (or CassandraConnectorConf) so that I could manually control when the Cassandra session is closed in a streaming job. In fact, in every batch I need to communicate with Cassandra from each executor. Right now I do not see a really good way to do this without ensuring that my connection is closed only when the JVM shuts down.


Analysis of the following code:

    rdd.mapPartitions(x =>
      ...
      cassandraConnector.withSessionDo { session => // sessionBlock

      }
      ...
    )

suggests that the session will be closed every time sessionBlock ends (once spark.cassandra.connection.keep_alive_ms has elapsed). This is problematic especially for streaming jobs, where the mapPartitions function is executed (more or less) every batch_interval.

Maybe changing spark.cassandra.connection.keep_alive_ms to be greater than batch_interval could solve the issue?
Do you have any advice for such a case?

Russell Spitzer

Jul 18, 2015, 5:57:18 PM
to spark-conn...@lists.datastax.com
Yes, changing the connection keep-alive is what you should do. The end of a withSessionDo will only close the connection if there are no more handles to it and keep_alive_ms has passed. There is no way to "broadcast" the full connection since it isn't serializable.
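
As a rough illustration of that advice, the keep-alive can be set on the SparkConf before the streaming context is created. This is only a sketch: the batch interval, the contact host, and the "three batch intervals" multiplier are assumptions chosen for the example, not values recommended anywhere in this thread.

```scala
import org.apache.spark.SparkConf

object KeepAliveConf {
  // Hypothetical batch interval of the streaming job, in milliseconds.
  val batchIntervalMs: Long = 5000L

  // `false` skips loading spark.* system properties, so the sketch is deterministic.
  val conf: SparkConf = new SparkConf(false)
    .setAppName("cassandra-streaming-sketch")
    // Hypothetical contact point.
    .set("spark.cassandra.connection.host", "127.0.0.1")
    // Keep idle sessions open for longer than one batch interval, so the pooled
    // session is still there when the next batch arrives (multiplier is an assumption).
    .set("spark.cassandra.connection.keep_alive_ms", (batchIntervalMs * 3).toString)
}
```

The same property can also be passed on the command line with `--conf spark.cassandra.connection.keep_alive_ms=15000`.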

Bartłomiej Alberski

Jul 19, 2015, 4:44:15 AM
to spark-conn...@lists.datastax.com
Thanks for the answer.
I am aware that there is no way to "broadcast" the full connection, and that is fine with me.

The problem is that even when I broadcast only the CassandraConnectorConf, which is a case class holding the configuration, and try to create a CassandraConnector on the executor side, I get the same error.

What is more, if I print the hosts from the broadcast CassandraConnectorConf on the driver side everything is OK (correct IP address), but if I do the same on the executor side I get 0.0.0.0 as the host's IP address.

Is this by design, or is it a bug?

Russell Spitzer

Jul 19, 2015, 4:51:16 AM
to spark-conn...@lists.datastax.com

I'm still confused about why you are trying to broadcast the conf. You can create connections on the executor side just by doing RDD.mapPartitions( CassandraConnector(someConf).withSessionDo() )

This will serialize the conf and use a connection pool on the executor.

Could you explain what additional functionality you are looking for?
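
Spelled out a little more, the pattern might look like the sketch below. It assumes the connector 1.x API with the Java driver 2.x `Session`; the keyspace `ks`, the table `kv`, and its columns are hypothetical. The connector is simply captured by the closure, so only its configuration is shipped to the executors.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD

object ExecutorSideLookup {
  // Hypothetical table: ks.kv (id text PRIMARY KEY, value text)
  def lookupValues(keys: RDD[String],
                   connector: CassandraConnector): RDD[(String, Option[String])] =
    keys.mapPartitions { partition =>
      // The connector is serializable; the session itself is obtained on the executor.
      connector.withSessionDo { session =>
        partition.map { key =>
          val row = session.execute("SELECT value FROM ks.kv WHERE id = ?", key).one()
          // one() returns null when no row matches, hence the Option wrapper.
          (key, Option(row).map(_.getString("value")))
        }.toList.iterator // force evaluation while the session handle is still held
      }
    }
}
```

On the driver this would be called with something like `ExecutorSideLookup.lookupValues(rdd, CassandraConnector(sc.getConf))`. The `.toList` matters because Scala iterators are lazy: without it the queries would run only after withSessionDo has already returned.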


To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
--

Bartłomiej Alberski

Jul 19, 2015, 5:16:39 AM
to spark-conn...@lists.datastax.com
Maybe what I am doing is a strange way of keeping the connection alive on the executor side until streaming ends, and there is probably a better way, but I was planning to do something like this.

First of all, define a holder for the Cassandra session that will be stored on each executor.

    class CassandraSession(f: () => CassandraConnector) {
      @transient
      lazy val cassandraConnector = f()

      def withSessionDo[T](code: Session => T): T = cassandraConnector.withSessionDo(code)
    }

    object CassandraSession {
      def apply(conf: CassandraConnectorConf): CassandraSession = {
        val f = () => {
          val connector = new CassandraConnector(conf)
          // Hold one session open until the JVM shuts down
          val session = connector.openSession()
          sys.addShutdownHook {
            session.close()
          }
          connector
        }
        new CassandraSession(f)
      }
    }

Next, on the driver side I create the CassandraSession:

    val cassandraSession = CassandraSession(conf)
    val cassandraSessionBC = streamingContext.sparkContext.broadcast(cassandraSession)

and now on the executor side:

    rdd.mapPartitions { x =>
      ...
      cassandraSessionBC.value.withSessionDo { session =>
        // some asynchronous operations
      }
      ...
    }

Thanks to this, there is no need to increase keep_alive_ms.

I think the problem with increasing keep_alive_ms will show up when we try to gracefully shut down streaming.
For example, if I increase the keep-alive to 10 seconds and my job finishes processing data, the connection to Cassandra will probably not be closed.

Piotr Kołaczkowski

Jul 20, 2015, 4:36:05 AM
to spark-conn...@lists.datastax.com, albe...@gmail.com
What is in broadcast_rpc_address / listen_address of your cassandra hosts? The connector gets host addresses from Cassandra, when it first connects to it. This is to improve fault tolerance in case the initial connection hosts went down in the middle of a (long) job.

There is no 0.0.0.0 default used anywhere in the connector code, so it must be getting it from Cassandra.

Piotr Kołaczkowski

Jul 20, 2015, 4:44:24 AM
to spark-conn...@lists.datastax.com, pkol...@datastax.com, albe...@gmail.com
Also, I'm not sure why you're writing all of this complex code instead of just increasing keep_alive_ms. The executors are hard-killed by the Spark master, so keep_alive_ms won't keep them running. And if you really want to close connections manually, you still have a way to do it: just call CassandraConnector.evictCache.
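
A hedged sketch of how evictCache might be invoked, assuming the connector 1.x companion-object method `CassandraConnector.evictCache()`. The session cache is per JVM, so a call on the driver only affects the driver. The helper name `evictEverywhere` and the `slots` parameter are made up for this example, and Spark gives no guarantee that the throwaway tasks land on every executor.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkContext

object EvictSessions {
  // Best-effort eviction: runs `slots` tiny tasks and evicts the cache in each
  // JVM that happens to execute one, then evicts the driver's own cache.
  def evictEverywhere(sc: SparkContext, slots: Int): Unit = {
    sc.parallelize(1 to slots, slots).foreachPartition { _ =>
      CassandraConnector.evictCache()
    }
    CassandraConnector.evictCache()
  }
}
```

This would typically be called once, just before stopping the streaming context.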

Bartłomiej Alberski

Jul 20, 2015, 11:35:40 AM
to spark-conn...@lists.datastax.com, albe...@gmail.com
@Piotr Kołaczkowski Thanks for your help. I will check the given parameters; I have to contact the people responsible for the Cassandra configuration.

The problem with increasing keep_alive_ms is that it is hard to find a proper value. In theory, you might not receive any events for some period of time; as a result, no partitions will be processed and the session will not be used. In the long run this could lead to a situation where your connection has not been used for longer than keep_alive_ms. On the other hand, you could set a really big value and it would be OK.

Another strange thing I noticed without increasing keep_alive_ms is that I was receiving "ERROR no host available".
My code looks more or less like this:

    rdd.mapPartitions { partition =>
      ...
      cassandraConnector.withSessionDo { session =>
        // asynchronous read
        // some other operations
        // get on Future - wait for read results

        // asynchronous write
        // some other operations
        // wait for write results (ensure that writing was finished inside withSessionDo)
      }
      ...
    }

In the log I noticed that the connection was closed before the asynchronous write finished. Do you have any idea why I was receiving such an error? Theoretically the wait was inside the withSessionDo block, so the session should not have been 'closed'.
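
For reference, the "wait inside withSessionDo" pattern described above could be sketched as follows. It assumes the Java driver 2.x API that the connector used at the time (`executeAsync` returning a `ResultSetFuture`); the table `ks.kv` and its columns are hypothetical. It illustrates the pattern only and is not a diagnosis of the error reported here.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD

object AwaitInsideSession {
  // Hypothetical table: ks.kv (id text PRIMARY KEY, value text)
  def writeAll(rows: RDD[(String, String)], connector: CassandraConnector): Unit =
    rows.foreachPartition { partition =>
      connector.withSessionDo { session =>
        // toList forces the lazy iterator, so every write is actually submitted here.
        val pending = partition.map { case (id, value) =>
          session.executeAsync("INSERT INTO ks.kv (id, value) VALUES (?, ?)", id, value)
        }.toList
        // Block until every write has completed before the session handle is released.
        pending.foreach(_.getUninterruptibly())
      }
    }
}
```

One thing worth checking in code of this shape is laziness: if the futures are created by mapping over the partition iterator without forcing it, nothing is submitted until the iterator is consumed, which may happen after withSessionDo has returned. For large partitions the number of in-flight requests should also be bounded, which this sketch does not do.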