When using append for List column, writeConf seems to be ignored using saveToCassandra


Phil Schwartz

unread,
Jun 23, 2017, 6:24:10 PM6/23/17
to DataStax Spark Connector for Apache Cassandra
I have been scratching my head for hours on this one. It appears, using 2.0.2 of the spark-cassandra-connector, that the TTL set via the writeConf parameter is ignored when using the append functionality on a list column. For example:

rdd.saveToCassandra(
  keyspaceName = config.cassandraKeyspace,
  tableName = config.cassandraTable,
  columns = SomeColumns(
    "session_id",
    "list_col" append, // append doesn't allow the TTL to be set...? Remove it and you get a TTL
    "timestamp"
  ),
  writeConf = WriteConf(ttl = TTLOption.constant(100))
)

With that configuration, running a streaming job and checking the destination table with

select ttl(timestamp) from keyspace.table;

no TTL is being set (all nulls). If you remove the append and let the column simply be overwritten, you do get TTLs on the column. I'm not sure whether the entire writeConf is being ignored, since I also have other settings in my Spark conf that I was passing via fromSparkConf when I noticed this.


val spark = SparkSession.builder()
  .appName(appName)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.cassandra.connection.keep_alive_ms", "60000")
  .config("spark.cassandra.connection.host", config.cassandraHosts)
  .config("spark.cassandra.auth.username", config.cassandraUser)
  .config("spark.cassandra.auth.password", config.cassandraPassword)
  .config("spark.cassandra.output.batch.size.bytes", "8000")
  .config("spark.cassandra.output.concurrent.writes", "300")
  .config("spark.cassandra.output.ttl", config.cassandraRowTTL)
  .config("spark.metrics.namespace", appName)
  .config("spark.streaming.backpressure.enabled", "false")
  .getOrCreate()


// use the write settings from spark conf
val cassWriteConf = WriteConf.fromSparkConf(spark.sparkContext.getConf)
// Was then passing this to writeConf

Has anyone else noticed this? Is this expected behavior because of the collection type, or something else?

I ported my dataframe api saves to use saveToCassandra so that I could use this append functionality :)

As a workaround I can set a default TTL at the table level, but it would be nice to set it per row when needed.
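(For reference, the table-level workaround looks like the following in CQL; the keyspace/table names are placeholders, and 100 matches the TTL in seconds from my example above. Any write that doesn't carry its own TTL then picks up the table default.)

```sql
-- Placeholder names; sets a default TTL for writes that don't specify one
ALTER TABLE keyspace.table WITH default_time_to_live = 100;
```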

Thanks!

Taekyung Yoo

unread,
Sep 8, 2017, 11:08:17 PM9/8/17
to DataStax Spark Connector for Apache Cassandra, schwa...@gmail.com
On Saturday, June 24, 2017 at 7:24:10 AM UTC+9, Phil Schwartz wrote:
I had the same issue as you.

I used an "UPDATE [tablename] USING TTL [seconds] SET col1 = col1 + {'data'}" command to attach a TTL to each value in collection columns.

The problem is that when you use collection behaviors like append, the connector uses an UPDATE query to apply your changes to Cassandra. However, the UPDATE query template does not use WriteConf when building the query.
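Roughly, the difference between the two templates looks like this (a sketch using the column names from the original post, not the exact generated text):

```sql
-- Overwrite path: the INSERT template includes the TTL from WriteConf
INSERT INTO ks.tbl ("session_id", "list_col", "timestamp")
VALUES (:"session_id", :"list_col", :"timestamp") USING TTL 100;

-- Append path: the UPDATE template never adds a USING TTL clause
UPDATE ks.tbl
SET "list_col" = "list_col" + :"list_col", "timestamp" = :"timestamp"
WHERE "session_id" = :"session_id";
```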

You can see the current version of TableWriter.scala at this link: https://github.com/datastax/spark-cassandra-connector/blob/v2.0.5/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala

I modified the queryTemplateUsingUpdate function to use WriteConf, with reference to queryTemplateUsingInsert:

private lazy val queryTemplateUsingUpdate: String = {
  val (primaryKey, regularColumns) = columns.partition(_.isPrimaryKeyColumn)
  val (counterColumns, nonCounterColumns) = regularColumns.partition(_.isCounterColumn)

  val nameToBehavior = (columnSelector collect {
    case cn: CollectionColumnName => cn.columnName -> cn.collectionBehavior
  }).toMap

  val setNonCounterColumnsClause = for {
    colDef <- nonCounterColumns
    name = colDef.columnName
    collectionBehavior = nameToBehavior.get(name)
    quotedName = quote(name)
  } yield collectionBehavior match {
    case Some(CollectionAppend)           => s"$quotedName = $quotedName + :$quotedName"
    case Some(CollectionPrepend)          => s"$quotedName = :$quotedName + $quotedName"
    case Some(CollectionRemove)           => s"$quotedName = $quotedName - :$quotedName"
    case Some(CollectionOverwrite) | None => s"$quotedName = :$quotedName"
  }

  def quotedColumnNames(columns: Seq[ColumnDef]) = columns.map(_.columnName).map(quote)
  val setCounterColumnsClause = quotedColumnNames(counterColumns).map(c => s"$c = $c + :$c")
  val setClause = (setNonCounterColumnsClause ++ setCounterColumnsClause).mkString(", ")
  val whereClause = quotedColumnNames(primaryKey).map(c => s"$c = :$c").mkString(" AND ")

  // Added: build the TTL clause from writeConf, as queryTemplateUsingInsert does
  val ttlSpec = writeConf.ttl match {
    case TTLOption(PerRowWriteOptionValue(placeholder)) => Some(s"TTL :$placeholder")
    case TTLOption(StaticWriteOptionValue(value))       => Some(s"TTL $value")
    case _ => None
  }

  val options = List(ttlSpec).flatten
  val optionsSpec = if (options.nonEmpty) s"USING ${options.mkString(" AND ")}" else ""

  s"UPDATE ${quote(keyspaceName)}.${quote(tableName)} $optionsSpec SET $setClause WHERE $whereClause"
}

I confirmed that it correctly attaches a TTL to each value in collection columns.

I'm also a beginner with Cassandra, so I don't know the side effects of my code. Please review it carefully before changing anything.


Thanks.

Russell Spitzer

unread,
Sep 8, 2017, 11:37:25 PM9/8/17
to DataStax Spark Connector for Apache Cassandra, schwa...@gmail.com
Sounds like we should patch this up! Want to add a Jira?


Taekyung Yoo

unread,
Sep 9, 2017, 4:09:43 AM9/9/17
to DataStax Spark Connector for Apache Cassandra, schwa...@gmail.com
Yes, I hope so.

On Saturday, September 9, 2017 at 12:37:25 PM UTC+9, Russell Spitzer wrote:

pruthvi chitrala

unread,
Jul 1, 2018, 3:02:04 AM7/1/18
to DataStax Spark Connector for Apache Cassandra, schwa...@gmail.com
I'm also facing the same issue. Has it been resolved? Is any workaround available?

Russell Spitzer

unread,
Jul 1, 2018, 10:45:59 AM7/1/18
to spark-conn...@lists.datastax.com, schwa...@gmail.com