Load large files into Titan 1.0.0


Laxmikant Patil

Feb 4, 2016, 19:27:45
to Aurelius
Hi Daniel/Jason/Marko/Stephen, 

I have large CSV files in the following format.


recordID, person_name, Address, phone
------------------------------------------------------------------

(1,John, WA, 456781911)
(2, Anna, WA, 626762657)
(3, Peter,MA, 872783873)
.
.
.

I want to create a large network graph (there should be only one node for each distinct name/address/phone). I have to load this big file into Titan 1.0.0 (I have Cassandra as the backend).

I just want to know whether this data can be loaded directly with Titan's BulkLoaderVertexProgram, or whether it has to be transformed into a separate edge list first.

Thanks.

Daniel Kuppitz

Feb 4, 2016, 20:37:36
to aureliu...@googlegroups.com
How did you plan to structure your graph? What kind of vertices? What kind of edges? Which properties on which vertices/edges?

Cheers,
Daniel



Laxmikant Patil

Feb 4, 2016, 22:21:35
to Aurelius


The structure of the graph will be as follows:

Each attribute will be one vertex. 

Each name will be associated with one or more record IDs, as different people can have the same name. So each recordID will be linked to a name by a "has name" edge.

The structure will be similar for Phone and Address, i.e. Phone, Name and Address will each have a link to a RecordID. There will not be any links between phone, name and address themselves.

I have millions of records, e.g.:

(1,John, WA, 456781911)  
(2, Anna, WA, 626762657)
(3, Peter,MA, 872783873)
(4, John, WA, 99920280)


In the above example, there will be only one vertex for John and one for WA, and both will be linked to the vertices for recordID 1 and recordID 4.

456781911
|
|
1-----------John-------------4-------999920280
 |                                | 
 |                                |
WA-----------------------------


How do I go about building this kind of structure with only this CSV file, which has millions of records? Do I have to keep track of all the record IDs in memory?
Do all these operations have to happen in one transaction? Can you give a hint about writing the code in Java?

Thanks.                         

Daniel Kuppitz

Feb 5, 2016, 09:55:59
to aureliu...@googlegroups.com
I took your sample ...

daniel@cube /tmp $ cat /tmp/lax.txt
1,John,WA,456781911
2,Anna,WA,626762657
3,Peter,MA,872783873
4,John,WA,99920280

... ran a Spark job to bring that file into a better format ...

val textFile = sc.textFile("/tmp/lax.txt")
val maps = textFile.map(line => line.split(",")).
                    map(x => Map("recordId" -> x(0), "name" -> x(1), "address" -> x(2), "phone" -> x(3))).cache()

val records = maps.map(m => Array("recordId", m("recordId"), m("name"), m("phone"), m("address")).mkString("\t"))
val names = maps.map(m => (m("name"), Array(m("recordId")))).keyBy(_._1).mapValues(x => x._2).
                 reduceByKey((x, y) => x++y).map(x => Array("name", x._1, x._2.mkString(",")).mkString("\t"))

val addresses = maps.map(m => (m("address"), Array(m("recordId")))).keyBy(_._1).mapValues(x => x._2).
                     reduceByKey((x, y) => x++y).map(x => Array("address", x._1, x._2.mkString(",")).mkString("\t"))

val phones = maps.map(m => (m("phone"), Array(m("recordId")))).keyBy(_._1).mapValues(x => x._2).
                  reduceByKey((x, y) => x++y).map(x => Array("phone", x._1, x._2.mkString(",")).mkString("\t"))


records.union(names).union(addresses).union(phones).saveAsTextFile("/tmp/patil")

... and ended up with a file that could be used as a bulk loader input:

daniel@cube /tmp $ cat /tmp/patil/part-0000*
recordId    1    John    456781911    WA
recordId    2    Anna    626762657    WA
recordId    3    Peter   872783873    MA
recordId    4    John    99920280     WA
name     Anna     2
name     Peter    3
name     John     1,4
address  MA       3
address  WA       1,2,4
phone    456781911   1
phone    99920280    4
phone    626762657   2
phone    872783873   3

The key is to have 1 unique vertex per line together with all its edges (incoming and outgoing).

Now all you need is a proper Groovy script to parse the input file (which IMO should be pretty easy with the given structure).
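
To give a rough, untested idea of what that script could look like for the TSV above -- this assumes TinkerPop's ScriptInputFormat (gremlin.hadoop.graphInputFormat set to ScriptInputFormat and gremlin.hadoop.scriptInputFormat.script pointing at the script file); the "hasName"/"hasPhone"/"hasAddress" edge labels and the prefixed vertex ids are made up, and the exact ScriptElementFactory API should be double-checked against the TinkerPop 3.0.1 docs:

def parse(line, factory) {
    def fields = line.split('\t')
    def type = fields[0]
    if (type == 'recordId') {
        // recordId <TAB> id <TAB> name <TAB> phone <TAB> address
        def record = factory.vertex('record:' + fields[1], 'record')
        record.property('recordId', fields[1])
        factory.edge(record, factory.vertex('name:' + fields[2], 'name'), 'hasName')
        factory.edge(record, factory.vertex('phone:' + fields[3], 'phone'), 'hasPhone')
        factory.edge(record, factory.vertex('address:' + fields[4], 'address'), 'hasAddress')
        return record
    } else {
        // name|address|phone <TAB> value <TAB> recordId[,recordId,...]
        def v = factory.vertex(type + ':' + fields[1], type)
        v.property(type, fields[1])
        fields[2].split(',').each { recId ->
            // same edges as above, this time from the record stubs into this vertex
            factory.edge(factory.vertex('record:' + recId, 'record'), v, 'has' + type.capitalize())
        }
        return v
    }
}

Note that every label and property key used here has to exist in your Titan schema (or automatic schema creation has to be enabled).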

Cheers,
Daniel



Jason Plurad

Feb 5, 2016, 13:47:49
to Aurelius
Hi Laxmikant,

If you're looking to use Java code, check out Alex's and Matthew's Marvel graph example:

https://github.com/awslabs/dynamodb-titan-storage-backend/blob/1.0.0/src/main/java/com/amazon/titan/example/MarvelGraphFactory.java

It creates a Titan schema, parses a CSV, and then uses basic Gremlin addVertex() and addEdge() to build the graph. You'll notice that the TitanGraph isn't instantiated in the factory itself, so even though it is inside a Titan-DynamoDB example, you can use this with any Titan backend (Cassandra, HBase, Berkeley).
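
To give you a rough idea of that pattern applied to your CSV, here is an untested Gremlin console sketch -- the 'record'/'name'/'address'/'phone' labels and the 'has*' edge labels are just made up, and for millions of rows you'd want composite indexes on the lookup keys plus batched commits:

// assumes 'graph' is an open TitanGraph, e.g. graph = TitanFactory.open('conf/titan-cassandra.properties')
getOrCreate = { label, key, value ->
    def t = graph.traversal().V().hasLabel(label).has(key, value)
    t.hasNext() ? t.next() : graph.addVertex(T.label, label, key, value)
}

new File('records.csv').eachLine { line ->
    def (id, name, address, phone) = line.split(',')*.trim()
    def record = graph.addVertex(T.label, 'record', 'recordId', id)
    record.addEdge('hasName',    getOrCreate('name',    'name',    name))
    record.addEdge('hasAddress', getOrCreate('address', 'address', address))
    record.addEdge('hasPhone',   getOrCreate('phone',   'phone',   phone))
}
graph.tx().commit()  // for millions of rows, commit every few thousand lines instead of once at the end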

If your graph data is in the low millions, you could use a Titan-BerkeleyJE graph on your own machine, which might be an easier backend to use at first rather than a Cassandra cluster. I'd recommend that you do not get too caught up on loading a lot of data initially -- get comfortable with how to use Titan and TinkerPop with OLTP first and then move into OLAP approaches.

Enjoy!


-- Jason

Laxmikant Patil

Feb 5, 2016, 18:42:07
to Aurelius
Thanks Daniel. That is really helpful. I will surely try this.

Thanks again.

Stephen Mallette

Feb 8, 2016, 06:57:54
to Aurelius
nice ducati jason :D

Laxmikant Patil

Feb 16, 2016, 04:04:59
to Aurelius
Hi Daniel,

Are there any dependency issues with SparkGraphComputer?

Check out this issue: SparkGraphComputerLoading.
The SparkGraphComputer.workers(1) method does not exist in TinkerPop 3.0.1, and when I run the SparkGraphComputer code from the Titan 1.0.0 page it throws this error:

 The given graph instance does not allow concurrent access.

How do I resolve this conflict?



Daniel Kuppitz

Feb 16, 2016, 07:45:56
to aureliu...@googlegroups.com
Titan does allow concurrent access. It looks like you're trying to load your data into TinkerGraph, which will only work in TinkerPop 3.1+.
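
In other words, the properties file you pass to BulkLoaderVertexProgram.build().writeGraph(...) has to open Titan, not TinkerGraph -- roughly like this (hostname and keyspace are placeholders for your setup):

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=127.0.0.1
storage.cassandra.keyspace=titan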

Cheers,
Daniel


Laxmikant Patil

Feb 16, 2016, 12:08:35
to Aurelius
Then how do I resolve the concurrent-access error with TitanGraph? I am bulk loading data from a CSV into TitanGraph, but it gives me this error.

Daniel Kuppitz

Feb 16, 2016, 13:02:59
to aureliu...@googlegroups.com
Can you show the whole code and the properties files you're using?

Cheers,
Daniel


Laxmikant Patil

Feb 16, 2016, 13:28:52
to Aurelius
I am trying to execute the sample code on Titan 1.0.0 first so that I get an idea of how it works, but it does not seem to work in my case. (I am using the default Titan 1.0.0 package and have not modified any TinkerPop versions.)

# hadoop-load.properties

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000

#
# SparkGraphComputer Configuration
#
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

// titan-schema-grateful-dead.groovy

def defineGratefulDeadSchema(titanGraph) {
    m = titanGraph.openManagement()
    // vertex labels
    artist = m.makeVertexLabel("artist").make()
    song   = m.makeVertexLabel("song").make()
    // edge labels
    sungBy     = m.makeEdgeLabel("sungBy").make()
    writtenBy  = m.makeEdgeLabel("writtenBy").make()
    followedBy = m.makeEdgeLabel("followedBy").make()
    // vertex and edge properties
    blid         = m.makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
    name         = m.makePropertyKey("name").dataType(String.class).make()
    songType     = m.makePropertyKey("songType").dataType(String.class).make()
    performances = m.makePropertyKey("performances").dataType(Integer.class).make()
    weight       = m.makePropertyKey("weight").dataType(Integer.class).make()
    // global indices
    m.buildIndex("byBulkLoaderVertexId", Vertex.class).addKey(blid).buildCompositeIndex()
    m.buildIndex("artistsByName", Vertex.class).addKey(name).indexOnly(artist).buildCompositeIndex()
    m.buildIndex("songsByName", Vertex.class).addKey(name).indexOnly(song).buildCompositeIndex()
    // vertex centric indices
    m.buildEdgeIndex(followedBy, "followedByWeight", Direction.BOTH, Order.decr, weight)
    m.commit()
}

gremlin> :load data/grateful-dead-titan-schema.groovy
==>true
==>true
gremlin> graph = TitanFactory.open('conf/titan-cassandra.properties')
==>standardtitangraph[cassandrathrift:[127.0.0.1]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-cassandra.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()

On the last statement I get the error,

11:21:05 WARN  org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer  - class org.apache.hadoop.mapreduce.lib.output.NullOutputFormat does not implement PersistResultGraphAware and thus, persistence options are unknown -- assuming all options are possible
11:21:20 WARN  org.apache.hadoop.io.compress.snappy.LoadSnappy  - Snappy native library not loaded
11:21:21 ERROR org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 0.0 (TID 0)
java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
11:21:21 WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

11:21:21 ERROR org.apache.spark.scheduler.TaskSetManager  - Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
Display stack trace? [yN] y
java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at java_util_concurrent_Future$get.call(Unknown Source)
at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:110)
at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:114)
at groovysh_evaluate.run(groovysh_evaluate:3)
at org.codehaus.groovy.vmplugin.v7.IndyInterface.selectMethod(IndyInterface.java:215)
at org.codehaus.groovy.tools.shell.Interpreter.evaluate(Interpreter.groovy:69)
at org.codehaus.groovy.tools.shell.Groovysh.execute(Groovysh.groovy:185)
at org.codehaus.groovy.tools.shell.Shell.leftShift(Shell.groovy:119)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$work(InteractiveShellRunner.groovy)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1207)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:130)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:150)
at org.codehaus.groovy.tools.shell.ShellRunner.run(ShellRunner.groovy:58)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.super$2$run(InteractiveShellRunner.groovy)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:90)
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:324)
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1207)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuperN(ScriptBytecodeAdapter.java:130)
at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.invokeMethodOnSuper0(ScriptBytecodeAdapter.java:150)
at org.codehaus.groovy.tools.shell.InteractiveShellRunner.run(InteractiveShellRunner.groovy:82)
at org.codehaus.groovy.vmplugin.v7.IndyInterface.selectMethod(IndyInterface.java:215)
at org.apache.tinkerpop.gremlin.console.Console.<init>(Console.groovy:144)
at org.codehaus.groovy.vmplugin.v7.IndyInterface.selectMethod(IndyInterface.java:215)
at org.apache.tinkerpop.gremlin.console.Console.main(Console.groovy:303)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






Marko Rodriguez

Feb 16, 2016, 14:00:34
to aureliu...@googlegroups.com
Hi,

I suspect your inputLocation is bad.

gremlin.hadoop.inputLocation=./data/grateful-dead.kryo

It's probably reading an empty location and then trying to "seekToHeader" and failing. Can you verify that the input file is actually being read? Perhaps do a PageRankVertexProgram (and thus get Titan out of the mix).

Marko.

Laxmikant Patil

Feb 16, 2016, 14:36:57
to Aurelius
Hi Marko,

Thanks for your suggestion.
Can you specify how to check whether the file exists in HDFS?

I cannot do "hadoop fs -ls /" since Hadoop is started internally by the Gremlin query, so how exactly do I check it through a Gremlin query?


Marko Rodriguez

Feb 16, 2016, 14:59:03
to aureliu...@googlegroups.com
Hello,

Perhaps do a PageRankVertexProgram (and thus, get Titan out of the mix).

Meaning, don't try to do a BulkLoad. If that fails, then we know it's something with how the file is being read. Then I would probably try data/grateful-dead.kryo instead of ./data/grateful-dead.kryo. In short, twiddle and figure out why your reference to the file is not showing up. You can also try "gremlin> hdfs.ls()" .. perhaps you are not pointing to the cluster, but to your local machine.
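
Off the top of my head (untested -- double-check the PageRankVertexProgram builder call against the TinkerPop 3.0.1 docs), something like this tells you whether the file can be read at all, without Titan in the picture:

gremlin> hdfs.ls('data')
gremlin> graph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
gremlin> graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().create()).submit().get()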

Marko.

Laxmikant Patil

Feb 16, 2016, 16:10:47
to Aurelius
PageRankVertexProgram works fine. Even this code works fine with tinkerpop-modern.kryo when I copy tinkerpop-modern.kryo into HDFS.

So the issue is with the grateful-dead.kryo file provided with the Titan package: when I copy "grateful-dead.kryo" into HDFS, the size of the file suddenly becomes zero, and maybe that is why it gives the EOFException.

Is it possible for you to try the same code with grateful-dead.kryo on your machine to see whether it works correctly? If not, it would be helpful for correcting the grateful-dead.kryo file in the package and docs.

The main purpose of this is to learn how I can use SparkGraphComputer to create a graph and later do some processing with Spark on the graph stored in Titan. And I think this is the only SparkGraphComputer example available for Titan 1.0.0.

Many thanks Marko for replying to my queries. Thanks a lot! :)

Daniel Kuppitz

Feb 16, 2016, 18:05:25
to aureliu...@googlegroups.com
Try to change the input location:

gremlin.hadoop.inputLocation=grateful-dead.kryo

Then copy the file like this into HDFS

hdfs.copyFromLocal('data/grateful-dead.kryo','grateful-dead.kryo')

and try again.

Cheers,
Daniel


Laxmikant Patil

Feb 16, 2016, 20:19:04
to Aurelius
Hi Daniel,

Thanks for the reply.

I tried again after changing the input location as you suggested.

I still get the error:

18:16:21 ERROR org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 0.0 (TID 0)
java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18:16:21 WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

18:16:21 ERROR org.apache.spark.scheduler.TaskSetManager  - Task 0 in stage 0.0 failed 1 times; aborting job
18:16:22 WARN  org.eclipse.jetty.util.thread.QueuedThreadPool  - 1 threads could not be stopped
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.seekToHeader(GryoRecordReader.java:82)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordReader.initialize(GryoRecordReader.java:74)
at org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat.createRecordReader(GryoInputFormat.java:39)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Do all the steps work on your machine?

Thanks.

Marko Rodriguez

Feb 16, 2016, 21:49:15
to aureliu...@googlegroups.com
Hi,

I suspect you have a corrupt file. Perhaps try another .kryo file.

Marko.

Jason Plurad

Feb 18, 2016, 10:57:29
to Aurelius
Hi Laxmikant,

I have confirmed that the BLVP example works with titan-1.0.0-hadoop1, hadoop-1.2.1, cassandra-2.1.9, spark-1.2.1-hadoop1. Running on Ubuntu 14.04 with OpenJDK 1.8.0_72-internal.

What you mentioned is a bit troubling: 'when I copy "grateful-dead.kyro" into hdfs the size of the file becomes suddenly zero.' I don't have any idea why that's happening.

Here's what I see on file upload to HDFS:

$ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
$ export HADOOP_PREFIX=/usr/lib/hadoop-1.2.1
$ export CLASSPATH=$HADOOP_PREFIX/conf
$ ./bin/gremlin.sh
gremlin> Titan.version()
==>1.0.0
gremlin> Gremlin.version()
==>3.0.1-incubating
gremlin> hdfs.ls()
==>rwxr-xr-x graphie hadoop 0 (D) _bsp
==>rwxr-xr-x graphie hadoop 0 (D) data
==>rwxr-xr-x graphie hadoop 0 (D) hadoop-gremlin-libs
gremlin> hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
==>null
gremlin> hdfs.ls('data/grateful-dead.kryo')
==>rw-r--r-- graphie hadoop 332226 grateful-dead.kryo

Here's the writeGraph (titan-cassandra.properties) I used:

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=cassandrathrift
storage.hostname=u1401.ambari.apache.org,u1402.ambari.apache.org,u1403.ambari.apache.org
storage.cassandra.keyspace=titan
storage.cassandra.replication-factor=3

And the readGraph (hadoop-load.properties). It worked with local[*] and with a standalone Spark cluster.

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
spark.master=local[*]
#spark.master=spark://u1401:7077
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer

-- Jason