GraphActors as a new distributed computing framework in TinkerPop

448 views
Skip to first unread message

Marko Rodriguez

unread,
Dec 14, 2016, 12:46:44 PM12/14/16
to gremli...@googlegroups.com, d...@tinkerpop.apache.org
Hello,

For the last week I’ve been working on “distributed OLTP.” Gremlin has a really nice architecture in that a traverser can be shipped around a cluster and reattached to its respective element (vertex/edge/etc.) and step (traversal) at the remote location and continue to compute. Thus, we can have step-by-step query routing.


With that, I’ve created GraphActors which is similar to GraphComputer. However, there are some fundamental distinctions:

1. GraphActors assumes the boundary of computation is a Partition.
- GraphComputer assumes the boundary of computation is vertex and its incident edges and properties.
2. GraphActors assumes asynchronous computation with barriers at Barrier steps.
- GraphComputer assumes (sorta) synchronous computation with a barrier when all traversers have left their local vertex.
3. GraphActors is traverser-centric and partition-bound.
- GraphComputer is vertex-centric and vertex-bound.

In gremlin-core/ I’ve created a new set of interfaces off of process/.

GraphActors <=> GraphComputer
MasterActor <=> setup()/terminate().
WorkerActor <=> execute()
ActorProgram <=> VertexProgram

The parallel between GraphComputer and GraphActors are strong. In short, a (hardcore) user can create an ActorProgram and submit it to a GraphActors. The ActorProgram will effect a distributed, asynchronous, partition-bound message passing algorithm and return a Future<Result>. There is one ActorProgram in particular the executes a Gremlin traversal.


Pretty simple. Besides some problems I’m having with serialization stuff in GroupStep, the ProcessSuite passes.

Now, its up to a provider to implement the GraphActors interfaces. Welcome akka-gremlin/.


Dead simple!

What we should have done with TinkerPop from the start is include the notion of a Partition. For this branch, I’ve added two concepts Partition and Partitioner.


Why is this cool? This is cool because if GraphActors system knows the partitions of the underlying Graph data, then it can immediately process the Graph data in a distributed manner. No need to write a custom “InputFormat.” We should have done this from the start because then GraphComputer could do the same. For instance, spark-gremlin/ can run over TinkerGraph as it doesn’t care about TinkerGraph, it cares about Partition “input splits.” By adding this layer of information, ANY Graph can work with ANY GraphComputer or GraphActors. I have yet to create PartitionInputRDD and PartitionInputFormat, but that will be next and at that point, GraphComputers are agnostic to the underlying implementation.

So there you have it. Your thoughts on the matter would be most appreciated.

Things still left to do:

* We need a concept of a traversal engine. That is, something like:
- g.withEngine(SparkGraphComputer.class) // it knows its a GraphComputer so thats the engine.
- g.withEngine(AkkaGraphActors.class) // it knows its a GraphActors so thats the engine.
- g.withEngine(Iterator.class) // this means, just iterate the traversal locally :).
* GraphComputer semantics are a restricted version of GraphActors semantics.
- GraphActors becomes GraphComputer when the Partitions are defined by vertices.
- I think I can unify the two and thus, we could have SparkGraphActors.



