ColumnSelector backward compatibility

Steven Vch.

Aug 11, 2023, 1:11:17 PM
to DataStax Spark Connector for Apache Cassandra
Old schema:
CREATE TYPE test.address (city text, street text);
CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);

After updating the Cassandra schema to:
CREATE TYPE test.address (city text, street text, number int);
CREATE TABLE test.companies (name text PRIMARY KEY, address FROZEN<address>);

JavaRDD<Company> companiesRdd = ...
CassandraJavaUtil.<Company>javaFunctions(companiesRdd).writerBuilder(keyspace, table, CassandraJavaUtil.mapToRow(Company.class)).saveToCassandra();



If we run our old Company class (which only has the fields city and street) against the updated schema, we get this error:

        at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61)
        at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486)
        at com.peterservice.gus.spark.daily_aggregates.SparkDailyAggregatesTask.saveToCassandra(SparkDailyAggregatesTask.java:218)
        at com.peterservice.gus.spark.daily_aggregates.SparkDailyAggregatesTask.run(SparkDailyAggregatesTask.java:85)
        at com.peterservice.gus.spark.daily_aggregates.DailyAggregatesRunner.main(DailyAggregatesRunner.java:88)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalArgumentException: requirement failed: Columns not found in class Company: [number]


The error occurs because writerBuilder uses "public static final ColumnSelector allColumns = AllColumns$.MODULE$;" by default, so the connector expects the mapped class to provide every column/field found in the schema definitions.
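To illustrate why AllColumns fails here, below is a minimal sketch (plain Java, not actual connector code) of the kind of requirement check that produces "Columns not found in class Company: [number]": with AllColumns, every field in the schema definition must have a match in the mapped class, while an explicit subset of columns does not trip the check. The class and method names are made up for illustration.

```java
import java.util.*;

public class ColumnCheck {
    // Simulates the connector's requirement: every selected column must
    // have a matching field in the mapped class, otherwise it fails with
    // "Columns not found in class Company: [...]".
    static List<String> missingColumns(Set<String> classFields, List<String> selected) {
        List<String> missing = new ArrayList<>();
        for (String column : selected) {
            if (!classFields.contains(column)) {
                missing.add(column);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<String> companyFields = new HashSet<>(Arrays.asList("city", "street"));

        // AllColumns-style selection: every field from the updated UDT definition
        System.out.println(missingColumns(companyFields,
                Arrays.asList("city", "street", "number"))); // "number" has no match

        // Explicit subset: only the fields the class actually maps
        System.out.println(missingColumns(companyFields,
                Arrays.asList("city", "street"))); // nothing missing
    }
}
```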

How can I implement ".writerBuilder(...).withColumnSelector()  <---- here" to avoid selecting all columns from the schema definitions? Or how else can I fix this error?
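One possible approach (a sketch, not a verified fix for this exact case): the WriterBuilder accepts an explicit ColumnSelector, and CassandraJavaUtil provides a someColumns(...) factory, so you can restrict the write to named table columns instead of the allColumns default:

```java
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

JavaRDD<Company> companiesRdd = ...
javaFunctions(companiesRdd)
        .writerBuilder(keyspace, table, mapToRow(Company.class))
        // select only the table columns the Company class actually maps
        .withColumnSelector(someColumns("name", "address"))
        .saveToCassandra();
```

Note one caveat: someColumns selects top-level table columns ("name", "address"), while the error above complains about the field "number" inside the address UDT. If the mismatch is inside the UDT mapping itself, restricting the table columns may not be enough, and adding the new field to the mapped class (keeping it in sync with the UDT) may still be required.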