Some primary key columns are missing in RDD or not selected

Kartik Mathur

Jun 5, 2015, 5:57:36 PM
to spark-conn...@lists.datastax.com

Hello geeks,
I am stuck on a problem which is quite bizarre.

When I try to use data.saveToCassandra as below, I get the following exception:
"Some primary key columns are missing in RDD or not selected"

data is an RDD of rows like this:
RDD row:(1,2,3,4,5,6,7,8,9,10,11,1,true,12,13,14,15,16,10,10)

I have verified multiple times that the number of primary key columns and the data types match between the RDD and the Cassandra table, so I am unable to understand the problem!

data.saveToCassandra("report", "t_heat_map",
SomeColumns("advId".toLowerCase(),
//"impDayAdvZ".toLowerCase(),
"campId".toLowerCase(),
"agId".toLowerCase(),
"faId".toLowerCase(),
"pubId".toLowerCase(),
"brnm".toLowerCase(),
"brver".toLowerCase(),
"elid".toLowerCase(),
"elnm".toLowerCase(),
"eltyp".toLowerCase(),
"evttyp".toLowerCase(),
"evttypno".toLowerCase(),
"iswl".toLowerCase(),
"osnm".toLowerCase(),
"osver".toLowerCase(),
"pgid".toLowerCase(),
"pgord".toLowerCase(),
"ppid".toLowerCase(),
"x".toLowerCase(),
"y".toLowerCase()))

This is the table structure in Cassandra:
CREATE TABLE t_heat_map (
  advid text,
  campid text,
  agid text,
  faid text,
  pubid text,
  brnm text,
  brver text,
  elid text,
  elnm text,
  eltyp text,
  evttyp text,
  evttypno bigint,
  iswl boolean,
  osnm text,
  osver text,
  pgid text,
  pgord text,
  ppid text,
  x int,
  y int,
  PRIMARY KEY ((advid), campid, agid, faid, pubid)
)
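[Editor's note: for context on the error above: when saving tuples, the connector matches tuple elements to the SomeColumns list positionally, and that list must include every primary key column of the target table. A minimal sketch against this schema, with placeholder values:]

import com.datastax.spark.connector._

// One row per tuple; element order must match the SomeColumns order below,
// and all five primary key columns (advid, campid, agid, faid, pubid) must be listed.
val rows = sc.parallelize(Seq(("adv1", "camp1", "ag1", "fa1", "pub1", 10, 10)))
rows.saveToCassandra("report", "t_heat_map",
  SomeColumns("advid", "campid", "agid", "faid", "pubid", "x", "y"))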



Full RDD printed (48 rows in total; all are identical apart from the iswl flag):

15/06/05 14:51:52 INFO sparkjobs.HeatMapProcessor: RDD row:(1,2,3,4,5,6,7,8,9,10,11,1,true,12,13,14,15,16,10,10)
15/06/05 14:51:52 INFO sparkjobs.HeatMapProcessor: RDD row:(1,2,3,4,5,6,7,8,9,10,11,1,false,12,13,14,15,16,10,10)
[... 46 more rows of the same form ...]

Russell Spitzer

Jun 5, 2015, 6:19:23 PM
to spark-conn...@lists.datastax.com
Could you give some more code around where you are doing this? Spark-shell? C* version, Spark version, Connector Version?


Kartik Mathur

Jun 5, 2015, 6:26:22 PM
to spark-conn...@lists.datastax.com
Thanks a lot Russell for the prompt reply,

I am using spark-submit to submit this to the cluster, with the Spark DataStax connector.
Spark version: 1.2.1

  • Workers: 3
  • Cores: 18 Total, 6 Used
  • Memory: 12.0 GB Total, 7.5 GB Used


A snippet from my build.sbt might be useful:
libraryDependencies += "com.typesafe" % "config" % "1.2.1",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.1",
    libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha2",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.1" ,
    libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.2.1",
    libraryDependencies += "org.spark-project.hive" % "hive-cli" % "0.13.1",
    libraryDependencies += "org.spark-project.hive" % "hive-jdbc" % "0.13.1",
    libraryDependencies += "org.spark-project.hive" % "hive-beeline" % "0.13.1",
    libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.9",
    libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.35",
    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "2.0.0-mr1-cdh4.6.0",
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-mr1-cdh4.6.0",
    libraryDependencies += "com.hadoop.gplcompression" % "hadoop-lzo" % "cdh4-0.4.15-gplextras",
    libraryDependencies += "org.apache.spark" % "spark-mllib_2.10" % "1.0.0",
    libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.5",
    libraryDependencies += "com.github.scopt" % "scopt_2.10" % "3.2.0",
    libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"

Russell Spitzer

Jun 5, 2015, 6:38:39 PM
to spark-conn...@lists.datastax.com
Have you tried with the mainline 1.2.2 release of the connector? If it repros there, I'll see if I can get a reproduction on my local system.
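[Editor's note: the corresponding build.sbt change for this suggestion would be:]

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.2"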

Kartik Mathur

Jun 5, 2015, 6:55:12 PM
to spark-conn...@lists.datastax.com
Thanks, I tried with 1.2.2. Thankfully that exception is gone, but now I am getting this:
"<none> is not a term". However, I don't see <none> anywhere in the code or in the input RDD.

scala.ScalaReflectionException: <none> is not a term
at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:259)
at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:73)
at com.datastax.spark.connector.util.Reflect$.methodSymbol(Reflect.scala:12)
at com.datastax.spark.connector.util.ReflectionUtil$.constructorParams(ReflectionUtil.scala:60)
at com.datastax.spark.connector.util.ReflectionUtil$.constructorParams(ReflectionUtil.scala:69)
at com.datastax.spark.connector.mapper.DefaultColumnMapper.<init>(DefaultColumnMapper.scala:42)
at com.datastax.spark.connector.mapper.LowPriorityColumnMapper$class.defaultColumnMapper(ColumnMapper.scala:38)
at com.datastax.spark.connector.mapper.ColumnMapper$.defaultColumnMapper(ColumnMapper.scala:42)
at com.inadco.rta.sparkjobs.HeatMapProcessor$$anonfun$processFASClick$2.apply(HeatMapProcessor.scala:66)
at com.inadco.rta.sparkjobs.HeatMapProcessor$$anonfun$processFASClick$2.apply(HeatMapProcessor.scala:45)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.inadco.rta.sparkjobs.HeatMapProcessor$.processFASClick(HeatMapProcessor.scala:45)
at com.inadco.rta.sparkjobs.HeatMapProcessor$.processFasClickDir(HeatMapProcessor.scala:26)
at com.inadco.rta.sparkjobs.HeatMapSparkJob.start(HeatMapSparkJob.scala:53)
at com.inadco.rta.sparkjobs.InadcoSparkJobsLauncher$$anonfun$1.apply$mcV$sp(InadcoSparkJobsLauncher.scala:115)
at akka.actor.Scheduler$$anon$5.run(Scheduler.scala:79)
at akka.actor.LightArrayRevolverScheduler$$anon$2$$anon$1.run(Scheduler.scala:242)
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)



Kartik Mathur

Jun 5, 2015, 7:30:42 PM
to spark-conn...@lists.datastax.com
Are you aware of this?
"scala.ScalaReflectionException: <none> is not a term"

Russell Spitzer

Jun 5, 2015, 8:33:24 PM
to spark-conn...@lists.datastax.com
Well, I'm not particularly sure why you are seeing this, but it comes from the connector making a converter between your type and the C* types needed. When you get "<none> is not a term" it may be because the information about the class was not properly serialized. A bigger code sample may help.
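[Editor's note: the converter Russell mentions is derived by the connector's ColumnMapper; for user classes, the DefaultColumnMapper visible in the stack trace matches constructor parameter names to Cassandra column names. A minimal sketch of a type it can map, using a hypothetical case class:]

import com.datastax.spark.connector._

// Each constructor parameter name corresponds to a column name; the connector
// reflects on this primary constructor to build the row mapping.
case class Click(advid: String, campid: String, agid: String, faid: String, pubid: String)

sc.parallelize(Seq(Click("adv1", "camp1", "ag1", "fa1", "pub1")))
  .saveToCassandra("report", "t_heat_map",
    SomeColumns("advid", "campid", "agid", "faid", "pubid"))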

Kartik Mathur

Jun 8, 2015, 1:31:06 PM
to spark-conn...@lists.datastax.com
So I am still getting this exception. Here is the relevant code.

For testing, I am hardcoding the values that I want to insert into Cassandra, like this:
def convertFASClickToTuples(fasClick: FASClick) = {
  if (!isEligible(fasClick)) {
    Seq(
      "brnm",
      "advid",
      "agid",
      "brver",
      "campid",
      "elid",
      "elnm",
      "eltyp",
      "evttyp",
      1,
      "faid",
      fasClick.getIsWireless,
      "osnm",
      "osver",
      "pgid",
      "pgord",
      "ppid",
      "pubid",
      10,
      10)
  } else {
    Seq(
      "brnm",
      "advid",
      "agid",
      "brver",
      "campid",
      "elid",
      "elnm",
      "eltyp",
      "evttyp",
      1,
      "faid",
      fasClick.getIsWireless,
      "osnm",
      "osver",
      "pgid",
      "pgord",
      "ppid",
      "pubid",
      10,
      10)
  }
}
This is how I call the above method:

val dataWithNullable = sc.newAPIHadoopFile(f._1,
    classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[Text, BytesWritable]],
    classOf[Text], classOf[BytesWritable], sc.hadoopConfiguration)
  .map { case (k, v) =>
    val bytes = v.copyBytes()
    try {
      convertFASClickToTuples(FASClick.parseFrom(bytes))
      //testConvertFASClickToTuples();
    } catch {
      case t: Throwable => logError("Failed to save data to heat maps table", t) // TODO: handle error
    }
  }
val data = dataWithNullable.filter { x => x != null }

logInfo("printing RDD ....")
data.collect().foreach { x => logInfo("RDD row:" + x.toString()) }

try {
  data.saveToCassandra("report", cTableName,
    SomeColumns(
      "brnm".toLowerCase(),
      "advid".toLowerCase(),
      "agid".toLowerCase(),
      "brver".toLowerCase(),
      "campid".toLowerCase(),
      "elid".toLowerCase(),
      "elnm".toLowerCase(),
      "eltyp".toLowerCase(),
      "evttyp".toLowerCase(),
      "evttypno".toLowerCase(),
      "faid".toLowerCase(),
      "iswl".toLowerCase(),
      "osnm".toLowerCase(),
      "osver".toLowerCase(),
      "pgid".toLowerCase(),
      "pgord".toLowerCase(),
      "ppid".toLowerCase(),
      "pubid".toLowerCase(),
      "x".toLowerCase(),
      "y".toLowerCase()))
} catch {
  case t: Throwable => logError("Failed to insert in heat map table " + cTableName, t)
}

When I print the first row from the RDD, I correctly get:
List(brnm, advid, agid, brver, campid, elid, elnm, eltyp, evttyp, 1, faid, true, osnm, osver, pgid, pgord, ppid, pubid, 10, 10)

This is the Cassandra table schema:

CREATE TABLE t_heat_map (
  brnm text,
  advid text,
  agid text,
  brver text,
  campid text,
  elid text,
  elnm text,
  eltyp text,
  evttyp text,
  evttypno bigint,
  faid text,
  iswl boolean,
  osnm text,
  osver text,
  pgid text,
  pgord text,
  ppid text,
  pubid text,
  x int,
  y int,
  PRIMARY KEY ((brnm))
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='{"keys":"ALL", "rows_per_partition":"NONE"}' AND
  comment='' AND
  dclocal_read_repair_chance=0.100000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.000000 AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

Please let me know if you need more code.
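[Editor's note: two details of the code above likely matter here. The printed row is a List, not a tuple or case class, and the catch branch returns Unit, so the inferred element type of dataWithNullable widens to Any. Neither Any nor Seq gives the DefaultColumnMapper a constructor it can usefully reflect on, which matches the earlier "<none> is not a term" trace. A sketch of the widening, reusing the names above (input stands in for the sc.newAPIHadoopFile RDD):]

// Because the catch branch yields Unit, the element type inferred for this
// RDD is Any, not the Seq that convertFASClickToTuples returns.
val dataWithNullable = input.map { case (k, v) =>
  try convertFASClickToTuples(FASClick.parseFrom(v.copyBytes()))
  catch { case t: Throwable => logError("Failed to parse record", t) } // returns Unit
}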

Kartik Mathur

Jun 8, 2015, 6:12:17 PM
to spark-conn...@lists.datastax.com
Now, even when I remove all other columns from the Cassandra table and leave it with just one column:
describe table t_heat_map;

CREATE TABLE t_heat_map (
  brnm text,
  PRIMARY KEY ((brnm))
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='{"keys":"ALL", "rows_per_partition":"NONE"}' AND
  comment='' AND
  dclocal_read_repair_chance=0.100000 AND
  gc_grace_seconds=864000 AND
  read_repair_chance=0.000000 AND
  default_time_to_live=0 AND
  speculative_retry='99.0PERCENTILE' AND
  memtable_flush_period_in_ms=0 AND
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};

and try to save to Cassandra:

data.saveToCassandra("report", cTableName,
  SomeColumns("brnm".toLowerCase()))

I still get the error:
"<none> is not a term"

Not sure what I am missing!

Please help.

Thanks
Kartik



Russell Spitzer

Jun 8, 2015, 6:17:24 PM
to spark-conn...@lists.datastax.com
Can you try running this against your last posted schema?

scala> sc.parallelize(1 to 100).map(_.toString).map(new Tuple1(_)).saveToCassandra("test","t_heat_map",SomeColumns("brnm".toLowerCase()))

scala> sc.cassandraTable("test","t_heat_map").collect
res6: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{brnm: 15}, CassandraRow{brnm: 10}, CassandraRow{brnm: 65}, CassandraRow{brnm: 92}, CassandraRow{brnm: 52}, CassandraRow{brnm: 4}, CassandraRow{brnm: 85}, CassandraRow{brnm: 45}, CassandraRow{brnm: 56}, CassandraRow{brnm: 3}, CassandraRow{brnm: 69}, CassandraRow{brnm: 30}, CassandraRow{brnm: 82}, CassandraRow{brnm: 72}, CassandraRow{brnm: 5}, CassandraRow{brnm: 100}, CassandraRow{brnm: 57}, CassandraRow{brnm: 76}, CassandraRow{brnm: 36}, CassandraRow{brnm: 61}, CassandraRow{brnm: 97}, CassandraRow{brnm: 50}, CassandraRow{brnm: 25}, CassandraRow{brnm: 35}, CassandraRow{brnm: 18}, CassandraRow{brnm: 74}, CassandraRow{brnm: 28}, CassandraRow{brnm: 14}, CassandraRow{brnm: 8}, CassandraRow{brnm: 51}, CassandraRow{brnm...

If this doesn't work, I'm guessing you have either a connector-jar version mismatch, or an intermediary class between `data` and `saveToCassandra` that is behaving badly.

Russ

Kartik Mathur

Jun 8, 2015, 6:42:06 PM
to spark-conn...@lists.datastax.com
Yes, this works! So I guess the problem is somewhere in this code:

hdfsFileFullUrls.foreach ( f => {
  logInfo("Processing file " + f.toString() + " to table " + cTableName)
  val dataWithNullable = sc.newAPIHadoopFile(f._1,
      classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[Text, BytesWritable]],
      classOf[Text], classOf[BytesWritable], sc.hadoopConfiguration)
    .map { case (k, v) =>
      val bytes = v.copyBytes()
      try {
        convertFASClickToTuples(FASClick.parseFrom(bytes)) // --> defined below
        //testConvertFASClickToTuples();
      } catch {
        case t: Throwable => logError("Failed to save data to heat maps table", t) // TODO: handle error
      }
    }
  val data = dataWithNullable.filter { x => x != null }

  logInfo("printing RDD ....")
  data.collect().foreach { x => logInfo("RDD row:" + x.toString()) }

  try {
    //sc.parallelize(1 to 100).map(_.toString).map(new Tuple1(_)).saveToCassandra("report","t_heat_map",SomeColumns("brnm".toLowerCase()))

    data.saveToCassandra("report", "t_heat_map",
      SomeColumns("brnm".toLowerCase()))
  } catch {
    case t: Throwable => logError("Failed to insert in heat map table " + cTableName, t)
  }
})

def convertFASClickToTuples(fasClick: FASClick) = {
  val br = fasClick.getBrowserName

  (br)
}

I am unable to figure out what!

Russell Spitzer

Jun 8, 2015, 6:45:54 PM
to spark-conn...@lists.datastax.com
(br) is not actually a tuple with one element, it's just a single element. So you are passing in a primitive, which doesn't have getters or setters for the field you are trying to set, hence <none>.

scala> def notToTuple( x: Int ) = { (x) }
notToTuple: (x: Int)Int


To make a one-element tuple you need to write the constructor explicitly:
scala> def notToTuple( x: Int ) = { Tuple1(x) }
notToTuple: (x: Int)(Int,)


Russell Spitzer

Jun 8, 2015, 6:46:24 PM
to spark-conn...@lists.datastax.com
Sorry, that second example should have been:

scala> def toTuple( x: Int ) = { Tuple1(x) }
toTuple: (x: Int)(Int,)

Kartik Mathur

Jun 8, 2015, 7:07:53 PM
to spark-conn...@lists.datastax.com
Thanks, so I changed my method to:

def convertFASClickToTuples(fasClick: FASClick) = {
  val br = fasClick.getBrowserName
  Tuple1(br)
}

I still get the same error!

Kartik Mathur

Jun 8, 2015, 8:13:50 PM
to spark-conn...@lists.datastax.com
So I figured out the problem: the val name br was not matching the column name in Cassandra, which is brnm. Once I changed it to brnm, it works!

Last question: since Scala only goes up to Tuple22, and in my case I have over 32 columns, is there a way to do this?

I tried with a Seq but got the same exception: "<none> is not a term"

What can be done?
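[Editor's note: the thread ends without an answer to this last question. One commonly used approach, not confirmed here, is a case class whose field names match the column names, so the connector maps by name rather than position. On Scala 2.11+ a case class may carry more than 22 fields, while on the Scala 2.10.x used in this thread the 22-field limit applies to case classes as well, so a Scala upgrade would be needed for 32 columns. A hedged sketch with a hypothetical row class:]

import com.datastax.spark.connector._

// Field names deliberately match the Cassandra column names so the
// connector's DefaultColumnMapper resolves columns by name.
case class HeatMapRow(brnm: String, advid: String, campid: String,
                      agid: String, faid: String, pubid: String, x: Int, y: Int)

sc.parallelize(Seq(HeatMapRow("chrome", "adv1", "camp1", "ag1", "fa1", "pub1", 10, 10)))
  .saveToCassandra("report", "t_heat_map",
    SomeColumns("brnm", "advid", "campid", "agid", "faid", "pubid", "x", "y"))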

 