Direct joins don't work with keyspaces that start with a digit


Igor Markiewicz

Oct 22, 2021, 10:15:45 AM
to DataStax Spark Connector for Apache Cassandra
When a Cassandra keyspace name starts with a digit, e.g. 200_some_keyspace, direct joins throw:

no viable alternative at input '200' (..., "col_a", "col_b", "col_c" FROM [200]...)

1. I tried using backticks around the keyspace name, but with no luck.
2. I don't see this issue when I read from Cassandra, write to Cassandra, or use Spark-native joins.
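
For context, CQL only accepts unquoted identifiers that start with a letter (roughly [a-zA-Z][a-zA-Z0-9_]*), so a keyspace name with a leading digit has to be double-quoted in every generated statement; Spark SQL backticks don't carry over into the CQL the connector builds. A minimal sketch using the Java driver that ships with the connector (my assumption that this is the relevant quoting helper, untested against the connector itself):

import com.datastax.oss.driver.api.core.CqlIdentifier

// asCql(true) ("pretty") adds double quotes only when the name actually needs them
CqlIdentifier.fromInternal("200_source_keyspace").asCql(true) // => "200_source_keyspace" (quoted)
CqlIdentifier.fromInternal("source_table_1").asCql(true)      // => source_table_1 (left unquoted)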

Alex Ott

Oct 22, 2021, 1:09:34 PM
to DataStax Spark Connector for Apache Cassandra
I think it happens before it reaches the Cassandra connector. Please post the full stack trace.



--
With best wishes,
Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

Igor Markiewicz

Oct 23, 2021, 8:51:27 AM
to DataStax Spark Connector for Apache Cassandra, ale...@gmail.com
DROP KEYSPACE IF EXISTS "200_source_keyspace";
CREATE KEYSPACE "200_source_keyspace" with replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '1'};

CREATE TABLE IF NOT EXISTS "200_source_keyspace".source_table_1(
id text PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS "200_source_keyspace".source_table_2(
id text PRIMARY KEY
);

INSERT INTO "200_source_keyspace".source_table_1 (id) VALUES ('A');
INSERT INTO "200_source_keyspace".source_table_2 (id) VALUES ('A');
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
// Imports assumed for this repro (not shown in the original message):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector.CassandraSparkExtensions

val sparkConfig = new SparkConf(true)
  .set("spark.sql.catalog.mycassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .set("spark.sql.catalog.mycassandra.spark.cassandra.connection.host", "127.0.0.1")
  .set("spark.sql.catalog.mycassandra.spark.cassandra.connection.port", "9142")
  .set("spark.sql.catalog.mycassandra.directJoinSetting", "on")
  .set("spark.sql.catalog.mycassandra.spark.cassandra.input.consistency.level", "LOCAL_QUORUM")

implicit val sparkSession: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .appName("Test")
  .withExtensions(new CassandraSparkExtensions)
  .config(sparkConfig)
  .getOrCreate()

val keyspace = "200_source_keyspace"
val tab1Name = "source_table_1"
val tab2Name = "source_table_2"
val tab1 = sparkSession.read.table(s"mycassandra.$keyspace.$tab1Name")
val tab2 = sparkSession.read.table(s"mycassandra.$keyspace.$tab2Name")

tab1.join(tab2, "id").collect()
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage 0.0 (TID 5) (192.168.1.100 executor driver): com.datastax.oss.driver.api.core.servererrors.SyntaxError: line 1:17 no viable alternative at input '200' (SELECT "id" FROM [200]...)
at com.datastax.oss.driver.api.core.servererrors.SyntaxError.copy(SyntaxError.java:48)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:59)
at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:31)
at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
at com.datastax.oss.driver.api.core.cql.SyncCqlSession.prepare(SyncCqlSession.java:206)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:43)
at com.sun.proxy.$Proxy24.prepare(Unknown Source)
at com.datastax.spark.connector.datasource.JoinHelper$.getJoinPreparedStatement(JoinHelper.scala:81)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin.compute(AbstractCassandraJoin.scala:105)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin.compute$(AbstractCassandraJoin.scala:102)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.compute(CassandraJoinRDD.scala:27)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2965)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2965)
at com.test.phase.Main$.run(Main.scala:41)
at com.test.MainTest.$anonfun$new$2(MainTest.scala:77)
at com.test.MainTest.$anonfun$new$2$adapted(MainTest.scala:76)
at com.test.EmbeddedCassandraTest.<init>(EmbeddedCassandraTest.scala:37)
at com.test.MainTest.$anonfun$new$1(MainTest.scala:76)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1812)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1810)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1822)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1822)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1804)
at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1880)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1880)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1879)
at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1685)
at org.scalatest.Suite.run(Suite.scala:1112)
at org.scalatest.Suite.run$(Suite.scala:1094)
at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1685)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1925)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1925)
at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1923)
at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1685)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1322)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1316)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1316)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1482)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
Caused by: com.datastax.oss.driver.api.core.servererrors.SyntaxError: line 1:17 no viable alternative at input '200' (SELECT "id" FROM [200]...)
at com.datastax.oss.driver.api.core.servererrors.SyntaxError.copy(SyntaxError.java:48)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:59)
at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:31)
at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
at com.datastax.oss.driver.api.core.cql.SyncCqlSession.prepare(SyncCqlSession.java:206)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:43)
at com.sun.proxy.$Proxy24.prepare(Unknown Source)
at com.datastax.spark.connector.datasource.JoinHelper$.getJoinPreparedStatement(JoinHelper.scala:81)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin.compute(AbstractCassandraJoin.scala:105)
at com.datastax.spark.connector.rdd.AbstractCassandraJoin.compute$(AbstractCassandraJoin.scala:102)
at com.datastax.spark.connector.rdd.CassandraJoinRDD.compute(CassandraJoinRDD.scala:27)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
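
The trace shows the failing prepare comes from JoinHelper.getJoinPreparedStatement (JoinHelper.scala:81), so the keyspace appears to be interpolated unquoted when the per-key SELECT is built. A sketch of the difference, based only on the error text above (the elided tail of the statement is unknown):

// generated (rejected by the CQL parser: keyspace starts with a digit, unquoted)
val generated = """SELECT "id" FROM 200_source_keyspace.source_table_1 ..."""
// needed (keyspace double-quoted, parses fine)
val needed    = """SELECT "id" FROM "200_source_keyspace".source_table_1 ..."""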

Alex Ott

Oct 23, 2021, 1:56:39 PM
to Igor Markiewicz, DataStax Spark Connector for Apache Cassandra
Yep, it looks like incorrect quoting of identifiers. I recommend creating a JIRA ticket for it at https://datastax-oss.atlassian.net/
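
In the meantime, a possible workaround (an untested sketch, assuming the connector version in use exposes the directJoin(...) helper from org.apache.spark.sql.cassandra) is to force the affected join back to a Spark-side join, either by setting directJoinSetting to "off" in the catalog config or per DataFrame:

import org.apache.spark.sql.cassandra._ // brings directJoin(...) and AlwaysOff into scope

// Opt this join out of the direct-join rewrite so the connector
// never prepares the unquoted per-key SELECT.
val joined = tab1.join(tab2.directJoin(AlwaysOff), "id").collect()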

Igor Markiewicz

Oct 23, 2021, 4:09:25 PM
to DataStax Spark Connector for Apache Cassandra, ale...@gmail.com, Igor Markiewicz
Thank you for your help. I created a ticket.