I did not do anything special, but here are the basics. Note that if you
are using a VPN, especially on a Mac, it may create a tunnel and disable
traffic to your main Ethernet port. In that case you may have to hardcode
the local IP address in conf/spark-env.sh as follows:

SPARK_LOCAL_IP=127.0.0.1
SPARK_MASTER_IP=127.0.0.1

Other than this, nothing special is needed.
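If it helps, here is the same thing as a file, a minimal sketch of conf/spark-env.sh (the path and the export form are the standard Spark convention; the loopback values are only the VPN workaround above, not a general default):

```shell
# conf/spark-env.sh (sketch) -- bind Spark to loopback when a VPN
# tunnel blocks traffic on the main Ethernet interface
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=127.0.0.1
```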
L-SNVL12EFD5-M:spark-1.4.0-bin-hadoop2.6 vkandas$ ./bin/spark-shell --jars /Users/vkandas/Documents/Workspace/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.4.0-M1-SNAPSHOT.jar --conf spark.cassandra.connection.host=dev-cas00.sv.walmartlabs.com
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See
http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/07/05 06:39:30 INFO SecurityManager: Changing view acls to: vkandas
15/07/05 06:39:30 INFO SecurityManager: Changing modify acls to: vkandas
15/07/05 06:39:30 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(vkandas);
users with modify permissions: Set(vkandas)
15/07/05 06:39:30 INFO HttpServer: Starting HTTP Server
15/07/05 06:39:30 INFO Utils: Successfully started service 'HTTP class
server' on port 63927.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
15/07/05 06:39:31 INFO SparkContext: Running Spark version 1.4.0
…
scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._
scala> val stg = sc.cassandraTable("qarth_catalog_dev", "product_v1")
stg: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
scala> stg.first()
15/07/05 06:56:22 INFO DAGScheduler: Job 0 finished: take at
CassandraRDD.scala:118, took 1.863234 s
res0: com.datastax.spark.connector.CassandraRow = CassandraRow{wpid: 66MKDP7AWAFV, tenant_id: 0, update_date_time: e2ade340-d008-11e4-a44f-11a51653aa7f, product_source: WALMART_DOTCOM, product_status: PRODUCT_PROCESSED, source_id: 14018328, value: {"qid":"WALMART_DOTCOM#14018328","type":"PRODUCT","source":"WALMART_DOTCOM","classification":{"PRODUCT_TYPE":{"product_category":{"properties":{"attributeName":"Product Category","taxonomy_version":"urn:taxonomy:pcs2.0","status":"VERIFIED"},"values":[{"id":"45066","locale":"en_US","value":"Home & Garden"}]},"product_type":{"properties":{"attributeName":"Product Type","taxonomy_version":"urn:taxonomy:pcs2.0","status":"VERIFIED"},"values":[{"id":"635","locale":"en_US","value":"Ottomans"}]}}},"status":"PRODUCT_PROCESSED","request_id":"e1fd7350-d3...
scala> 15/07/05 06:56:22 INFO CassandraConnector: Disconnected from
Cassandra cluster: Dev Cluster
Or, you can use DataFrames:

scala> val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map( "table" -> "product_v1", "keyspace" -> "qarth_catalog_dev" ))

and then do df.first(), df.count(), etc. DataFrames appear to be really slow compared to the direct cassandraTable method above.
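If the speed difference matters, one thing worth trying on the RDD side is the connector's server-side pushdowns, so Cassandra returns only what you need instead of whole rows. A sketch against the same table (untested here; select/where exist in the connector's 1.4 API, but where only works on key or indexed columns, and the column chosen below is purely illustrative):

```scala
import com.datastax.spark.connector._

// Project only the columns we need; the projection happens in Cassandra.
val slim = sc.cassandraTable("qarth_catalog_dev", "product_v1")
  .select("wpid", "product_status")

// Push a predicate down to Cassandra as well. source_id is a column from
// the row shown above; this assumes it is a key or indexed column.
val one = slim.where("source_id = ?", 14018328)
one.first()
```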