DSE Direct Join with Pyspark Dataframes

Ebad Ahmadzadeh

May 11, 2020, 8:03:28 PM
to DataStax Spark Connector for Apache Cassandra

** I was directed to this mailing list by Russell. He's been very helpful, but my question has reached the debugging stage, so I'm posting here. Thanks in advance for taking the time to help.

My ultimate goal is to read micro-batches coming from Kafka into a Spark Structured Streaming dataframe, enrich them with some additional columns from a large table in Cassandra, and then write the results to a different table in Cassandra.
The issue is that the physical plan shows no evidence of a Direct Join happening.

So I simplified the setup:
I read a smaller Cassandra table into a dataframe, read a bigger Cassandra table into another one, join the two, and look at the plan:

Table Definitions:


create table IF NOT EXISTS crm.opportunity
(
    env_name text,
    id text,
    account_id text,
    name text,
    probability double,
    primary key (id, env_name)
);
// has about 100K records

create table IF NOT EXISTS crm.account
(
    env_name text,
    id text,
    industry text,
    number_of_employees bigint,
    primary key (id, env_name)
);
// has about 2k records



Here is what my code looks like:

from pyspark.sql import SQLContext, HiveContext, SparkSession
from pyspark.sql.functions import lit, col
spark = SparkSession.builder.appName("DirectJoinTest").getOrCreate()
acc_df = spark.read.format(
    "org.apache.spark.sql.cassandra"
).options(
    keyspace="crm", table="account"
).load().select(
    "env_name", col("id").alias("account_id"), "industry", "number_of_employees"
)
opp_df = spark.read.format(
    "org.apache.spark.sql.cassandra"
).options(
    keyspace="crm", table="opportunity"
).load().select(
    "env_name", "account_id", "id", "name"
)
opp_df.join(
    acc_df,
    ["account_id", "env_name"]
).explain()

Here is the query plan:

== Physical Plan ==
*(2) BroadcastHashJoin [account_id#36, env_name#35], [account_id#28, env_name#1], Inner, BuildRight
:- *(2) Filter isnotnull(account_id#36)
:  +- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [env_name#35,account_id#36,id#34,name#50] PushedFilters: [IsNotNull(account_id)], ReadSchema: struct<env_name:string,account_id:string,id:string,name:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, string, false]))
   +- *(1) Project [env_name#1, id#0 AS account_id#28, industry#4, number_of_employees#10L, annual_revenue#2]
      +- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [env_name#1,number_of_employees#10L,id#0,industry#4,annual_revenue#2] PushedFilters: [], ReadSchema: struct<env_name:string,number_of_employees:bigint,id:string,industry:string,annual_revenue:double>

I would expect the account table to be filtered based on the "account_id" and "env_name" of the ~2K records in the opportunity table, and then the join to be computed. However, based on the last line of the plan, it seems to me that a full scan is happening.
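
A small helper can make this check less error-prone than reading the plan by eye. The sketch below is only an illustration and not part of Spark or the connector: `join_strategy` is a hypothetical name, and it does nothing more than substring matching on the plan text. The operator names it looks for are the ones that show up in the plans in this thread ("Direct Join", "BroadcastHashJoin") plus two other standard Spark join operators.

```python
# Hypothetical helper (not part of Spark or the connector): classify the join
# operator that appears in a physical plan string by simple substring matching.
SPARK_JOIN_OPERATORS = ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin")


def join_strategy(plan_text):
    """Return 'direct-join', a Spark join operator name, or 'unknown'."""
    # Both "DSE Direct Join" and "Cassandra Direct Join" contain this substring.
    if "Direct Join" in plan_text:
        return "direct-join"
    for name in SPARK_JOIN_OPERATORS:
        if name in plan_text:
            return name
    return "unknown"
```

In PySpark 2.x the text that explain() prints can be obtained as a string with `df._jdf.queryExecution().simpleString()` (the call visible in the traceback later in this thread). That goes through a private attribute, so treat it as an assumption that may not hold on other versions.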

Here is some additional info about my setup:
  • spark 2.4.4 running on AWS EMR
  • BYOS DSE 6.7.7 jar included in spark-submit
Any suggestions would be really appreciated.
Thanks
Ebad

Ebad Ahmadzadeh

May 13, 2020, 3:45:29 PM
to Alex Ott, Russell Spitzer, DataStax Spark Connector for Apache Cassandra
Alex, just to confirm: I was able to make Pyspark (2.4.4) use the direct join functionality with SCC 2.5.0. So that's great!
Either way, because of the Spark bug (SPARK-25003), the extension has to be added the way I did in the code I shared. Note that the extension class names differ between BYOS and SCC, as Alex stated in his first response.
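
To make these two points concrete, here is a minimal sketch of the decision logic in plain Python. The two class names are the ones given in this thread; the function names and the "byos"/"scc" labels are made up for illustration, and the version rule simply encodes Russell's statement that the bug is present in all Spark versions before 3.0.

```python
# Extension class names as quoted in this thread; the dictionary keys are
# hypothetical labels chosen for this sketch.
EXTENSION_CLASSES = {
    "byos": "com.datastax.spark.connector.DseSparkExtensions",       # DSE BYOS jar
    "scc": "com.datastax.spark.connector.CassandraSparkExtensions",  # SCC 2.5.0
}


def extension_class(connector):
    """Return the spark.sql.extensions class name for the given connector label."""
    try:
        return EXTENSION_CLASSES[connector]
    except KeyError:
        raise ValueError("unknown connector label: {!r}".format(connector))


def needs_manual_activation(spark_version):
    """True for Spark versions before 3.0, where PySpark does not apply
    spark.sql.extensions (SPARK-25003) and the extension has to be applied
    through the JVM session by hand."""
    major = int(spark_version.split(".")[0])
    return major < 3
```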

Thank you again, Russell and Alex, for your help and responsiveness.

On Wed, May 13, 2020 at 12:56 PM Alex Ott <ale...@gmail.com> wrote:
Thank you very much!

On Wed, May 13, 2020 at 6:49 PM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Absolutely.
  • OSS Spark version: 2.2.1
  • Pyspark version: 2.2.1
  • Python 3.4.7
  • BYOS JAR file included with spark-submit - version: dse-6.7.7 (scala 2.11)
# Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
            .appName("DirectJoinTest") \
            .config("spark.sql.extensions", "com.datastax.spark.connector.DseSparkExtensions") \
            .getOrCreate()

# Spark versions before 3.0 have a known issue (SPARK-25003) that requires explicit activation
# of the extension and cloning of the session. This is unnecessary in Apache Spark 3.x.
# Compare the major version numerically; comparing version strings is fragile.
if int(spark.sparkContext.version.split(".")[0]) < 3:
    spark.sparkContext._jvm.com.datastax.spark.connector.DseSparkExtensions().apply(spark._jsparkSession.extensions())
    spark = SparkSession(spark.sparkContext, spark._jsparkSession.cloneSession())

# Now test DirectJoin
# the large Cassandra table
acc_df = spark.read.format(
    "org.apache.spark.sql.cassandra"
).options(
    keyspace="crm", table="account"
).load().select(
    "env_name", col("id").alias("account_id"), "industry", "number_of_employees", "annual_revenue"
)

# The small Cassandra table:
opp_df = spark.read.format(
    "org.apache.spark.sql.cassandra"
).options(
    keyspace="crm", table="opportunity"
).load().select(
    "env_name", "account_id", "id", "name"
)

# explain() prints the plan and returns None, so its result is not assigned
opp_df.join(
    acc_df,
    ["account_id", "env_name"]
).explain()

== Physical Plan ==
*Project [account_id#38, env_name#37, id#36, name#52, industry#4, number_of_employees#10L, annual_revenue#2]
+- *Project [env_name#37, account_id#38, id#36, name#52, env_name#1, id#0 AS account_id#29, industry#4, number_of_employees#10L, annual_revenue#2]
   +- DSE Direct Join [id = account_id#38, env_name = env_name#37] crm.account - Reading (id, env_name, annual_revenue, number_of_employees, industry) Pushed {}
      +- *Filter isnotnull(account_id#38)
         +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [env_name#37,account_id#38,id#36,name#52] PushedFilters: [IsNotNull(account_id)], ReadSchema: struct<env_name:string,account_id:string,id:string,name:string>




On Wed, May 13, 2020 at 12:36 PM Alex Ott <ale...@gmail.com> wrote:
Can you share your setup, so it can be reused by others? :-)

On Wed, May 13, 2020 at 6:24 PM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
OMG It worked!

>>> opp_df = spark.read.format(
...     "org.apache.spark.sql.cassandra"
... ).options(
...     keyspace="crm", table="opportunity"
... ).load().select(
...     "env_name", "account_id", "id", "name"
... ).join(
...     acc_df,
...     ["account_id", "env_name"]
... ).explain()
== Physical Plan ==
*Project [account_id#38, env_name#37, id#36, name#52, industry#4, number_of_employees#10L, annual_revenue#2]
+- *Project [env_name#37, account_id#38, id#36, name#52, env_name#1, id#0 AS account_id#29, industry#4, number_of_employees#10L, annual_revenue#2]
   +- DSE Direct Join [id = account_id#38, env_name = env_name#37] crm.account - Reading (id, env_name, annual_revenue, number_of_employees, industry) Pushed {}
      +- *Filter isnotnull(account_id#38)
         +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [env_name#37,account_id#38,id#36,name#52] PushedFilters: [IsNotNull(account_id)], ReadSchema: struct<env_name:string,account_id:string,id:string,name:string>

On Wed, May 13, 2020 at 12:13 PM Alex Ott <ale...@gmail.com> wrote:
Yes, Spark 2.3 also has that problem with BYOS < 6.8. I do a lot of BYOS work and have seen it (I actually wrote that in my first email :-)
If you don't need functionality such as DSEFS, why not use Spark Cassandra Connector 2.5? It works with Spark 2.4 just fine.

On Wed, May 13, 2020 at 6:08 PM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
That makes sense. Thanks for the tip!
At this point I'm using spark 2.3.2. But I think what you said still holds.


On Wed, May 13, 2020 at 11:58 AM Russell Spitzer <russell...@gmail.com> wrote:
This is actually a Spark version mismatch: the connector is attempting to call the stats method on LogicalPlan with the signature present in Spark 2.2, while I assume you are using Spark 2.4. You'll need the BYOS library from DSE 6.8 to work with Spark 2.4. (It will still be compatible with earlier versions of DSE and C*, give or take some graph issues, I believe.)
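
The version pairings mentioned in this thread can be written down as a small lookup, which can serve as a pre-flight check before submitting a job. This is only a sketch: the table contains nothing beyond what is stated here (BYOS 6.7.x is compiled against Spark 2.2, BYOS 6.8 is needed for Spark 2.4, and SCC 2.5 works with Spark 2.4), and the names `STATED_SPARK_VERSION`, `major_minor`, and `matches_stated_pairing` are hypothetical.

```python
# Pairings stated in this thread only; not an official compatibility matrix.
STATED_SPARK_VERSION = {
    ("dse-byos", "6.7"): "2.2",
    ("dse-byos", "6.8"): "2.4",
    ("spark-cassandra-connector", "2.5"): "2.4",
}


def major_minor(version):
    """Reduce a version such as '2.4.4' to its 'major.minor' prefix ('2.4')."""
    return ".".join(version.split(".")[:2])


def matches_stated_pairing(artifact, artifact_version, spark_version):
    """Return True/False if the thread states a pairing for this artifact,
    or None when the thread says nothing about it."""
    stated = STATED_SPARK_VERSION.get((artifact, major_minor(artifact_version)))
    if stated is None:
        return None
    return major_minor(spark_version) == stated
```

A False result only means the combination differs from what is reported in this thread, not that it is known to fail.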



On Wed, May 13, 2020 at 10:48 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
I think I made some progress by adding the extension to the Spark context directly.
Now when I run the explain() method I get an error that is related to DSE, and I notice that it is actually trying to make a decision about the join.


>>> opp_df = spark.read.format(
...     "org.apache.spark.sql.cassandra"
... ).options(
...     keyspace="crm", table="opportunity"
... ).load().select(
...     "env_name", "account_id", "id", "name"
... ).join(
...     acc_df,
...     ["account_id", "env_name"]
... ).explain()
Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 291, in explain
    print(self._jdf.queryExecution().simpleString())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o132.simpleString.
: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(Lorg/apache/spark/sql/internal/SQLConf;)Lorg/apache/spark/sql/catalyst/plans/logical/Statistics;
at org.apache.spark.sql.cassandra.execution.DSEDirectJoinStrategy.checkSizeRatio(DSEDirectJoinStrategy.scala:109)
at org.apache.spark.sql.cassandra.execution.DSEDirectJoinStrategy.rightValid(DSEDirectJoinStrategy.scala:141)
at org.apache.spark.sql.cassandra.execution.DSEDirectJoinStrategy.hasValidDirectJoin(DSEDirectJoinStrategy.scala:87)
at org.apache.spark.sql.cassandra.execution.DSEDirectJoinStrategy.apply(DSEDirectJoinStrategy.scala:30)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:189)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:189)
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:189)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

On Wed, May 13, 2020 at 11:33 AM Russell Spitzer <russell...@gmail.com> wrote:
It does require DSE to be installed as well, since it relies on C* for leader election. If that's not possible for you, you could always cherry-pick the changes from SPARK-25003 onto 2.4 and build an artifact from that yourself.

On Wed, May 13, 2020 at 10:28 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Ok, thanks for the clarification and the suggestions.
Is there an easy way to install the DSE Spark standalone without installing the whole DSE?



On Wed, May 13, 2020 at 11:22 AM Russell Spitzer <russell...@gmail.com> wrote:
If that wasn't clear, the DSE Spark 2.4 has this patch, but OSS Spark 2.4 does not.

On Wed, May 13, 2020 at 10:20 AM Russell Spitzer <russell...@gmail.com> wrote:
If you check out the JIRA, the bug is present in all versions of Spark until 3.0, so no amount of downgrading can help you. The only solution (the one we used internally) was to patch Spark and use a forked version. You may also be able to just monkey-patch the Python code to correctly apply the extensions.

On Wed, May 13, 2020 at 10:18 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Interesting. Were you able to get it to work by doing that on your end?

I found a way to add the extensions like this, but I get the error below.
Initially I was trying it on Spark 2.4.4, so I downgraded to 2.3.2, but the issue remains.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 3.4.10 (default, Nov  7 2019 21:20:33)
SparkSession available as 'spark'.
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder \
...             .appName("DirectJoinTest") \
...             .config("spark.sql.extensions", "com.datastax.spark.connector.DseSparkExtensions") \
...             .enableHiveSupport() \
...             .getOrCreate()
>>>
>>> spark.sparkContext.version
'2.3.2'
>>> spark.sparkContext._jvm.com.datastax.spark.connector.DseSparkExtensions.apply(spark._jsparkSession.extensions())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1487, in __getattr__
py4j.protocol.Py4JError: com.datastax.spark.connector.DseSparkExtensions.apply does not exist in the JVM
>>>
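
A likely reason for this particular error: the call above asks py4j for `apply` as a static member of the class, whereas the snippet that eventually worked in this thread instantiates the class first (`DseSparkExtensions()`) and calls `apply` on the instance. The sketch below wraps that working pattern in a function. The function name is hypothetical, it relies on the private attributes `_jvm` and `_jsparkSession` exactly as the thread's code does, and it uses `type(spark)` to rebuild the Python session so that the block itself needs no imports.

```python
def apply_extensions_manually(
    spark, class_name="com.datastax.spark.connector.DseSparkExtensions"
):
    """Apply a SparkSessionExtensions function through py4j and return a new
    Python session wrapping a cloned JVM session (workaround for SPARK-25003)."""
    # Walk jvm.com.datastax... attribute by attribute to reach the class.
    jvm_class = spark.sparkContext._jvm
    for part in class_name.split("."):
        jvm_class = getattr(jvm_class, part)

    # Instantiate first: apply() is called on an instance, not on the class.
    extensions = jvm_class()
    extensions.apply(spark._jsparkSession.extensions())

    # Clone the JVM session, as the thread's workaround does, then wrap the
    # clone in the same Python class as the original session (SparkSession).
    cloned = spark._jsparkSession.cloneSession()
    return type(spark)(spark.sparkContext, cloned)
```

Usage would be `spark = apply_extensions_manually(spark)`, passing `com.datastax.spark.connector.CassandraSparkExtensions` as `class_name` when running with SCC 2.5.0 instead of the BYOS jar.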




On Tue, May 12, 2020 at 10:00 PM Russell Spitzer <russell...@gmail.com> wrote:
I totally forgot this is a bug in OSS Spark. Which I not only discovered but also patched https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25003

The only way to add the extensions is gonna be through the spark context directly

On Tue, May 12, 2020, 11:26 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Thanks for taking the time, Russell and Alex.
Please keep me posted.

Thanks
Ebad


On Tue, May 12, 2020 at 12:20 PM Russell Spitzer <russell...@gmail.com> wrote:
We are seeing similar issues with Python. I'm going to need to look into this with a debugger, since it looks like the rules are not actually being applied at all.

On Tue, May 12, 2020 at 11:18 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Attached are the raw logs (DEBUG level) I get by running my example.
Please let me know if that is too verbose.

On Tue, May 12, 2020 at 10:30 AM Alex Ott <ale...@gmail.com> wrote:
There's an interesting difference between pyspark and spark-shell:

In pyspark I don't see the direct join in 2.5.0:

>>> toJoin = spark.range(1, 100).selectExpr("cast(id as int) as id")
>>> dataset = spark.read.format("org.apache.spark.sql.cassandra").options(keyspace="test", table="jtest").load()
>>> joined = toJoin.join(dataset, toJoin.id == dataset.id)
>>> joined.explain()
== Physical Plan ==
*(2) BroadcastHashJoin [id#2], [id#4], Inner, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Project [cast(id#0L as int) AS id#2]
:     +- *(1) Range (1, 100, step=1, splits=8)
+- *(2) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id#4,t#5] PushedFilters: [], ReadSchema: struct<id:int,t:string>

command line was:

bin/pyspark --properties-file ../ac.properties --repositories file:/Users/ott/.m2/repository --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.0

and ac.properties contains

spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

But when I start spark-shell with almost the same command line:

bin/spark-shell --properties-file ../ac.properties --repositories file:/Users/ott/.m2/repository --packages com.datastax.spark:spark-cassandra-connector_2.12:2.5.0

I get the direct join:

val toJoin = spark.range(1, 1000).map(x => x.intValue).withColumnRenamed("value", "id")

val dataset = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "jtest", "keyspace" -> "test"))
  .load
val joined = toJoin.join(dataset, dataset("id") === toJoin("id"))
joined.explain

== Physical Plan ==
Cassandra Direct Join [id = id#7] test.jtest - Reading (id, t) Pushed {}
+- *(1) Project [value#5 AS id#7]
   +- *(1) SerializeFromObject [input[0, int, false] AS value#5]
      +- *(1) MapElements $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$1930/1575588264@6e0c5743, obj#4: int
         +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false), obj#3: java.lang.Long
            +- *(1) Range (1, 1000, step=1, splits=8)






On Tue, May 12, 2020 at 4:26 PM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Ok, thanks.
By the way, my last message was bounced back, but it sounds like it actually was delivered (weird).


On Tue, May 12, 2020 at 10:23 AM Russell Spitzer <russell...@gmail.com> wrote:
The parameter is not set by default; there is just a default value that is used when it is not set.
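
In other words, reading an unset key raises, which is what the NoSuchElementException in this thread shows. `spark.conf.get` also accepts a second argument that is returned when the key is not set, so a lookup can be written defensively. A minimal sketch follows; the helper name and the "<not set>" placeholder are made up, and the placeholder is not the connector's built-in default value.

```python
def conf_or_placeholder(spark, key, placeholder="<not set>"):
    """Return the session-level value for `key`, or `placeholder` when the key
    has not been set explicitly (instead of raising)."""
    # spark.conf.get(key, default) returns `default` for unset keys.
    return spark.conf.get(key, placeholder)
```

For example, `conf_or_placeholder(spark, "directJoinSizeRatio")` would report that the key is unset rather than fail with a traceback.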

On Tue, May 12, 2020 at 9:19 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
I received both emails, but thanks for resending.

Yes, that makes a lot of sense for jar files.

I'm going to work on the debugging and will let you know once I have that information.
In the meantime, I also noticed that I cannot get spark cassandra configurations from spark session. For example:

>>> spark.conf.get("directJoinSizeRatio")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/conf.py", line 51, in get
    return self._jconf.get(key)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o73.get.
: java.util.NoSuchElementException: directJoinSizeRatio
at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:2186)
at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:2186)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:2186)
at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

On Tue, May 12, 2020 at 10:14 AM Russell Spitzer <russell...@gmail.com> wrote:
Yeah the ordering is important because it looks like
pyspark (options for pyspark) jar  (options for the jar)

But this is only important for a "jar" arg and not for --jars so I don't think that would be the issue here ...

Next, we need to take a deeper look into the optimization.

Let's turn on DEBUG or TRACE level logging for
org.apache.spark.sql.cassandra.execution
I'm not sure what logging tech you are using, but it would involve modifying your log4j.properties or logback.xml file, depending on your Spark installation.

This will enable logging for the Strategy and we can see whether it is being applied
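
For a Spark 2.x installation that uses log4j 1.x with a log4j.properties file (an assumption; check which logging backend your distribution ships), per-package levels are set with lines of the form `log4j.logger.<name>=<LEVEL>`. The hypothetical helper below just renders such a line for the package named above, so it can be pasted into the properties file.

```python
VALID_LEVELS = ("TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "OFF", "ALL")


def log4j_logger_line(logger_name, level="DEBUG"):
    """Render a log4j 1.x properties line that sets the level of one logger."""
    level = level.upper()
    if level not in VALID_LEVELS:
        raise ValueError("unsupported log4j level: {}".format(level))
    return "log4j.logger.{}={}".format(logger_name, level)


print(log4j_logger_line("org.apache.spark.sql.cassandra.execution"))
# prints: log4j.logger.org.apache.spark.sql.cassandra.execution=DEBUG
```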

-- Resending because my last email got blocked as spam

On Tue, May 12, 2020 at 8:53 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
I didn't know the ordering was important! Thanks for that tip.

So I should be running it this way:
$ pyspark --conf spark.sql.extensions=com.datastax.spark.connector.DseSparkExtensions --jars /home/hadoop/ebad/dse-byos_2.11-6.7.7.jar

It's not working for me yet.

Thanks
Ebad



On Tue, May 12, 2020 at 9:36 AM Alex Ott <ale...@gmail.com> wrote:
All options should be specified before the jar file name.
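
The ordering rule can be illustrated with a small argument builder. This is a sketch with hypothetical names (`build_command`, `job.py`); it only shows that launcher options go before the application file and that anything after the file is passed to the application itself.

```python
def build_command(launcher, options, app=None, app_args=()):
    """Build an argument list in the order: launcher, launcher options,
    application file, application arguments."""
    cmd = [launcher]
    for flag, value in options:
        cmd.extend([flag, value])
    if app is not None:
        # Everything after the application file belongs to the application.
        cmd.append(app)
        cmd.extend(app_args)
    return cmd


# Example with the options used in this thread and a hypothetical job.py:
cmd = build_command(
    "spark-submit",
    [
        ("--conf", "spark.sql.extensions=com.datastax.spark.connector.DseSparkExtensions"),
        ("--jars", "/home/hadoop/ebad/dse-byos_2.11-6.7.7.jar"),
    ],
    app="job.py",
)
print(" ".join(cmd))
```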

On Tue, May 12, 2020 at 3:33 PM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Thanks, Russell. I must be missing something besides the conf. It hasn't given me the direct join yet.

Here is how I'm running it in shell:
$ pyspark --jars /home/hadoop/ebad/dse-byos_2.11-6.7.7.jar --conf spark.sql.extensions=com.datastax.spark.connector.DseSparkExtensions

I also verify the setting has been applied this way:
>>> spark.conf.get("spark.sql.extensions")
# which returns:
'com.datastax.spark.connector.DseSparkExtensions'

But the example code that I shared still produces the same plans.

Could it be because I'm using spark 2.4.4 with DSE 6.7.7?
Or is it the case that I have to generate and include the byos properties file as well?

Thanks
Ebad





On Tue, May 12, 2020 at 8:53 AM Russell Spitzer <russell...@gmail.com> wrote:
You can set it on the command line with --conf or by setting it in your spark conf object when configuring your app, or in your spark defaults file.

On Tue, May 12, 2020, 7:44 AM Ebad Ahmadzadeh <mahmadza...@my.fit.edu> wrote:
Hi Alex,

Thank you very much for your response and detailed information. I'm very excited to see the query plan you pasted in my console.

Since I'm trying to achieve this with Pyspark, I'm wondering how this part can be done in pyspark:
spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions

Any hints would be appreciated.

Thanks again.
Ebad


On Tue, May 12, 2020 at 2:19 AM Alex Ott <ale...@gmail.com> wrote:
Hi Ebad

This functionality isn't enabled by default when you're using BYOS. It's controlled by a special property that is set by default only on DSE Analytics. To get this functionality, you need to pass the following property at the start of the job:

spark.sql.extensions=com.datastax.spark.connector.DseSparkExtensions

Besides that, there are other properties that are useful with BYOS. You can export them from a DSE node (once) using the following command:

dse client-tool --use-server-config configuration byos-export ~/byos-file-name.properties

But be very careful: the 6.7.7 BYOS is compiled against Spark 2.2, and not all optimizations may work correctly with Spark 2.4.

If it doesn't work, you can try Spark Cassandra Connector 2.5.0, where you can enable this functionality with the following configuration parameter:

spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions


== Physical Plan ==
Cassandra Direct Join [id = id#7] test.jtest - Reading (id, t) Pushed {}
+- *(1) Project [value#5 AS id#7]
   +- *(1) SerializeFromObject [input[0, int, false] AS value#5]
      +- *(1) MapElements <function1>, obj#4: int
         +- *(1) DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#0L, true, false), obj#3: java.lang.Long
            +- *(1) Range (1, 1000, step=1, splits=8)



--
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.


--
With best wishes,                    Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)
