Can we read data in and out of Cassandra using the Spark connector in a non-blocking fashion?


kant kodali

Aug 22, 2016, 5:49:18 PM
to DataStax Spark Connector for Apache Cassandra
Can we read data in and out of Cassandra using the Spark connector in a non-blocking fashion? All the examples I see that make a call to Cassandra using the Spark connector look like they are blocking calls.

Russell Spitzer

Aug 22, 2016, 6:14:45 PM
to DataStax Spark Connector for Apache Cassandra
You can always run the calls from another thread or use Scala Futures, but most users want blocking code because they want to fully utilize their cluster for one operation until the next starts. Could you give a fuller explanation of what you are trying to do?


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

kant kodali

Aug 22, 2016, 6:47:33 PM
to DataStax Spark Connector for Apache Cassandra
Nothing really specific. Instead of waiting for a thread to finish an I/O call, I probably want the thread to perform some other computation. This would allow me to model everything as a pipeline of reactive streams.

It would be great to have non-blocking I/O, or even better a reactive interface, rather than creating and wrapping futures for every I/O call.

As far as multiple threads are concerned, we know the parking and unparking of threads can be a bottleneck, to the point where Cassandra itself is planning to move from SEDA (Staged Event-Driven Architecture) to TPC (thread per core). Below are the respective tickets.

https://issues.apache.org/jira/browse/CASSANDRA-10989
https://issues.apache.org/jira/browse/CASSANDRA-10993

Thanks!

Russell Spitzer

Aug 22, 2016, 10:50:00 PM
to DataStax Spark Connector for Apache Cassandra
I guess I don't really understand. In Spark you have a Driver JVM and executor JVMs that are remote. You want to do other work on the Driver JVM while the job is running? Could you give me an example of the code you would want to run? In most cases there really isn't any "local" work you can be doing while you wait for your distributed work to finish. For example, "sqlContext.read.format.options.load.write.format(parquet).options.save" doesn't have any local I/O.

Unlike C*, the Driver application really doesn't have much local work actually happening. In most applications it's really just used to manage the remote distribution of work and to start distributed tasks. There is essentially no I/O back to the host machine most of the time. (If you are doing a lot of I/O back to the driver, that's usually a code smell.)

Additionally, it would basically be impossible for us to change anything in the Spark library to return results asynchronously (collect, take, foreach). The only Action (blocking operation) that we have any control over is "saveToCassandra", which we could provide a future for, but again I'm not really sure we would add much value by doing so.

If you really do want to wrap a result, the Scala Futures library really doesn't require you to do much. Just use Future { your code here } and you can wrap any method, including Spark library ones.
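For example, a blocking connector call can be wrapped like this (a sketch, not a connector API; sc and the keyspace/table names here are assumed):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// The blocking action runs on the global ExecutionContext's thread pool
// while the calling thread is free to do other work.
val saveJob: Future[Unit] = Future {
  sc.cassandraTable("ks", "source").saveToCassandra("ks", "copy")
}

// ... other driver-side work here ...

Await.result(saveJob, Duration.Inf) // block only when the result is needed
```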


Here is an example where our test code locally sets up a bunch of C* keyspaces before the actual tests begin
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/rdd/CassandraRDDSpec.scala#L66-L194

Do you have any specific functions you want to have return Futures other than "saveToCassandra"?

And just so you know, the future/async interfaces (Scala Futures, Guava Futures, and now Java 8 Futures, etc.) all run on a Java ExecutorService under the hood, which is just a fancy bunch of threads. The only real issue comes up when you have a huge number of threads.

kant kodali

Aug 22, 2016, 11:59:51 PM
to spark-conn...@lists.datastax.com
Hi,

I don't have any code samples yet, but I can certainly let you know once I get there; until then we can keep this discussion a bit more theoretical, if you don't mind. I use Java and I am also new to Spark, though I know Hadoop well enough. I want to start by addressing the following question.

Do you have any specific functions you want to have return Futures other than "saveToCassandra"? 

Yes. Basically, for any I/O call to Cassandra I think it would be great to have a non-blocking version as well. Moreover, in my practical experience dealing with the performance of large-scale distributed systems, I have seen non-blocking I/O calls give a significant performance boost.

To keep it simple, say I need to make two I/O calls (either to the network or to disk): I/O call 1 and I/O call 2. With a non-blocking interface I can make both calls from a single thread without waiting for either to finish, whereas with blocking calls you have to wait for one to finish before you make the other, or you have to spawn multiple threads, and at large scale that has proven to be more expensive even with a thread pool. So I would avoid saying things like "just wrap a Future around it". The Cassandra driver has a complete non-blocking interface where both reads and writes are done in a non-blocking fashion, and those non-blocking functions return a ListenableFuture<ResultSet>, which is very useful.
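To illustrate what I mean, with the Java driver both requests can be issued from one thread before either completes (a sketch; the contact point, keyspace, and queries are made up for illustration):

```scala
import com.datastax.driver.core.{Cluster, ResultSetFuture}

val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("ks")

// Both requests go out immediately; neither executeAsync call blocks.
val f1: ResultSetFuture = session.executeAsync("SELECT * FROM t1")
val f2: ResultSetFuture = session.executeAsync("SELECT * FROM t2")

// Block (or attach callbacks to the ListenableFutures) only when needed.
val rows1 = f1.getUninterruptibly.all()
val rows2 = f2.getUninterruptibly.all()
```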

saveToCassandra is only one of them, but how about reads? Say I am performing ETL where I read data from Cassandra, do some large-scale computation, and then store it back in Cassandra. In that case, say I want to read rows in batches: I want to be able to issue multiple read requests and start performing computation on whichever result comes in first. Again, I could do this by spawning multiple threads, but that goes back to the point I made in the previous paragraph.

Another thing you mentioned was that there really isn't any local work. I wonder how Spark does MapReduce? Aren't the intermediate files written to disk locally, with the reduce process doing a remote read? I thought MR was all about local write and remote read, no? In the Spark architecture, which components act as the map process and which act as the reduce process? It would be great if you could let me know.

Thanks much!

Russell Spitzer

Aug 23, 2016, 2:14:00 AM
to spark-conn...@lists.datastax.com
I'm pretty clear on sync and async code and the C* Java driver :P but I'm still not sure what you are asking for in a Spark API. When you write Spark code you are writing a high-level description of how data is manipulated; it's a batch-processing framework operating at a much higher level than individual read and write requests. But let me try to work through your comments.

Futures:

ListenableFuture<ResultSet> would be identical to Scala Future { some code that emits a ResultSet }. The only difference is that one is a Guava future and the other is a Scala future. Guava futures *still* use thread pools and an underlying executor service; the differences are just in which APIs are available and how the schedulers allocate work to the underlying pool. Both rely on an underlying ExecutorService which runs tasks. Futures, ExecutorServices, and all of this are really just abstractions to make working with concurrency easier. They all still end up using threads (and thread pools) deep under the hood. So saying that you won't use a Future {} but do want a ListenableFuture is a little confusing.

An example of the multiple ExecutorServices running in the Cassandra Java Driver
https://github.com/datastax/java-driver/blob/f76aa2ff9ed46f60ec90e542d543a29075b0ecbc/driver-core/src/main/java/com/datastax/driver/core/Cluster.java#L1322-L1332

These are the workhorses that make all of the ListenableFutures actually complete.
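To make the equivalence concrete, here is a minimal runnable sketch of a Scala Future explicitly backed by a plain Java ExecutorService, which is exactly the arrangement a Guava ListenableFuture sits on top of:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A Scala ExecutionContext is just a thin wrapper over a Java ExecutorService.
val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// This Future runs on one of the pool's threads, the same way a Guava
// ListenableFuture runs on the driver's internal executors.
val answer = Await.result(Future { 21 * 2 }, 10.seconds)
println(answer) // prints 42
pool.shutdown()
```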

How Spark Works:

Spark is a much higher-level abstraction than MapReduce code (not that you would write your own code to "read data from Cassandra" in MR either; you would use an InputFormat provided by the C* project, see CqlStorage). It uses a high-level RDD interface and hides the internals of how a lot of things work together.

The application you write is the Spark Driver, basically the manager of the distributed system. From there you call a series of APIs to create RDDs (or DataFrames). For example, the call sc.cassandraTable returns instantly with an RDD which represents the operations that map data from C* into an RDD. The RDD is basically a mapping of the C* table into discrete, independent blocks of data called Spark partitions. This call takes milliseconds and reads NO data from Cassandra. Transformation functions are then applied to RDDs, which in turn yield more RDDs (again instantaneously). For example

val x = sc.cassandraTable(ks, table).map(row => that(row)).filter(that => that > something).map(that => complicatedFunction(that))

Would instantly return an RDD[that] representing the complete set of operations required to transform the entire table (ks, table) into our objects of type [that]. This RDD is lazily constructed and no work is actually done yet. This is a pattern with Spark: lazily construct objects and only do work when absolutely required. Only when an Action function is applied (like collect or saveToCassandra) does anything actually happen.

So if you then call

x.saveToCassandra

The metadata for constructing and computing the RDD is shipped by the driver to various Spark executors (remote JVMs); each executor core will work on one partition at a time until it reaches a shuffle stage (shuffling is a little more complicated). This means that the above code will be executed in parallel, running items through the full chain of operations in multiple parallel threads on many different machines.

Reading from C* (asynchronously) -> Passing through a filter -> Applying some weird function -> Saving back to Cassandra

Each executor core will be running through this chain with its own block of data, each command (passing through a filter / applying some function) both taking and emitting an iterator so that objects are pulled through as quickly as possible. At no time during this process is any data (except for the metadata about task failure or success) sent back to the Spark Driver application.
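The pipelining works because each stage both consumes and produces a lazy Iterator; in plain Scala the idea looks like this (a sketch of the mechanism, not connector code):

```scala
// Each stage takes an Iterator and returns a new lazy Iterator, so elements
// flow through the whole chain one at a time instead of materializing the
// full partition at every step.
val partition: Iterator[Int] = Iterator(1, 2, 3, 4, 5)

val pipeline: Iterator[Int] = partition
  .map(_ * 10)     // "applying some weird function"
  .filter(_ > 20)  // "passing through a filter"

// Nothing has been computed yet; elements are pulled on demand.
val out = pipeline.toList
println(out) // prints List(30, 40, 50)
```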

If one core finishes a partition, the driver will assign it another block to work on. All of this happens automatically and is not something the Spark API user has to be aware of.

During the whole process the Spark Driver is blocked awaiting the result of this "Job". Being asynchronous in waiting for this scheduling to complete can only benefit you if you are running fewer partitions than the entire Spark cluster can handle at once, or if you are using the Fair Scheduler within Spark.

In those cases you could use the Future API to start scheduling multiple sets of distributed work at the same time. This is usually not a priority for most Spark application writers, as they usually have far more work than their cluster can do (partitions >>> number of cores) and they aren't using their driver applications to do any processing, since none of the data is actually present on the local machine.

For example

AwaitAll(
  Future { sc.cassandraTable.map.saveToCassandra },
  Future { sc.cassandraTable.map.saveToCassandra }
)

(AwaitAll is a construct we made up in our testing code; it is basically Await.result(Future.sequence(futures), Duration.Inf).)

Will start two separate Spark jobs being scheduled in parallel, but if the first job gets all its tasks into the queue and there are no cores left, you will still basically be working FIFO. The Fair Scheduler would interleave tasks between the two jobs. Usually I recommend folks use the SparkSqlThriftServer if they are doing that kind of scheduling, provided their queries can be represented in HiveQL.
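Without that test helper, the same scheduling in plain Scala would look roughly like this (a sketch; sc, transform, and the keyspace/table names are assumed):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Schedule two independent Spark jobs from the driver at the same time.
val jobs = Seq(
  Future { sc.cassandraTable("ks", "a").map(transform).saveToCassandra("ks", "a_out") },
  Future { sc.cassandraTable("ks", "b").map(transform).saveToCassandra("ks", "b_out") }
)

// Block the driver only once both jobs have completed.
Await.result(Future.sequence(jobs), Duration.Inf)
```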

Manually working with C* in Spark

The SCC also allows you to use any Java driver API internally within your Spark job, including the async APIs. Usually I recommend that users do something like

val cc = CassandraConnector(sc.getConf)
rdd.mapPartitions { it =>
  cc.withSessionDo { session =>
    it.map(rddElement => session.execute(/* something here */))
  }
}


if they have a complicated command they would like to run. But this is usually not needed, as the normal APIs like cassandraTable implement basic asynchronous reading under the hood.

Why do I say there is no Local Work? How does Map Reduce Work?

Like I said before, there is only a single type of task-execution process in Spark: the Spark Executor. Executors only need to communicate with each other during "shuffles." During a shuffle, each machine will, if possible, directly ship blocks to the nodes which require them. If the amount of memory needed exceeds the available RAM, it will spill partitions or shuffle files to disk. The executor's local disk will then hold the partition till it is ready to be shipped to another executor.

None of this information is going back to the Spark Driver, because all of this communication is done Executor to Executor, with the Driver only handling orchestration (where the blocks are and who should get which blocks). This is why I say that there is no *Driver* local work being done. All of this shuffling and disk and network I/O is internal to the Spark Executor and is not a part of user code. The Executor process does both mapping and reducing work (as well as arranging shuffling). None of this is directly exposed.

Why is SaveToCassandra the only Spark Cassandra Connector Call that Could Possibly (or sensibly) return a future?

Only Spark Actions on an RDD are blocking (as they wait for the job to complete to proceed) and saveToCassandra is the only action we provide. Functions like cassandraTable or select or joinWithCassandraTable are all Transformations which generate new RDDs with dependencies linking to their predecessors. Because of this they don't actually do any work on the cluster and they return instantly.

All of the other actions (http://spark.apache.org/docs/latest/programming-guide.html#actions) are part of Spark itself, so we could not wrap them in the connector. Although it would still be trivial to wrap them in Futures if you wanted to run multiple jobs at the same time.

You could also have your driver application doing other things while it is scheduling Spark jobs, but most people do not do that. This is usually because all of the data is remote, so there isn't much the driver can do to advance the program.

Conclusion and More Resources

Spark is a really fun technology and I suggest you take a look at some of the following videos and blog posts to get more accustomed to it and how it works with Cassandra. 

http://spark.apache.org/docs/latest/programming-guide.html

Paco Talking about Apache Spark (Warning 6 awesome hours)
https://www.youtube.com/watch?v=EuWDz2Vb1Io

https://academy.datastax.com/resources/how-spark-cassandra-connector-reads-data

https://academy.datastax.com/resources/how-spark-cassandra-connector-writes-data

http://www.slideshare.net/JenAman/spark-and-cassandra-2-fast-2-furious-63064852

Cassandra and Spark optimizing For Data Locality
https://www.youtube.com/watch?v=ikCzILOpYvA

And there is a whole set of associated documentation and training here


vincent gromakowski

Aug 23, 2016, 2:50:34 AM
to spark-conn...@lists.datastax.com

Hi,
What about using actors for async actions, like writing the output of micro-batches to C*? It would allow maximizing the time spent writing while the next micro-batch is being computed. The extra cost would be more memory?



Russell Spitzer

Aug 23, 2016, 9:59:51 AM
to spark-conn...@lists.datastax.com
@Vincent, the writing process is already done concurrently and buffered as data is fed in:

https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala#L133-L175

Using Guava Futures

https://github.com/datastax/spark-cassandra-connector/blob/44de124012698ab6bb985e5c45af5eaba4146cac/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala

In addition, grouping and batching happen automatically, and you can tune the levels of concurrency. You can watch the video I linked to above for more information (https://academy.datastax.com/resources/how-spark-cassandra-connector-writes-data). Actors would just switch our concurrency abstraction and wouldn't really be any different from what we are doing now.
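For reference, that write concurrency and batching are exposed through connector configuration; an illustrative sketch (property names from the connector's write-tuning settings; the values here are made up, and the defaults are usually fine):

```scala
import org.apache.spark.SparkConf

// Illustrative write-tuning knobs for the Spark Cassandra Connector.
val conf = new SparkConf()
  .set("spark.cassandra.output.concurrent.writes", "8")  // async batches in flight per task
  .set("spark.cassandra.output.batch.size.rows", "64")   // rows per unlogged batch
```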


kant kodali

Aug 23, 2016, 10:20:25 AM
to spark-conn...@lists.datastax.com
You have made some very good points, and thank you so much for sharing your knowledge as well as your thoughts. I am very excited to go through the links you pointed out. I have just finished watching a few, with a few more to go!

kant

Eric Meisel

Aug 23, 2016, 6:32:06 PM
to spark-conn...@lists.datastax.com
I just want to say, as an outsider, this thread was really informative. Thanks for taking the time to detail this out, guys.