Load massive data from Cassandra to Spark

756 views
Skip to first unread message

Eugene Shulga

unread,
Jul 20, 2015, 2:56:19 PM7/20/15
to spark-conn...@lists.datastax.com
Hi,

I store about 100 million records to cassandra per day and keep data for a month. I need to perform spark aggregation job on the data for particular date. Which means select all rows for particular day from cassandra(about 100m) and run some aggregation logic on it.

I can not figure out the best way to do it, I tried multiple approaches:

1. Partition by date and unique cluster field. All data will be stored in 1 cassandra partition and 1 server. Bad idea. Spark fails with DriverException: Timed out waiting for server response.

2. Partition by date+bucket(1..10000) and unique cluster field. I can load data to spark using

sc.cassandraTable[String](keyspace, table).where("date='2015-07-20' and bucket IN ('0','1'...)").select ("serverid")

Data has 10000 cassandra partitions, but when I try to load to spark only 1 executor tries to load the data, the other is idle. Spark fails with DriverException: Timed out waiting for server response.

3. Partition by callid and unique cluster field. Each cassandra partition has about 100 rows and I dont control partitions. I load data to spark using

sc.cassandraTable[String](keyspace, table).select ("serverid")

Thats the only case when I can successfully load a lot of data with no timeouts. But I load whole table with 30 days of data (3 billion rows), which is not good either. I need to load data for specific date only.

Do you have any advices how I can do it?

I would appreciate any help.

Thanks.

Lee McFadden

unread,
Jul 20, 2015, 6:12:42 PM7/20/15
to spark-conn...@lists.datastax.com
Hi Eugene,

To load data from a specific time frame we structured our data in the following way:

  • Source table partitioned by Sensor ID & Event Timestamp
    • Event timestamp is reduced to second granularity, so we get one partition per sensor per second in Cassandra.
    • Depending on the velocity of your data you may be able to create larger partitions - one partition per minute, hour, day, etc.
  • Create a case class with a field for each of the partitions you have in your cassandra table.
    • In our example, we'd have:
    • `case class SourcePartition(sensorId: String, eventTs: Date)`
  • Create an RDD with one pair for each partition in cassandra you want to process.
    • e.g. if you want to process one second of data:
    • Create an RDD that is the cartesian product of all the seconds (or whatever granularity you decide to partition by) and sensor IDs.
    • `var partitions = sc.parallelize(mySensorList).cartesian(sc.parallelize(dateRangeBySeconds)).map(i => SourcePartition(i._1, i._2))`
  • Now you can get your data using joinWithCassandraTable.  Lets say you have an Event case class which contains all the data for a given event:
    • `val dataRDD = partitions.joinWithCassandraTable[Event](keyspace, table_name)`
    • This RDD will be a pair of values.
      • ._1 = SourcePartition case class
      • ._2 = Row from partition.
I tried to boil all this down into the following Gist.  Shout if it's confusing at all, happy to try and explain further.


To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.

Eugene Shulga

unread,
Jul 21, 2015, 1:33:44 PM7/21/15
to spark-conn...@lists.datastax.com
Hi Lee,

That is good idea, it works for me and from what I read its efficient.
Thank you for help.

Lee McFadden

unread,
Jul 21, 2015, 2:47:46 PM7/21/15
to spark-conn...@lists.datastax.com
I would mention, in fact, that it might be worth avoiding timestamps and suggest that you break your datetime into multiple integer fields.  When running a spark job at scale, we've found that GC pressure is high when you create/destroy a lot of complex types very quickly (such as Date objects during a reduce phase).  Using integer values for year, month, day, hour, second will likely cause you less trouble down the road.

Message has been deleted

Eugene Shulga

unread,
Sep 24, 2015, 3:36:38 PM9/24/15
to DataStax Spark Connector for Apache Cassandra

Do you have some ideas about how much data efficient to have in 1 partition?

Cassandra allows to have long rows and seems like its not supposed to be a bottleneck to have 10-100 thousands entries in cassandra row. Anyway we always select whole row by spark. There is no partial row selects.

Any thoughts about this?

Reply all
Reply to author
Forward
0 new messages