problem in bulk edge creation

178 views
Skip to first unread message

Ankur Goel

unread,
Nov 14, 2017, 9:21:45 AM11/14/17
to JanusGraph users
I have graph with million vertex.

using below Java code to create edge in bulk which is getting executed in loop for each ID:

Vertex v1 = g.V().has(VLABEL,VERTEX_ORDER).has(PROPERTY_ORDERID, order.getId()).next();           
List<Vertex> orderList=g.V().has(PROPERTY_VLABEL,Text.textContains(VERTEX_ORDER)).has(PROPERTY_MBLOGINID,Text.textContainsRegex(domain)).has(PROPERTY_LEADID,P.gt(order.getCId())).toList();
orderList.forEach(matchOrder -> {
                if(!graphEdgesService.edgeExist(graph, v1, v2,REASON_LOGIN_DOMAIN))
                                v1.addEdge(LABEL_EDGE_DIRECT, v2);
});


Intermittently i am getting below exception:

java.lang.IllegalStateException: Vertex with id 154898624 was removed.
        at org.apache.tinkerpop.gremlin.structure.Element$Exceptions.elementAlreadyRemoved(Element.java:154)
        at org.janusgraph.core.InvalidElementException.removedException(InvalidElementException.java:58)
        at org.janusgraph.graphdb.vertices.AbstractVertex.it(AbstractVertex.java:54)
        at org.janusgraph.graphdb.vertices.AbstractVertex.it(AbstractVertex.java:37)
        at org.janusgraph.graphdb.internal.AbstractElement.isRemoved(AbstractElement.java:141)
       at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.verifyAccess(StandardJanusGraphTx.java:294)
        at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.verifyWriteAccess(StandardJanusGraphTx.java:285)
        at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.addEdge(StandardJanusGraphTx.java:679)
        at org.janusgraph.graphdb.vertices.AbstractVertex.addEdge(AbstractVertex.java:163)
        at org.janusgraph.graphdb.vertices.AbstractVertex.addEdge(AbstractVertex.java:37)
        at com.test.ankur.janusgraph.serviceImpl.GraphEdgesServiceImpl.addEdge(GraphEdgesServiceImpl.java:317)
        at sun.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
        at com.sun.proxy.$Proxy132.addEdge(Unknown Source)
        at com.test.ankur.janusgraph.serviceImpl.OrderEdgesServiceImpl.lambda$null$8(OrderEdgesServiceImpl.java:171)
        at java.util.ArrayList.forEach(ArrayList.java:1249)
        at com.test.ankur.janusgraph.serviceImpl.OrderEdgesServiceImpl.lambda$addLoginDomainEdges$9(OrderEdgesServiceImpl.java:169)
        at java.util.ArrayList.forEach(ArrayList.java:1249)
        at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1080)
        at com.test.ankur.janusgraph.serviceImpl.OrderEdgesServiceImpl.addLoginDomainEdges(OrderEdgesServiceImpl.java:159)
        at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
        at com.sun.proxy.$Proxy142.addLoginDomainEdges(Unknown Source)
        at com.magicbricks.sbatch.writer.UserLoginEdgeCreationWriter.write(UserLoginEdgeCreationWriter.java:37)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:175)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:151)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:274)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:199)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:271)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:745)

Please suggest what is going wrong or missing?

~


tpr...@gmail.com

unread,
Nov 14, 2017, 2:50:52 PM11/14/17
to JanusGraph users
I will did it with a unique query with choose step. 

g.V().has(VLABEL,VERTEX_ORDER).has(PROPERTY_ORDERID, order.getId()).as("v1").V().has(PROPERTY_VLABEL,Text.textContains(VERTEX_ORDER)).has(PROPERTY_MBLOGINID,Text.textContainsRegex(domain)).has(PROPERTY_LEADID,P.gt(order.getCId())).choose(graphEdgesService.edgeExist(graph, v1, v2,REASON_LOGIN_DOMAIN), addEdge(LABEL_EDGE_DIRECT).from("v1)).iterate()

I do not know the gremlin code inside graphEdgesService.edgeExist(graph, v1, v2,REASON_LOGIN_DOMAIN) method but I'm sure you can write a gremlin condition for the choose step.

Ankur Goel

unread,
Nov 15, 2017, 1:36:15 AM11/15/17
to JanusGraph users
graphEdgesService.edgeExist is to check if an edge exist between v1 and v2, to avoid duplicate edge.

in crux: if no edge exist then only create a edge between two vertex.

~

Robert Dale

unread,
Nov 15, 2017, 6:58:40 AM11/15/17
to Ankur Goel, JanusGraph users
First, does the 'graphEdgesService' perform deletes or are there any processes concurrently modifying your graph especially deletes?  This is what the exception seems to imply.
Is this code done in its own transaction?

As for creating edges, I would do this as one traversal if there's no additional logic in 'graphEdgesService.edgeExists' :

g.V().has(VLABEL, VERTEX_ORDER).
    has(PROPERTY_ORDERID, order.getId()).
    as('a').
    V().has(PROPERTY_VLABEL, Text.textContains(VERTEX_ORDER)).
    has(PROPERTY_MBLOGINID, Text.textContainsRegex(domain)).
    has(PROPERTY_LEADID, P.gt(order.getCId())).
    coalesce(__.in(REASON_LOGIN_DOMAIN).where(eq('a')),
        __.addE(LABEL_EDGE_DIRECT).from('a'))


Robert Dale

--
You received this message because you are subscribed to the Google Groups "JanusGraph users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to janusgraph-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/janusgraph-users/a74aaa03-65f7-473b-b463-9be1a37a3e7a%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Ankur Goel

unread,
Nov 15, 2017, 9:24:48 AM11/15/17
to JanusGraph users
Yes, code is in single transaction. graphEdgesService is only checking if any edge exists or not.

This is the only process that is running no parallel execution is happening.

~

Robert Dale

To unsubscribe from this group and stop receiving emails from it, send an email to janusgraph-use...@googlegroups.com.

Ankur Goel

unread,
Nov 27, 2017, 9:22:10 AM11/27/17
to JanusGraph users
very random behaviour, occurring intermittently.

Ankur Goel

unread,
Nov 28, 2017, 12:22:34 AM11/28/17
to JanusGraph users
what i have figured out in each method i am calling g = graph.traversal() for each query execution.

This looks culprit.

Please suggest.
~

Robert Dale

unread,
Nov 28, 2017, 7:03:05 AM11/28/17
to Ankur Goel, JanusGraph users
Please create a reproducible test case and create an issue on github - https://github.com/JanusGraph/janusgraph/issues

Robert Dale

To unsubscribe from this group and stop receiving emails from it, send an email to janusgraph-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/janusgraph-users/a78c95b4-540d-43b6-81c9-0c34610573f9%40googlegroups.com.

Ankur Goel

unread,
Nov 29, 2017, 12:37:16 AM11/29/17
to JanusGraph users
Robert,

Its very intermittent, unable to give exact steps to reproduce.

High level steps:
- first i create all vertex.
- Then based on certain property i am creating edges between vertex and making sure no duplicate edge gets created using spring batch partition.

i have 5 node cassandra + single solr node.

Does JanusJavaClient keep some cache data like gremlin-console. That might be an problem where two spring batch thread is seeing different data for a moment as all threads are using their own instance of graph.traversal().


~AnkurG


Robert Dale

Reply all
Reply to author
Forward
0 new messages