Here is some fun playing:


         \,,,/
         (o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> :install org.apache.tinkerpop akka-gremlin 3.3.0-SNAPSHOT
==>Loaded: [org.apache.tinkerpop, akka-gremlin, 3.3.0-SNAPSHOT]
gremlin> graph = TinkerFactory.createModern()
==>tinkergraph[vertices:6 edges:6]
gremlin> graph.partitioner()
==>partitioner[globalpartitioner:1]
gremlin> partitioner = new HashPartitioner(graph.partitioner(),3)          // lets create 3 logical partitions over TinkerGraph
==>partitioner[hashpartitioner:3]
gremlin> g = graph.traversal().withStrategies(new ActorProgramStrategy(AkkaGraphActors,partitioner)) // in the future withEngine() will be used
==>graphtraversalsource[tinkergraph[vertices:6 edges:6], actors]
gremlin> g.V().repeat(out()).times(2).values('name')
==>lop
==>ripple
gremlin> g.V().repeat(both()).times(2).groupCount().by(out().in().count())  // beyond the star graph!
==>[0:13,3:3,4:7,5:7]
gremlin> g.V().match(                                   // distributed pattern matching
......1>  __.as('a').out('created').as('b'),
......2>  __.as('b').has('name', 'lop'),
......3>  __.as('b').in('created').as('c'),
......4>  __.as('c').has('age', 29)).
......5>   select('a','c').by('name')
==>[a:marko,c:marko]
==>[a:josh,c:marko]
==>[a:peter,c:marko]
gremlin>

Now imagine this executing over various providers:

1. A sharded graph database (Titan/DSEGraph/OrientDB): the traversers move between machines so they are always data local processing.
2. A replicated graph database (Neo4j): logical partitions are created so that each machine is responsible for a subgraph of the full graph (load balancing and parallization).
3. A single machine graph database (TinkerGraph): logical partitions are created so that each core of the machine is responsible for a subgraph (parallization).

Pretty neat, eh?

Ideas are more than welcome,
Marko.

HadoopMarc

unread,
Dec 14, 2016, 3:10:01 PM12/14/16
to Gremlin-users, d...@tinkerpop.apache.org
Hi Marko,

This is pretty neat indeed! It will enhance the productivity of gremlin query writers when queries can be copied from the fora without worries about the kind of backend used and whether it is an analytical (OLAP) or a transactional query (OLTP). I have only one idea: do traversal API users still really have to know whether they use a GraphComputer or GraphActors? In other words, can the withEngine options not just be some illuminating token constants for users that just want to have the traversal() returned (LOCAL, LOCAL_DISTRIBUTED, DISTRIBUTED)?  Of course, the more extended API will be useful for a minority of power users that want to optimize an ActorProgram for a specific use case.

Cheers,    Marc

Op woensdag 14 december 2016 18:46:44 UTC+1 schreef Marko A. Rodriguez:

Michael Pollmeier

unread,
Dec 15, 2016, 8:31:50 AM12/15/16
to Gremlin-users, d...@tinkerpop.apache.org
Super cool! Two thoughts:

How will this get deployed? Each database instance (alternatively gremlin-server) shipping a version of akka-actor and akka-cluster?

What does it mean for performance? Here's my understanding... thoughts?

1. A sharded graph database: as long as the data is local it'll scale linearly, then it needs some synchronisation (i.e. hand off the traversal to the instance where the data is local again). I.e. there'll be a sweet spot of replication vs. shards for each use case.
2. A replicated graph database: should scale linearly for most traversals
3. A single machine graph database: should scale linearly for most traversals

Michael

pieter

unread,
Dec 15, 2016, 10:33:35 AM12/15/16
to gremli...@googlegroups.com
Hi,

Afraid its not that simple for me.

Here is what I was thinking of when you first mentioned partitions and
multiple threads.

Eg. Say the graph has a 1 000 000 000 Persons. And each person has a
country field and Persons are sharded/paritioned on country.

Then g.V().hasLabel("Person").has("name", "marko") can travers all
Partitions with its own thread.

g.V(MarkoPersonVertex).out() will, if the vertex does not contain its
partition info scan all partitions in its own thread till it finds
Marko and then cancel then the others.

g.V().hasLabel("Person").has("country", "USA").. will scan one
partition.



Regardless of partitions,

g.V(markoVertex).out("knows").out("created") The second 'out' can for
every incoming 'knows' vertex get the 'created' vertices in a separate
thread.

Is all this possible with what you have done or have I missed the
boat?  


Thanks 
Pieter
> -- 
> You received this message because you are subscribed to the Google
> Groups "Gremlin-users" group.
> To unsubscribe from this group and stop receiving emails from it,
> send an email to gremlin-user...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/
> msgid/gremlin-users/7AA47D97-4919-474E-850A-AD8709E4FD87%40gmail.com.
> For more options, visit https://groups.google.com/d/optout.

Marko Rodriguez

unread,
Dec 15, 2016, 11:40:11 AM12/15/16
to gremli...@googlegroups.com, d...@tinkerpop.apache.org
Hello,

I have only one idea: do traversal API users still really have to know whether they use a GraphComputer or GraphActors? In other words, can the withEngine options not just be some illuminating token constants for users that just want to have the traversal() returned (LOCAL, LOCAL_DISTRIBUTED, DISTRIBUTED)?  Of course, the more extended API will be useful for a minority of power users that want to optimize an ActorProgram for a specific use case.

So, Matthias Bröcheler, for a few years now, wanted something like a TraversalEngineReasoningStrategy. This would be a DecorationStrategy that would look at the traversal and make a best guess as whether to execute iterator-style, compute-style, or actors-style. For instance:

g.V().count() // computer
g.V(1).out().count() // iterator
g.V(1).repeat(out()).times(3).count() // actors

I think for now withEngine() is the bare-bones necessity and we can get clever with reasoning and your enum-model down the line.

Finally, note that I went with withProcessor() last night as the name :). GraphActors and GraphComputer both implement a new Processor interface (which is primarily a marker interface).

Marko.


--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-user...@googlegroups.com.

Marko Rodriguez

unread,
Dec 15, 2016, 11:47:20 AM12/15/16
to gremli...@googlegroups.com, d...@tinkerpop.apache.org
Hi,

How will this get deployed? Each database instance (alternatively gremlin-server) shipping a version of akka-actor and akka-cluster?

