The Python script is as follows:
==================== script =======================
from pyspark import SparkContext, SparkConf
from pyspark_cassandra import CassandraSparkContext, Row
from datetime import datetime, date, time, timedelta
from random import *

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("My Spark Cassandra")
conf.set("spark.cassandra.connection.host", "172.21.4.2")
conf.set("spark.cassandra.keyspace", "myspace")

sc = CassandraSparkContext(conf=conf)

rdd = sc.parallelize([
    {
        "key": k,
        "stamp": datetime.now(),
        "val": random() * 10,
        "tags": ["a", "b", "c"],
        "options": {"foo": "bar", "baz": "qux"},
    }
    for k in ["x", "y", "z"]
])
rdd.saveToCassandra("myspace", "mytable", ttl=timedelta(hours=1))
However, when I run spark-submit, it complains that it cannot find 'myspace' and 'mytable' (output below), even though both were already created in Cassandra and I inserted one test row. I really have no idea what's wrong; please help me out if anyone can.
================ spark-submit ====================
[mapr@maprdemo bin]$ ./spark-submit --packages anguenot:pyspark-cassandra:0.6.0 ./sparkcassandra.py
Ivy Default Cache set to: /home/mapr/.ivy2/cache
The jars for the packages stored in: /home/mapr/.ivy2/jars
:: loading settings :: url = jar:file:/opt/mapr/spark/spark-2.1.0/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
anguenot#pyspark-cassandra added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found anguenot#pyspark-cassandra;0.6.0 in spark-packages
found com.datastax.spark#spark-cassandra-connector_2.11;2.0.5 in central
found joda-time#joda-time;2.3 in central
found commons-beanutils#commons-beanutils;1.9.3 in central
found commons-collections#commons-collections;3.2.2 in central
found io.netty#netty-all;4.0.33.Final in central
found org.joda#joda-convert;1.2 in central
found com.twitter#jsr166e;1.1.0 in central
found org.scala-lang#scala-reflect;2.11.8 in central
found net.razorvine#pyrolite;4.10 in central
found net.razorvine#serpent;1.12 in central
:: resolution report :: resolve 814ms :: artifacts dl 12ms
:: modules in use:
anguenot#pyspark-cassandra;0.6.0 from spark-packages in [default]
com.datastax.spark#spark-cassandra-connector_2.11;2.0.5 from central in [default]
com.twitter#jsr166e;1.1.0 from central in [default]
commons-beanutils#commons-beanutils;1.9.3 from central in [default]
commons-collections#commons-collections;3.2.2 from central in [default]
io.netty#netty-all;4.0.33.Final from central in [default]
joda-time#joda-time;2.3 from central in [default]
net.razorvine#pyrolite;4.10 from central in [default]
net.razorvine#serpent;1.12 from central in [default]
org.joda#joda-convert;1.2 from central in [default]
org.scala-lang#scala-reflect;2.11.8 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 11 | 0 | 0 | 0 || 11 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 11 already retrieved (0kB/16ms)
17/11/08 19:51:43 WARN ClosureCleaner: Expected a closure; got pyspark_util.BatchUnpickler
Traceback (most recent call last):
File "/opt/mapr/spark/spark-2.1.0/bin/./sparkcassandra.py", line 18, in <module>
rdd.saveToCassandra("myspace","mytable")
File "/home/mapr/.ivy2/jars/anguenot_pyspark-cassandra-0.6.0.jar/pyspark_cassandra/rdd.py", line 93, in saveToCassandra
File "/opt/mapr/spark/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/opt/mapr/spark/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o36.saveToCassandra.
: java.io.IOException: Couldn't find myspace.mytable or any similarly named keyspace and table pairs
at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:356)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:344)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
at pyspark_cassandra.PythonHelper.saveToCassandra(PythonHelper.scala:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
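A quick sanity check worth running here (a minimal sketch, reusing the host and names from the script above): read the existing test row back through the connector before writing anything. If the read fails with the same "Couldn't find myspace.mytable" error, the problem is connectivity or schema visibility, not anything in the write path.

================ read-back check (sketch) ================
# Sketch only: read myspace.mytable back through pyspark-cassandra.
# If this also raises "Couldn't find myspace.mytable", the connector is
# not reaching the cluster that actually holds the schema.
from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext

conf = SparkConf() \
    .setMaster("local[4]") \
    .setAppName("Cassandra read check") \
    .set("spark.cassandra.connection.host", "172.21.4.2")
sc = CassandraSparkContext(conf=conf)

# cassandraTable() scans the table; first() should return the test row
# that was inserted via cqlsh.
print(sc.cassandraTable("myspace", "mytable").first())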
Yes, I manually created the keyspace 'myspace' and the table 'mytable' with CQL, all in lower case, and I also inserted one record with CQL.
Yes, the machine is my laptop, which runs a single-node Cassandra cluster. The script runs in a Linux VM, and I can use cqlsh from the VM to connect to the Cassandra instance on my laptop.
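Worth noting: cqlsh connects over the same native-protocol port (9042) that the connector uses, so basic reachability from the VM can also be confirmed with a plain socket probe. A minimal sketch, using the host from this thread:

================ port probe (sketch) ================
# Sketch: confirm the native-protocol port is reachable from the VM.
# cqlsh succeeding from the same VM already implies this should pass.
import socket

s = socket.create_connection(("172.21.4.2", 9042), timeout=5)
print("172.21.4.2:9042 reachable")
s.close()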
Does pyspark-cassandra connect to Cassandra via the native protocol or Thrift? I found in cassandra.yaml that they use different ports, and I also tried adding conf.set("spark.cassandra.connection.rpc.port", "9160"), but still no luck. My cassandra.yaml changes are like below; any suggestions?

================ cassandra.yaml ==================
...
rpc_address: 0.0.0.0
...
rpc_interface_prefer_ipv6: false
...
broadcast_rpc_address: 172.21.4.2
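For what it's worth, the 2.x connector speaks only the native protocol (via the DataStax Java driver), not Thrift, so the Thrift rpc port (9160) should not be involved; as I understand it, rpc_address and broadcast_rpc_address in modern Cassandra also govern the native transport. If a non-default native port ever needs to be set, the connector property is spark.cassandra.connection.port, not ...rpc.port. A sketch (9042 is already the default, so this line is normally unnecessary):

================ port setting (sketch) ================
# Sketch: spark-cassandra-connector 2.x talks the native protocol; its port
# property is spark.cassandra.connection.port (default 9042). The Thrift
# rpc port 9160 is not used by connector 2.x.
conf.set("spark.cassandra.connection.port", "9042")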
Just to let you know: if I run the script on my laptop as well (the same address as Cassandra), the RDD data can be saved.

On Thu, Nov 9, 2017 at 10:52 AM Michael Yang <yxm...@gmail.com> wrote:

FYI.

================ cqlsh ==================
[mapr@maprdemo bin]$ ./cqlsh 172.21.4.2
Connected to Test Cluster at 172.21.4.2:9042.
[cqlsh 5.0.1 | Cassandra 3.11.1 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> describe keyspace myspace

CREATE KEYSPACE myspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;

CREATE TABLE myspace.mytable (
    key text PRIMARY KEY,
    options map<text, text>,
    stamp timestamp,
    tags list<text>,
    val double
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
cqlsh>
Thanks anyway. Do you think the Spark version matters? My laptop runs Spark 2.2, but the Linux VM runs Spark 2.1. Since pyspark-cassandra says it supports Spark 2.0+, it shouldn't matter, right?