can I use cassandra for checkpointing during a spark streaming job


kant kodali

Aug 29, 2016, 4:51:26 PM
to spark-conn...@lists.datastax.com
I understand that I cannot use Spark Streaming window operations without checkpointing to HDFS, but without window operations I don't think we can do much with Spark Streaming. Since they are essential, can I use Cassandra as the distributed storage instead? If so, can I see an example of how to tell the Spark cluster to use Cassandra for checkpointing?

Russell Spitzer

Aug 30, 2016, 6:06:13 PM
to spark-conn...@lists.datastax.com
Checkpointing is not required for windowing. Perhaps you mean stateful transformations? Or fault tolerance? For Kafka fault tolerance, all that's really required is an offset storage location, which can be Cassandra.
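A tiny sketch of that pattern (plain Python, with a dict standing in for a Cassandra offsets table; the schema, names, and functions are illustrative, not a real connector API):

```python
# Toy model of Kafka-style fault tolerance via an external offset store.
# A plain dict stands in for a Cassandra table keyed by (topic, partition).

offset_store = {}   # (topic, partition) -> next offset to consume
processed = []

def process_batch(topic, partition, messages, start_offset):
    """Do the work first; commit the offset only after the batch succeeds."""
    for msg in messages:
        processed.append(msg)          # the actual processing
    # at-least-once: a crash before this line replays the batch on restart
    offset_store[(topic, partition)] = start_offset + len(messages)

def resume_offset(topic, partition):
    """On restart, pick up from the last committed offset (0 if none)."""
    return offset_store.get((topic, partition), 0)

process_batch("events", 0, ["a", "b", "c"], start_offset=0)
```

The key property is that the offset commit happens strictly after processing, so a failure can only cause replays, never silent loss.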

Cassandra out of the box does not provide an HDFS-compatible filesystem for generic checkpointing. Given the size and nature of checkpoint files, I would recommend against checkpointing anything too large to Cassandra anyway.

That said

DataStax Enterprise does include an HDFS replacement, DSEFS, which runs using Cassandra as a NameNode service (avoiding the large-checkpoints-in-C* problem), as well as an older system, CFS, which stored data directly in C*.


--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com. 
 

kant kodali

Aug 30, 2016, 6:36:45 PM
to spark-conn...@lists.datastax.com
I heard in a spark-user thread that checkpointing is required for windowing (I was trying to see how far I could go without distributed storage to build a real-time analytics backend). I wasn't too sure that was the case, but thanks for clarifying it.

Some people here are a bit hesitant to use HDFS, but if we have to, we will. The majority seem to prefer Cassandra as the distributed storage.

A couple of our use cases include the following:
  1. We want the results of the computation stored somewhere so that when we issue a Spark SQL query we get the results back instead of recomputing them.
  2. Checkpointing whatever Spark Streaming wants to checkpoint. If we lose messages it should be OK, because we use a queueing system called NSQ which will resend them.
  3. Our architecture is this: an NSQ consumer (which also holds the Spark context) pushes data to the Spark cluster to perform the necessary computation, sends the results to a dashboard server, and possibly stores them in Cassandra so we can query them later.
Any ideas or thoughts will be greatly appreciated.

Thanks Again!

Russell Spitzer

Aug 30, 2016, 7:32:52 PM
to spark-conn...@lists.datastax.com
http://spark.apache.org/docs/latest/streaming-programming-guide.html#when-to-enable-checkpointing
Checkpointing details ^^

You need HDFS (or something compatible) for some specific streaming features: the WAL (if you need it) and checkpointing.
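To make the role of that storage concrete, here is a toy sketch (pure Python, invented names) of checkpointing driver state to a directory and recovering it after a restart. The point is that the directory must survive the driver's machine, which is why HDFS or something compatible is needed; in real Spark the location is set via the streaming context's checkpoint directory.

```python
import json
import os
import tempfile

def checkpoint(state, ckpt_dir):
    """Atomically persist state; in Spark this directory would live on HDFS."""
    tmp = os.path.join(ckpt_dir, "ckpt.tmp")
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, os.path.join(ckpt_dir, "ckpt.json"))  # atomic rename

def recover(ckpt_dir):
    """On driver restart, reload the last completed checkpoint."""
    path = os.path.join(ckpt_dir, "ckpt.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"batch": 0, "counts": {}}   # fresh start if no checkpoint exists

ckpt_dir = tempfile.mkdtemp()
checkpoint({"batch": 7, "counts": {"clicks": 42}}, ckpt_dir)
restored = recover(ckpt_dir)   # simulate a fresh driver starting up
```

The temp-file-plus-rename step matters: a crash mid-write must never leave a half-written checkpoint behind.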

As for your use case

It's a pretty common use case for folks to run a stream through Spark and save aggregates to C*. Structured Streaming is not really ready for production use, as it lacks the necessary sinks and sources. If you are writing your own receiver for NSQ (I don't know what that is), you'll have to look into the docs to figure out how to get Spark to behave correctly on failure. Usually the issue is that you don't want to update your offset in the queue until the data has been completely processed, and of course, where do you save that offset? :)

kant kodali

Aug 30, 2016, 8:02:21 PM
to spark-conn...@lists.datastax.com
Thanks a lot for your thoughts. It looks like the WAL and checkpointing are there to save us from driver program failures/crashes, so in that case we can avoid HDFS for now.

I don't quite understand the distinction between receiver and receiver-less architectures as mentioned in the Spark docs. I suppose the distinction depends on where we run the driver program (inside or outside of the cluster), since the driver program is the one that maintains the Spark context and also spawns a server that keeps receiving messages. So this one process can be called both a consumer of a queue and a driver program for the Spark context, and yes, it runs outside of the cluster. What I don't understand is this: I heard that the application jar runs on the workers, and I wonder what that even means. To me, the application jar is my process that has a Spark context and runs outside of the cluster on a separate machine, so I am not really sure what it means to say the application jar will run on the workers/executors.

Please correct me if I am wrong about the distinction between receiver and receiver-less architectures.

Russell Spitzer

Aug 30, 2016, 8:19:51 PM
to spark-conn...@lists.datastax.com
I would recommend you go back and watch some of those videos I suggested since I think some of the basics are still a bit cloudy.

Here is a quick summary

Spark Driver: A JVM running your application code, holds Spark Context
Spark Master: Some Process that allocates resources to a Spark Context
Spark Worker: Some Process that receives orders from the master to start Executor JVMs
Spark Executor: The Distributed Part of your Application Code, your Application code will also run on this JVM. 

Receiver vs Receiverless

Receiver: Runs as a long running *task* on a Worker/Executor machine, sits around and waits for events. Based on the block_interval it takes received data and turns it into a Spark partition. After every full batch the partitions from the receiver are presented as an RDD.
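A toy model of that flow (pure Python; the names are illustrative, not Spark APIs): events are chopped into blocks by the block interval, and each batch interval's worth of blocks becomes the partitions of one "RDD":

```python
# Toy receiver: events arrive continuously; every block_interval's worth
# of events becomes one partition, and every batch_interval's worth of
# partitions is presented as one "RDD" (here, a list of partitions).

def receive(events, events_per_block, blocks_per_batch):
    # chop the incoming stream into fixed-size blocks (partitions)
    blocks = [events[i:i + events_per_block]
              for i in range(0, len(events), events_per_block)]
    # group blocks into batches; each batch is one RDD's partitions
    batches = [blocks[i:i + blocks_per_batch]
               for i in range(0, len(blocks), blocks_per_batch)]
    return batches

batches = receive(list(range(12)), events_per_block=2, blocks_per_batch=3)
# 12 events -> 6 blocks of 2 events -> 2 batches of 3 partitions each
```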

Receiverless: Driver continuously produces RDDs whose metadata describes what data to pull from the queue.

Example: Partition 1 may request offsets 1 - 1000 from the queue, Partition 2 requests offsets 1001-2000 ....

In this way there is no need for a process to listen for events. They are polled directly when the RDD representing them is executed remotely.
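A sketch of that driver-side planning, matching the offset example above (pure Python, illustrative names): the driver only emits offset-range metadata, and no data is fetched until each range is executed remotely.

```python
# Toy model of the receiver-less (direct) approach: the driver plans
# per-partition offset ranges; executors later pull those ranges themselves.

def plan_partitions(start_offset, end_offset, max_per_partition):
    """Split [start_offset, end_offset] into per-partition offset ranges."""
    ranges = []
    lo = start_offset
    while lo <= end_offset:
        hi = min(lo + max_per_partition - 1, end_offset)
        ranges.append((lo, hi))   # metadata only; no data is fetched here
        lo = hi + 1
    return ranges

ranges = plan_partitions(1, 2500, 1000)
# -> [(1, 1000), (1001, 2000), (2001, 2500)]
```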

The Receiver vs Receiver-less distinction is much more about how the DStreams get their data: a Receiver runs as a long-lived process that listens for events, while the direct approach has no such process.

kant kodali

Aug 31, 2016, 5:50:12 AM
to spark-conn...@lists.datastax.com
Hi Russell,

I appreciate you taking the time to answer my questions. I watched the videos again, for the third time, but they are not precise enough in certain places, so I wrote my inline questions below. It would be awesome if you could answer them.

Kant



On Tue, Aug 30, 2016 5:19 PM, Russell Spitzer russell...@gmail.com wrote:
I would recommend you go back and watch some of those videos I suggested since I think some of the basics are still a bit cloudy.

Here is a quick summary

Spark Driver: A JVM running your application code, holds Spark Context

Spark Master: Some Process that allocates resources to a Spark Context. Is this a JVM-based process?

Spark Worker: Some Process that receives orders from the master to start Executor JVMs. Is this a JVM-based process?

Spark Executor: The Distributed Part of your Application Code, your Application code will also run on this JVM. 


From your description it sounds like Spark Workers and Spark Executors are separate JVMs?


Receiver vs Receiverless

Receiver: Runs as a long running *task* on a Worker/Executor machine, sits around and waits for events. Based on the block_interval it takes received data and turns it into a Spark partition. After every full batch the partitions from the receiver are presented as an RDD. Oh, I see what you mean now. To describe this in my own words in a little more depth: you are essentially saying that if I write a program in Java or Scala with two sockets open, 1) a socket where I receive messages from a queueing system, and 2) a socket for a Spark context, and say I am running this program on machine 5 while the Spark master and worker are on machines 6 and 7 respectively,

then the part of the code that does #1 will be executed on the worker nodes (in other words, machine 7), correct?


Receiverless: Driver continuously produces RDDs whose metadata describes what data to pull from the queue.

Example: Partition 1 may request offsets 1 - 1000 from the queue, Partition 2 requests offsets 1001-2000 ....

In this way there is no need for a process to listen for events. They are polled directly when the RDD representing them is executed remotely. This sentence clears up my confusion. Thanks a ton! We have a way to create a custom receiver; we just need to implement one interface, Receiver, but I don't see a way to create a custom DirectStream.

Russell Spitzer

Aug 31, 2016, 12:50:22 PM
to spark-conn...@lists.datastax.com
Np, but I really would recommend you check out a book after this :) Holden's Fast Data Processing with Spark (https://www.amazon.com/Fast-Processing-Spark-Holden-Karau/dp/1782167064) or Learning Spark (https://www.amazon.com/Learning-Spark-Lightning-Fast-Data-Analysis/) (with other folks from the AMPLab) are both good starters.


Spark Executors are always separate JVMs (ignoring the infrequently used mesos-finegrained mode)



On Wed, Aug 31, 2016 at 2:50 AM kant kodali <kant...@gmail.com> wrote:
Hi Russell,

I appreciate you taking the time to answer my questions. I watched the videos again, for the third time, but they are not precise enough in certain places, so I wrote my inline questions below. It would be awesome if you could answer them.

Kant



On Tue, Aug 30, 2016 5:19 PM, Russell Spitzer russell...@gmail.com wrote:
I would recommend you go back and watch some of those videos I suggested since I think some of the basics are still a bit cloudy.

Here is a quick summary

Spark Driver: A JVM running your application code, holds Spark Context

Spark Master: Some Process that allocates resources to a Spark Context. Is this a JVM-based process?

Spark Worker: Some Process that receives orders from the master to start Executor JVMs. Is this a JVM-based process?

Spark Executor: The Distributed Part of your Application Code, your Application code will also run on this JVM. 


From your description it sounds like Spark Workers and Spark Executors are separate JVMs?

The Master/Workers are usually separate JVM processes, but this depends on your resource manager (they may have different names or be consolidated):

Default: Standalone - separate JVMs
YARN - YARN processes (JVM)
Mesos - C++ processes (I think)

Basically, think of that combo as the "Resource Manager and Spark JVM Launcher". By default, in standalone mode, they are separate JVMs.
 


Receiver vs Receiverless

Receiver: Runs as a long running *task* on a Worker/Executor machine, sits around and waits for events. Based on the block_interval it takes received data and turns it into a Spark partition. After every full batch the partitions from the receiver are presented as an RDD. Oh, I see what you mean now. To describe this in my own words in a little more depth: you are essentially saying that if I write a program in Java or Scala with two sockets open, 1) a socket where I receive messages from a queueing system, and 2) a socket for a Spark context, and say I am running this program on machine 5 while the Spark master and worker are on machines 6 and 7 respectively,

then the part of the code that does #1 will be executed on the worker nodes (in other words, machine 7), correct?

I'm not sure what you mean here. 
Given a three node cluster (A, B, C) with a Worker running on each node and A as the master.

I can run a Spark application from A (like spark-shell). The spark-shell is the Spark Driver; it will start a SparkContext which will talk with the Master. The Master will send requests to the workers on A, B and C. The workers will each start up an Executor and tell it to connect to the Spark Driver (spark-shell).

At this point we have the following setup
[A: Master, Worker, Driver, Executor, B: Worker, Executor, C: Worker, Executor]

Then on the driver we run some code

sc.parallelize(1 to 100).map(doSomethingFunky).saveToCassandra(...)

The Driver on A creates the RDD which represents this code. The doSomethingFunky function must be serialized over to all of the executor machines, since they will actually be executing it. In a normal application this is accomplished by having the application jar present on the classpath of the executors; in the shell it has to be dynamically sent as the lines are compiled.

A then assigns portions of the RDD to be processed by the A, B, and C executors. Each executor gets its portion, runs *doSomethingFunky*, and then saves to Cassandra.

None of the work happens in the "Driver"; all the work happens on the "Executors".
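A toy model of that division of labor (plain Python; a sketch with invented names, not how Spark is actually implemented): the driver only plans partitions, while the shipped function runs on the "executors":

```python
# Toy model: the "driver" splits the data into partitions and assigns
# each one to an "executor"; the function itself is what gets shipped.

def do_something_funky(x):
    # this code runs on the executors, not on the driver
    return x * x

def run_job(data, func, executors):
    # driver side: plan - split data into one partition per executor
    n = len(executors)
    partitions = [data[i::n] for i in range(n)]
    # executor side: each executor runs func over its own partition
    return {ex: [func(x) for x in part]
            for ex, part in zip(executors, partitions)}

results = run_job(list(range(1, 7)), do_something_funky, ["A", "B", "C"])
# executor "A" gets elements 1 and 4, "B" gets 2 and 5, "C" gets 3 and 6
```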


Receiverless: Driver continuously produces RDDs whose metadata describes what data to pull from the queue.

Example: Partition 1 may request offsets 1 - 1000 from the queue, Partition 2 requests offsets 1001-2000 ....

In this way there is no need for a process to listen for events. They are polled directly when the RDD representing them is executed remotely. This sentence clears up my confusion. Thanks a ton! We have a way to create a custom receiver; we just need to implement one interface, Receiver, but I don't see a way to create a custom DirectStream.


The direct approach just requires extending DStream to generate RDDs based on your own code. See
https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala

Basically, you just make a DStream whose compute method makes new RDDs based on the offsets changing

https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala#L201-L217 
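A loose Python sketch of that shape (the class and method names only mirror the DStream API; everything here is illustrative): each compute() call turns the offsets that appeared since the last call into a new batch, and only then advances the stored offset.

```python
class ToyDirectStream:
    """Each compute() call plans a batch from the offsets that have
    appeared since the last call - no long-running receiver needed."""

    def __init__(self, source):
        self.source = source    # anything with .latest_offset() and .read(lo, hi)
        self.current = 0        # next offset to consume

    def compute(self):
        latest = self.source.latest_offset()
        if latest <= self.current:
            return None         # nothing new this batch interval
        batch = self.source.read(self.current, latest)  # like building a KafkaRDD
        self.current = latest
        return batch

class ListSource:
    """Stand-in for a queue like Kafka or NSQ."""
    def __init__(self):
        self.items = []
    def latest_offset(self):
        return len(self.items)
    def read(self, lo, hi):
        return self.items[lo:hi]

src = ListSource()
stream = ToyDirectStream(src)
src.items += ["a", "b"]
first = stream.compute()    # the first batch covers offsets 0-2
src.items += ["c"]
second = stream.compute()   # the next batch covers only the new offset
```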

kant kodali

Aug 31, 2016, 3:58:51 PM
to spark-conn...@lists.datastax.com

Hi Russell,

I think your responses are worth more than 100 pages in a book (not that I don't want to read; I generally prefer books that cover things in depth rather than a high-level overview, and I already have one of the books you mentioned). That said, I want to value your time, so I am trying hard to ask as few questions as possible, and I think this will be the last one :) You have already given me a lot of your thoughts for free, so I can't thank you enough.

Regarding implementing custom DStreams: it looks like I have to implement a bunch of things, which is fine, but I want to make sure I am headed in the right direction. So, in order to implement a custom DStream:
1) I need to extend InputDStream and override its start() and stop() methods.
2) Since InputDStream extends DStream, I should also override compute() to create RDDs based on offsets.
3) I should also implement my own RDD, just like KafkaRDD (from the links you pointed out), and potentially create other classes similar to those referenced in DirectKafkaInputDStream.scala.

Does this sound about right?

Thanks,
Kant

Russell Spitzer

Aug 31, 2016, 4:02:46 PM
to spark-conn...@lists.datastax.com
That's about it. Good luck!

kant kodali

Aug 31, 2016, 4:04:55 PM
to spark-conn...@lists.datastax.com
Thanks!


Satish Abburi

Feb 17, 2017, 11:17:43 AM
to DataStax Spark Connector for Apache Cassandra

Were you able to get this working? There is another feature, DSEFS, a distributed file system in DataStax 5.0; we are trying to use it for the same use case and are seeing some issues. I would like to know if anyone has had success using DSEFS for checkpointing.