Custom column with default value in saveToCassandra method


Ericsson de Oliveira

Aug 5, 2016, 4:47:08 PM
to DataStax Spark Connector for Apache Cassandra
Hello,

I have two tables:

CREATE TABLE ks.t1 (
colA text, colB int, colC bigint, colD timestamp,
PRIMARY KEY (( colA, colB ), colC)
)

and

CREATE TABLE ks.t2 (
colA text, colB int, colC bigint, colD timestamp, colE boolean, colF text,
PRIMARY KEY (( colA, colB ), colC)
)

I created the corresponding case class in Spark:
case class cl1(colA:String, colB:Int, colC:BigInt)

And populated with this:
val no30 = sc.cassandraTable[cl1]("ks","t1").select("colA","colB","colC")

And save it with this:
no30.saveToCassandra("ks","t2", SomeColumns("colA","colB","colC"))

Question:
colD, colE and colF end up with null values. Is there a way to set a default value?
For example, for colE I want to set false for all the rows.

I tried this approach, but without success:
case class cl1(colA:String, colB:Int, colC:BigInt, colD:Boolean = false)

Populated with this:
val no30 = sc.cassandraTable[cl1]("ks","t1").select("colA","colB","colC")

And when I tried to save with this:
no30.saveToCassandra("ks","t2", SomeColumns("colA","colB","colC","colD"))

It returned this:
java.lang.IllegalArgumentException: Failed to map constructor parameter expirationchanged in newoffer32 to a column of ks.t1
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4$$anonfun$apply$1.apply(DefaultColumnMapper.scala:78)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4$$anonfun$apply$1.apply(DefaultColumnMapper.scala:78)
at scala.Option.getOrElse(Option.scala:120)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4.apply(DefaultColumnMapper.scala:78)
at com.datastax.spark.connector.mapper.DefaultColumnMapper$$anonfun$4.apply(DefaultColumnMapper.scala:76)
at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForReading(DefaultColumnMapper.scala:76)
at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:56)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:23)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:48)
at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:43)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

Can someone help me with this case?
I also tried this, but without success:
no30.saveToCassandra("ks","t2", SomeColumns("colA","colB","colC","colD"=false))
using the class definition without colD:
case class cl1(colA:String, colB:Int, colC:BigInt)

Russell Spitzer

Aug 5, 2016, 5:19:07 PM
to DataStax Spark Connector for Apache Cassandra
Why not just map the rows to a class that has the default value?

 Table: fromtable
----------------------------------------
 - a : Int   (partition key column)
 - b : Int
 - c : Int

 Table: totable
----------------------------------------
 - a : Int   (partition key column)
 - d : Int   (clustering column)
 - b : Int
 - c : Int



case class FromRow (a: Int, b: Int, c: Int )
case class ToRowWithDefault (a: Int, b: Int, c: Int, d: Int = 2 )

sc.cassandraTable[FromRow]("test", "fromtable")
  .map( fromRow => ToRowWithDefault(fromRow.a, fromRow.b, fromRow.c))
  .saveToCassandra("test", "totable")


Ericsson de Oliveira

Aug 8, 2016, 9:28:17 AM
to DataStax Spark Connector for Apache Cassandra

Hello Russell,

Your solution worked like a charm, but then I ran into a Scala limitation:
<console>:7: error: Implementation restriction: case classes cannot have more than 22 parameters.

I tried creating a nested class and referencing it from the other class, but it didn't work.
I read that this was fixed in Scala 2.11, but we can't easily update the software here.
Questions:
-Is it possible to change only the Scala jars inside the folder /usr/share/dse/resources/spark/lib/ ?
-Is it possible to update only Spark while keeping the same DSE version (4.8.7)?
-Do you know of another approach I can try for the same task? My source table has 23 columns and the destination table has 30 columns. Is an INSERT INTO ... SELECT style query possible, and does it perform well?

Regards

Russell Spitzer

Aug 8, 2016, 1:32:15 PM
to DataStax Spark Connector for Apache Cassandra
You cannot change the Scala version without changing the Scala version of all of the other libraries. This is probably not easy, or even possible, with DSE.

Your best bet if you have more than 22 parameters (that's a lot of parameters :( ) is to use CassandraRow objects. I would recommend trying to figure out whether you can skip any parameters, but if you can't, convert the result into a map, add the default, then flip back to a CassandraRow.

scala> def cassandraRowToMap( cr: CassandraRow) = { cr.metaData.columnNames.zip(cr.columnValues).toMap }
cassandraRowToMap: (cr: com.datastax.spark.connector.CassandraRow)scala.collection.immutable.Map[String,AnyRef]

scala> sc.cassandraTable("test", "fromtable").map( oldRow => CassandraRow.fromMap(cassandraRowToMap(oldRow) + ("d" -> "2"))).saveToCassandra("test", "totable")

neeraj surana

Aug 8, 2016, 11:55:36 PM
to spark-conn...@lists.datastax.com

I came across a similar situation and went ahead with a RowRDD, attaching the schema while converting it to a DataFrame.

In case you plan to upgrade Scala, you will have to build your Spark against the newer Scala version, so be ready for that.

Otherwise you can use a RowRDD, which worked for me. If you need, I can give some code samples.
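Something along these lines; this is just an untested sketch of what I mean, using the ks.t1 / ks.t2 column names from the first post and filling colE with a default of false (adjust the names and types to your own schema):

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import com.datastax.spark.connector._

val sqlContext = new SQLContext(sc)

// Schema of the destination table, including the column we want to default.
val schema = StructType(Seq(
  StructField("colA", StringType),
  StructField("colB", IntegerType),
  StructField("colC", LongType),
  StructField("colE", BooleanType)))

// Read the source table and build Rows, supplying the default for colE.
val rowRdd = sc.cassandraTable("ks", "t1").map { r =>
  Row(r.getString("colA"), r.getInt("colB"), r.getLong("colC"), false)
}

// Attach the schema and append the resulting DataFrame to the destination table.
sqlContext.createDataFrame(rowRdd, schema)
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "t2"))
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()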



Ericsson de Oliveira

Aug 11, 2016, 3:58:53 PM
to DataStax Spark Connector for Apache Cassandra

Russell,

Thanks for all the help, I used your solution and it worked.
Here is how it looks now:

scala> def cassandraRowToMap( cr: CassandraRow) = { cr.columnNames.zip(cr.columnValues).toMap }


scala> sc.cassandraTable("test","fromtable").map(oldRow => CassandraRow.fromMap(cassandraRowToMap(oldRow) + ("Int_Col" -> 0) + ("Boolean_Col" -> false) + ("Timestamp_With_DateMin_Col" -> java.sql.Timestamp.valueOf("0001-01-01 12:00:00")) + ("Timestamp_from_TimeUUID_Col" -> (oldRow.getUUID("uuid_col").timestamp() - 0x01b21dd213814000L) /10000 ) )).saveToCassandra("test","totable")
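(For context on the TimeUUID conversion above: UUID.timestamp() on a version-1 UUID counts 100-nanosecond intervals since 1582-10-15, so the magic constant is the offset to the Unix epoch, and dividing by 10000 turns the result into milliseconds. Pulled out into a helper, it would look roughly like this:)

def timeUuidToUnixMillis(u: java.util.UUID): Long = {
  // Works only for version-1 (time-based) UUIDs, such as Cassandra timeuuid values.
  (u.timestamp() - 0x01b21dd213814000L) / 10000L
}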

And for a column whose default value should be NULL, I used the SomeColumns parameter like this:
sc.cassandraTable("test","fromtable").map(oldRow => CassandraRow.fromMap(cassandraRowToMap(oldRow) + ("Int_Col" -> 0) + ("Boolean_Col" -> false) + ("Null_Col" -> null) )).saveToCassandra("test","totable", SomeColumns("col1","col2","Int_Col","Boolean_Col"))

In my original query, the totable has 25 columns.

I ran some tests with a table with 3 million rows and it took 6 minutes to insert everything (from table: 25 columns, to table: 30 columns).

I saw that the from table is 854 MB in size with 3 million rows, and the table I will test later is 67 GB in size. I did some quick math and it would take around 8 hours to finish.

Is there a way to improve the performance of this query? Do you know of any parameters I can set in Spark, Cassandra, or the Spark connector that could make things faster?

Best regards

Russell Spitzer

Aug 11, 2016, 4:08:14 PM
to DataStax Spark Connector for Apache Cassandra
The first thing to do is to figure out what takes the time:
Do just the reading portion, then call count. (This is your reading time.)
Do the reading, then convert to the new rows, and subtract the reading time. (This is the conversion time.)
Do everything through saving back to Cassandra, and subtract the reading and conversion times. (This is the write time.)

Depending on which part takes the most time, I could make some guesses.
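Roughly something like this (a sketch only, reusing cassandraRowToMap and the table names from the earlier example; count() forces each stage so the wall-clock times can be compared):

import com.datastax.spark.connector._

// Reading time: just scan the table and count.
val read = sc.cassandraTable("test", "fromtable")
val t0 = System.nanoTime; read.count()
val readTime = (System.nanoTime - t0) / 1e9

// Conversion time: read + map, minus the reading time.
val converted = read.map(r => CassandraRow.fromMap(cassandraRowToMap(r) + ("d" -> "2")))
val t1 = System.nanoTime; converted.count()
val convTime = (System.nanoTime - t1) / 1e9 - readTime

// Write time: the full job, minus reading and conversion.
val t2 = System.nanoTime; converted.saveToCassandra("test", "totable")
val writeTime = (System.nanoTime - t2) / 1e9 - readTime - convTime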

Ericsson de Oliveira

Aug 11, 2016, 4:38:08 PM
to DataStax Spark Connector for Apache Cassandra
Reading Time: 30 seconds
Conversion Time: 0 seconds
Write Time: 5 minutes 38 seconds (Total: 6min 8sec)

About the conversion time: if the conversion part goes from the beginning until just before saveToCassandra, then it takes 0 seconds. If it is all of that plus the .count call, then it takes the same 30 seconds; minus the reading, that is 0 seconds.

Russell Spitzer

Aug 11, 2016, 4:54:31 PM
to DataStax Spark Connector for Apache Cassandra
So it looks like the biggest savings you can get are in the writing. If you are writing to separate partition keys with each write (never adding multiple columns to the same partition), you can try increasing the number of simultaneous writes.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#write-tuning-parameters

Set this to 1, which basically means no batching:

output.batch.size.rows

And increase this:

output.concurrent.writes

This should increase the number of in-flight requests to C*, which may increase throughput depending on the capability of your cluster.
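For example, something like this when building the SparkConf (or the equivalent --conf flags on dse spark / spark-shell); the values are just starting points to experiment with, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cassandra.output.batch.size.rows", "1")     // effectively disables batching
  .set("spark.cassandra.output.concurrent.writes", "16")  // allow more batches in flight per Spark task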