This is a good question. As I’m seeing it lately, I think we treat it just like spark-gremlin/. That is, lets assume a multi-machine graph database:

1. User has a graph database across 3 nodes in a cluster.
2. User has Akka Cluster setup on those 3 nodes. (like they would have SparkServer or Hadoop).
3. akka-gremlin/ “jobs” have a configuration with information about the Akka cluster and the graph database partitions.

Thus, I don’t think GremlinServer really needs to come into play. However, I sort of think that down the line, GremlinServer should support the spawning of “services.” For instance, it would be great if GremlinServer, when deployed, it could spawn a SparkServer cluster or an Akka Cluster… This removes the headache for users having to install and configure stuff. It would be great if GremlinServer was like a Docker or something.

bin/gremlin-server.sh —i akka.gremlin.plugin —c akka.properties

Dunno. Stephen would have more to say.


What does it mean for performance? Here's my understanding... thoughts?

1. A sharded graph database: as long as the data is local it'll scale linearly, then it needs some synchronisation (i.e. hand off the traversal to the instance where the data is local again). I.e. there'll be a sweet spot of replication vs. shards for each use case.
2. A replicated graph database: should scale linearly for most traversals
3. A single machine graph database: should scale linearly for most traversals

So there will be traverser migration when a traverser no longer references data in its current partition. That is a message pass. You don’t want just full replication because then you aren’t load balancing your traversals across machines. Even if you have a replicated graph database, you will want to create logical partitions so that traversers will be forced to move between machines. When its worth doing that or when you should just use standard iterator Gremlin execution is a fine line… how much data will your traversal touch?

Marko.

Stephen Mallette

