Trying to run Cassandra Connector v2 in sparkR


Harrison T

Jan 6, 2021, 7:20:19 PM
to DataStax Spark Connector for Apache Cassandra
Hi all,
I'm trying to follow the steps in https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md, but with sparkR instead of spark-shell. Could anyone point me to a document that shows the equivalent method calls for sparkR? They don't seem to be quite the same. I launched sparkR the same way the guide launches spark-shell, just substituting sparkR for spark-shell, and I verified that I could follow the steps with spark-shell itself.

The steps I've done:
1. ./bin/sparkR --conf spark.cassandra.connection.host=127.0.0.1 --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
2. sparkR.conf("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog") (I found that this differs from the spark-shell step, but it seemed to work)
3. Various combinations of sparkR.sql(<>), spark.sql(<>), and sql(<>). I either get missing-function errors or "org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException: Namespace 'mycatalog' not found;"

Harrison

Alex Ott

Jan 7, 2021, 4:03:56 AM
to DataStax Spark Connector for Apache Cassandra
Hi

sparkR.conf just returns the current configuration value, using the 2nd argument as the default if the configuration isn't set. This call doesn't change the configuration.
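
For example (just to illustrate the read-only behaviour; the exact form of the return value may vary a bit between Spark versions):

> # Returns the current value, or the supplied default if the key isn't set.
> # It does NOT register the catalog.
> sparkR.conf("spark.sql.catalog.mycatalog", "<not set>")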

Configuration changes are made by calling sparkR.session and passing the configuration properties in the sparkConfig parameter. Here is a working solution:

> sparkR.session(sparkConfig = list(spark.sql.catalog.cass="com.datastax.spark.connector.datasource.CassandraCatalog", spark.sql.catalog.cass.spark.cassandra.connection.host="127.0.0.1"))
Java ref type org.apache.spark.sql.SparkSession id 1
>
> df <- sql("select * from cass.test.t1")
> head(df)
       id       v
1  238518  238518
2  754220  754220
3 1366178 1366178
4 1407128 1407128
5 1912212 1912212
6 1818104 1818104
>
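
A quick sanity check that the catalog was really registered is to list its namespaces (standard Spark 3 catalog SQL; the output will of course depend on your keyspaces):

> showDF(sql("SHOW NAMESPACES FROM cass"))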



--
With best wishes,                    Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

Harrison T

Jan 7, 2021, 1:26:12 PM
to spark-conn...@lists.datastax.com
Hi Alex, 

Thank you very much for your help! I was able to get started. I've opened a PR to the project with a documentation update detailing the steps you described, as well as some additional translations from Scala to R. I'm unsure about some of the Scala DSL steps, though. Is there an analog for those steps in R, or is the only option using the `sql()` function?
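
(To sketch what I mean: the DataFrame-level parts I could translate with read.df/write.df and the org.apache.spark.sql.cassandra source, roughly like below with placeholder keyspace/table names; it's the RDD-style Scala calls I can't find an R equivalent for.)

# Read a Cassandra table into a SparkDataFrame
df <- read.df(source = "org.apache.spark.sql.cassandra",
              keyspace = "test", table = "t1")
# Append the rows back to a Cassandra table
write.df(df, source = "org.apache.spark.sql.cassandra",
         keyspace = "test", table = "t1", mode = "append")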


Harrison 


Alex Ott

Jan 7, 2021, 3:30:23 PM
to spark-conn...@lists.datastax.com
Hi

I can look tomorrow, or at least over the weekend ... I've actually had a todo item for R for a long time, I just need to find my notes. I'll comment on your PR.

Harrison T

Jan 11, 2021, 12:23:37 PM
to spark-conn...@lists.datastax.com
Hi Alex, 
First of all, my mistake for continuing this conversation on the pull request; I got confused by the email titles! I was able to reproduce the issue I mentioned with a new file which just creates a keyspace, creates 2 tables, loads data into the tables, reads them into 2 dataframes, applies regexp_replace, and then attempts a join. The only change that needs to be made is to edit the SPARK_HOME variable on line 5.

The last line - head(joinRes) - throws the error I mentioned. 

For context, here is the previous post where I mention my issue: https://github.com/datastax/spark-cassandra-connector/pull/1295#issuecomment-757035473

library(sets)
# Set the environment (needed when running: "dse exec rstudio" on a standalone machine or on one of the Spark nodes)
# But not needed when running on-node in a "dse sparkR" environment
# https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/thirdPartyToolsIntegrationSpark.html
Sys.setenv(SPARK_HOME = "<removed for privacy>")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
# sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
sparkR.session(master = "local[*]", sparkConfig=list(
  spark.sql.catalog.mycatalog="com.datastax.spark.connector.datasource.CassandraCatalog",
  spark.sql.catalog.mycatalog.spark.cassandra.connection.host="127.0.0.1",
  spark.cassandra.connection.host="127.0.0.1",
  spark.sql.extensions="com.datastax.spark.connector.CassandraSparkExtensions"
),
sparkPackages = "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0"
)

sql("CREATE DATABASE IF NOT EXISTS mycatalog.testks WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')")
sql("CREATE TABLE IF NOT EXISTS mycatalog.testks.testtable1 (key1 STRING, value1 STRING) USING cassandra PARTITIONED BY (key1)")
sql("CREATE TABLE IF NOT EXISTS mycatalog.testks.testtable2 (key2 STRING, value2 STRING) USING cassandra PARTITIONED BY (key2)")


dataFrame1 <- createDataFrame(data = list(tuple("id1234", "foo"), tuple("id5678", "bar")), schema = list("key1", "value1"))
dataFrame2 <- createDataFrame(data = list(tuple("id1234#hidden", "hello"), tuple("id5678#hidden", "world")), schema = list("key2", "value2"))

write.df(dataFrame1, source = "org.apache.spark.sql.cassandra", keyspace = "testks", table = "testtable1", mode = "append")
write.df(dataFrame2, source = "org.apache.spark.sql.cassandra", keyspace = "testks", table = "testtable2", mode = "append")

#table1res <- sql("SELECT key1, value1 FROM mycatalog.testks.testtable1")
#table2res <- sql("SELECT key2, value2 FROM mycatalog.testks.testtable2")
table1res <- read.df(source = "org.apache.spark.sql.cassandra", keyspace = "testks", table = "testtable1")
table2res <- read.df(source = "org.apache.spark.sql.cassandra", keyspace = "testks", table = "testtable2")

table2res$key2 <- regexp_replace(table2res$key2, "#hidden", "")

joinRes <- join(table1res, table2res, table1res$key1==table2res$key2)

head(joinRes)           
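
In case it helps narrow things down, printing the plan for the failing join with SparkR's explain() shows where the Cassandra scans and the join sit (diagnostic only; I don't know yet whether the plan itself is the problem):

# Print the parsed/analyzed/optimized/physical plans for the failing join
explain(joinRes, extended = TRUE)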
                                                

Harrison T

Jan 12, 2021, 5:26:03 PM
to spark-conn...@lists.datastax.com
I decided to try the same data queries and manipulations in Python and am getting the same error. I'm fairly confident my usage of the `regexp_replace` function matches the usage in the pyspark documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html (regexp_replace)

Alex Ott

Jan 17, 2021, 1:00:22 PM
to DataStax Spark Connector for Apache Cassandra
Hello

Unfortunately I use R very rarely, and will need time to reproduce the issue and understand the source of the problem.

Harrison T

Jan 18, 2021, 12:16:51 AM
to spark-conn...@lists.datastax.com
Alex, thank you for your reply. The same error happens when I try to do the join in Python using pyspark. If Python is easier for you, I will construct a self-contained example tomorrow and include it as a reply with reproduction steps.

Harrison

Harrison T

Jan 18, 2021, 1:08:20 PM
to spark-conn...@lists.datastax.com
Here is a Jupyter notebook that creates a keyspace and tables, loads data, queries the data, does the `regexp_replace` transform, and then attempts a join. I've also included a block that does a transform and a join without using Cassandra, and that one works.

I run pyspark using the command "pyspark --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions".
I also set the environment variables "export PYSPARK_DRIVER_PYTHON=jupyter" and "export PYSPARK_DRIVER_PYTHON_OPTS='notebook'" so that my Jupyter notebook has access to the packages and pyspark libraries.

I'm running open-source Cassandra 3.11.9, spark-cassandra-connector "spark-cassandra-connector_2.12:3.0.0", and Spark "spark-3.0.1-bin-hadoop3.2".

Harrison
pyspark-cassandra-connector-join-error.ipynb

Alex Ott

Jan 20, 2021, 1:55:12 PM
to DataStax Spark Connector for Apache Cassandra
I'll look into it, most probably over the weekend.

Harrison T

Jan 22, 2021, 3:23:56 PM
to spark-conn...@lists.datastax.com
I opened a ticket for this issue in Jira. It should have all the information that we've discussed in this email thread: https://datastax-oss.atlassian.net/jira/software/c/projects/SPARKC/issues/SPARKC-639

Harrison