I'm trying to manipulate Cassandra from Spark with the spark-cassandra-connector.
When I use the joinWithCassandraTable API, it bombs. :(
I can't find any detailed examples or docs for the Java API of spark-cassandra-connector (almost everything is Scala).
Can anyone help me? Thanks a lot.
environment:
java: 1.8.0
spark-cassandra-connector: 1.6.5-s_2.10
spark: 1.6.3-hadoop2.6.0
jsr166e: 1.1.0
cassandra: 3.10
Cassandra table:
CREATE TABLE eyas.items (
    id bigint PRIMARY KEY,
    name text
);

INSERT INTO eyas.items JSON '{"id": 123456, "name": "amber"}';
INSERT INTO eyas.items JSON '{"id": 654321, "name": "apple"}';
My code:
// static imports of javaFunctions, someColumns, mapRowTo and mapToRow
// from com.datastax.spark.connector.japi.CassandraJavaUtil
JavaRDD<Long> keys = jsc.parallelize(Arrays.asList(123456L, 654321L));
CassandraJavaPairRDD<Long, String> result = javaFunctions(keys).joinWithCassandraTable(
        "eyas",
        "items",
        someColumns("name"),
        someColumns("id"),
        mapRowTo(String.class),
        mapToRow(Long.class)
);
Exception:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Columns not found in class java.lang.Long: [id]
at scala.Predef$.require(Predef.scala:233)
at com.datastax.spark.connector.mapper.ReflectionColumnMapper.columnMapForWriting(ReflectionColumnMapper.scala:94)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:36)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:27)
at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.rowWriter(AbstractCassandraJoin.scala:37)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.rowWriter$lzycompute(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.rowWriter(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.checkValidJoin(AbstractCassandraJoin.scala:66)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.checkValidJoin(CassandraJoinRDD.scala:22)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin$class.getPartitions(AbstractCassandraJoin.scala:167)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.getPartitions(CassandraJoinRDD.scala:22)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:339)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:46)
at Main.main(Main.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
You need to wrap the Long key in a Tuple1, or make a class with a long field named id. The connector doesn't map primitives.
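For example, a minimal sketch of the class-based fix (the wrapper class name ItemKey is just illustrative, not part of the connector):

import java.io.Serializable;

// A small serializable key class whose field name matches the
// Cassandra column "id", so the reflection-based mapper can find it.
public static class ItemKey implements Serializable {
    private Long id;

    public ItemKey() { }
    public ItemKey(Long id) { this.id = id; }

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
}

JavaRDD<ItemKey> keys = jsc.parallelize(Arrays.asList(
        new ItemKey(123456L), new ItemKey(654321L)));

// Same join as before, but mapToRow(ItemKey.class) can now map the
// join column "id" onto ItemKey.id instead of failing on a bare Long.
CassandraJavaPairRDD<ItemKey, String> result = javaFunctions(keys).joinWithCassandraTable(
        "eyas",
        "items",
        someColumns("name"),
        someColumns("id"),
        mapRowTo(String.class),
        mapToRow(ItemKey.class)
);

Alternatively, if I remember the Java API right, you can keep bare values by wrapping each key in a scala.Tuple1 and passing mapTupleToRow(Long.class) as the row writer factory.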