unread,
Dec 19, 2016, 8:09:00 AM12/19/16
to Gremlin-users, d...@tinkerpop.apache.org
Gremlin Server should probably start the akka cluster (of course, i still don't have a solid understanding of the clustering capabilities of akka just yet). .I also wonder if there is any value to Gremlin Server embedding akka so it could piggyback on some of the features akka has (e.g. perhaps gremlin server instances could be aware of each other which might yield some interesting features).

Thus, I don’t think GremlinServer really needs to come into play.

Unless i'm missing something, I'm not sure we should say it quite that way - that's a bit more jvm-centric sounding. So as not to be confused, non-jvm GLVs would still require Gremlin Server, right?

--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/D2CDD477-4671-4100-ACBB-D0196E9BEB41%40gmail.com.

Marko Rodriguez

unread,
Dec 19, 2016, 12:02:12 PM12/19/16
to gremli...@googlegroups.com, d...@tinkerpop.apache.org
Hi,

Thus, I don’t think GremlinServer really needs to come into play.

Unless i'm missing something, I'm not sure we should say it quite that way - that's a bit more jvm-centric sounding. So as not to be confused, non-jvm GLVs would still require Gremlin Server, right?

I think it gets back to “anything that supports RemoteConnection” to accept the traversal (RemoteStrategy) and then "anything that implements GraphActors” to execute the traversal (ActorProgramStrategy).

Marko.




On Thu, Dec 15, 2016 at 11:47 AM, Marko Rodriguez <okram...@gmail.com> wrote:
Hi,

How will this get deployed? Each database instance (alternatively gremlin-server) shipping a version of akka-actor and akka-cluster?

This is a good question. As I’m seeing it lately, I think we treat it just like spark-gremlin/. That is, lets assume a multi-machine graph database:

1. User has a graph database across 3 nodes in a cluster.
2. User has Akka Cluster setup on those 3 nodes. (like they would have SparkServer or Hadoop).
3. akka-gremlin/ “jobs” have a configuration with information about the Akka cluster and the graph database partitions.

Thus, I don’t think GremlinServer really needs to come into play. However, I sort of think that down the line, GremlinServer should support the spawning of “services.” For instance, it would be great if GremlinServer, when deployed, it could spawn a SparkServer cluster or an Akka Cluster… This removes the headache for users having to install and configure stuff. It would be great if GremlinServer was like a Docker or something.

bin/gremlin-server.sh —i akka.gremlin.plugin —c akka.properties

Dunno. Stephen would have more to say.

What does it mean for performance? Here's my understanding... thoughts?

1. A sharded graph database: as long as the data is local it'll scale linearly, then it needs some synchronisation (i.e. hand off the traversal to the instance where the data is local again). I.e. there'll be a sweet spot of replication vs. shards for each use case.
2. A replicated graph database: should scale linearly for most traversals
3. A single machine graph database: should scale linearly for most traversals

So there will be traverser migration when a traverser no longer references data in its current partition. That is a message pass. You don’t want just full replication because then you aren’t load balancing your traversals across machines. Even if you have a replicated graph database, you will want to create logical partitions so that traversers will be forced to move between machines. When its worth doing that or when you should just use standard iterator Gremlin execution is a fine line… how much data will your traversal touch?

Marko.


--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-users+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/D2CDD477-4671-4100-ACBB-D0196E9BEB41%40gmail.com.

For more options, visit https://groups.google.com/d/optout.


--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-user...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/gremlin-users/CAA-H43-j2sNSEqYkc9pgkZt8HVoOL8FWZs%3DLtkm_PahrJK75Qg%40mail.gmail.com.

ewohls...@elementum.com

unread,
Jun 7, 2017, 1:58:32 AM6/7/17
to Gremlin-users, d...@tinkerpop.apache.org
Hi Marko, all,
 I was wondering where things left off for this work. Mostly because distributed Gremlin OLTP sounded promising but also because we use both DSE Graph and Akka at my work. 

I noticed in Git that it hasn't been worked on lately. Did you hit a wall with this approach? If so, can anyone share what issues you faced? Do you think there might be a new effort for distributed OLTP or did the OLAP engine improve to the point where distributed OLTP isn't necessary?

Thanks for any info,

--Eric

On Monday, December 19, 2016 at 9:02:12 AM UTC-8, Marko A. Rodriguez wrote:
Hi,

Thus, I don’t think GremlinServer really needs to come into play.

Unless i'm missing something, I'm not sure we should say it quite that way - that's a bit more jvm-centric sounding. So as not to be confused, non-jvm GLVs would still require Gremlin Server, right?

I think it gets back to “anything that supports RemoteConnection” to accept the traversal (RemoteStrategy) and then "anything that implements GraphActors” to execute the traversal (ActorProgramStrategy).

Marko.



On Thu, Dec 15, 2016 at 11:47 AM, Marko Rodriguez <okram...@gmail.com> wrote:
Hi,

How will this get deployed? Each database instance (alternatively gremlin-server) shipping a version of akka-actor and akka-cluster?

This is a good question. As I’m seeing it lately, I think we treat it just like spark-gremlin/. That is, lets assume a multi-machine graph database:

1. User has a graph database across 3 nodes in a cluster.
2. User has Akka Cluster setup on those 3 nodes. (like they would have SparkServer or Hadoop).
3. akka-gremlin/ “jobs” have a configuration with information about the Akka cluster and the graph database partitions.

Thus, I don’t think GremlinServer really needs to come into play. However, I sort of think that down the line, GremlinServer should support the spawning of “services.” For instance, it would be great if GremlinServer, when deployed, it could spawn a SparkServer cluster or an Akka Cluster… This removes the headache for users having to install and configure stuff. It would be great if GremlinServer was like a Docker or something.

bin/gremlin-server.sh —i akka.gremlin.plugin —c akka.properties

Dunno. Stephen would have more to say.

What does it mean for performance? Here's my understanding... thoughts?

1. A sharded graph database: as long as the data is local it'll scale linearly, then it needs some synchronisation (i.e. hand off the traversal to the instance where the data is local again). I.e. there'll be a sweet spot of replication vs. shards for each use case.
2. A replicated graph database: should scale linearly for most traversals
3. A single machine graph database: should scale linearly for most traversals

So there will be traverser migration when a traverser no longer references data in its current partition. That is a message pass. You don’t want just full replication because then you aren’t load balancing your traversals across machines. Even if you have a replicated graph database, you will want to create logical partitions so that traversers will be forced to move between machines. When its worth doing that or when you should just use standard iterator Gremlin execution is a fine line… how much data will your traversal touch?

Marko.


--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-user...@googlegroups.com.

Marko Rodriguez

unread,
Jun 7, 2017, 9:42:46 AM6/7/17
to gremli...@googlegroups.com, d...@tinkerpop.apache.org
Hello,

 I was wondering where things left off for this work. Mostly because distributed Gremlin OLTP sounded promising but also because we use both DSE Graph and Akka at my work. 

It still is very promising work.

I noticed in Git that it hasn't been worked on lately. Did you hit a wall with this approach? If so, can anyone share what issues you faced? Do you think there might be a new effort for distributed OLTP or did the OLAP engine improve to the point where distributed OLTP isn't necessary?

I just haven’t coded in a long time. I’ve been working on other things lately (primarily research). As such, I have put this body of work on the back burner. I suspect it will ultimately get its debut in TinkerPop4.

Thanks,
Marko.


Legal Disclaimer: The information contained in this message may be privileged and confidential. It is intended to be read only by the individual or entity to whom it is addressed or by their designee. If the reader of this message is not the intended recipient, you are on notice that any distribution of this message, in any form, is strictly prohibited. If you have received this message in error, please immediately notify the sender and delete or destroy any copy of this message


--
You received this message because you are subscribed to the Google Groups "Gremlin-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gremlin-user...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages