// Imports assumed by this snippet
import java.util.List;
import java.util.Map;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;

// Spark Configuration Options
SparkConf sparkConfig = new SparkConf();
sparkConfig.set("spark.master", "SPARK_MASTER_HOSTNAME_HERE");
sparkConfig.set("spark.driver.maxResultSize", "4g");
sparkConfig.set("spark.driver.memory", "4g");
sparkConfig.set("spark.executor.memory", "4g");
// Ship the jar containing this class to the executors so application classes
// (VertexDescriptor, EdgeDescriptor, ...) are on their classpath
String thisJar = NetworkAnalysisComputer.class.getProtectionDomain().getCodeSource().getLocation().toString();
sparkConfig.set("spark.jars", thisJar);
sparkConfig.set("spark.driver.userClassPathFirst", "false");
sparkConfig.set("spark.executor.userClassPathFirst", "false");
sparkConfig.set("spark.cores.max", "16");
// Hadoop I/O Input Options. Note: despite the name, this is an Apache Commons
// Configuration (what TinkerPop's InputFormatRDD expects), not a Hadoop Configuration.
Configuration hadoopConfig = new PropertiesConfiguration();
hadoopConfig.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "CASSANDRA_HOSTNAME_HERE");
hadoopConfig.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cassandrathrift");
hadoopConfig.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner");
hadoopConfig.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph");
hadoopConfig.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat");
hadoopConfig.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat");
SparkSession spark =
SparkSession.builder()
.appName("NetworkAnalysisComputer")
.config(sparkConfig)
.getOrCreate();
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Encoder<VertexDescriptor> vertexDescriptorEncoder = Encoders.bean(VertexDescriptor.class);
Encoder<PathDetails> pathDetailsEncoder = Encoders.bean(PathDetails.class);
// Read the graph as (vertex id, VertexWritable) pairs and map each vertex
// into the VertexDescriptor bean
JavaRDD<VertexDescriptor> rdd = new InputFormatRDD().readGraphRDD(hadoopConfig, context).map(v -> new VertexDescriptor(v._2().get()));
Dataset<VertexDescriptor> typedDataSet = spark.createDataset(JavaRDD.toRDD(rdd), vertexDescriptorEncoder);

// VertexDescriptor is a Java bean with the following fields
// (getters and setters omitted):
String id;
String label;
List<EdgeDescriptor> inEdges;
List<EdgeDescriptor> outEdges;
Map<String, String> properties;
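// Usage sketch (not from the original snippet): once the vertices are in a
// typed Dataset, ordinary Spark SQL operators apply to the bean's columns, e.g.
typedDataSet.groupBy("label").count().show();
System.out.println("total vertices: " + typedDataSet.count());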
// --- Scala variant: reading a JanusGraph-over-HBase graph into Spark ---
// Imports assumed by this snippet
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.tinkerpop.gremlin.hadoop.Constants
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil

private val conf: Configuration = new BaseConfiguration()
conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.hbase.HBaseInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat");
conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "hbase")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "snowwhite.fairytales")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hbase.table", "janusgraph")
conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
private val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
private val hadoopConfiguration = ConfUtil.makeHadoopConfiguration(conf)
private val rdd: RDD[(NullWritable, VertexWritable)] =
  sparkSession.sparkContext.newAPIHadoopRDD(
    hadoopConfiguration,
    hadoopConfiguration
      .getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[InputFormat[NullWritable, VertexWritable]])
      .asInstanceOf[Class[InputFormat[NullWritable, VertexWritable]]],
    classOf[NullWritable],
    classOf[VertexWritable])
// Caution: collect() pulls the entire graph to the driver; fine for smoke
// tests, not for production-sized graphs
rdd.collect().foreach(println)
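// Sketch (assumed, not from the original): rather than printing raw
// VertexWritables, pull a property off each vertex before collecting.
// The "name" property key is hypothetical.
private val names: RDD[String] =
  rdd.map { case (_, vertexWritable) =>
    vertexWritable.get().property[String]("name").orElse("unknown")
  }
names.take(10).foreach(println)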