Timing Issue with gremlin driver java client

76 views
Skip to first unread message

Sudha Subramanian

unread,
Jul 24, 2016, 10:50:56 AM7/24/16
to Aurelius
Hi,

I'm using gremlin java client to execute queries on server. I have following 3 methods that gets called one after the other. 

I seem to have a timing issue. If I call 'get_or_create_user', followed by 'get_or_create_device', and 'addOwnerEdge', the owner edge never gets created. However, after get_or_create_user  and get_or_create_device method, if I retrieve the retuned user and device object and print it, the 'addOwnerEdge' works fine. I've provided the logic I use for printing and invoking the method. 


public void storeOrUpdate() throws Exception {

   Map<String, Object> param = new HashMap<String, Object>();

   param.put("userName", userName);

   param.put("email", email);

   param.put("createdTime", createdTime);

   ResultSet rs = RexsterClientUtil.getInstance().getClient().submit("get_or_create_user(g, userName, email, createdTime)", param);

   Result result = rs.one();

   DetachedVertex dv = (DetachedVertex) result.getObject();

   Iterator<VertexProperty<Object>> queryIter = dv.properties();

   while( queryIter.hasNext()){

   VertexProperty p = queryIter.next();

                       LOGGER.debug("User object : " + p.key() + ":" + p.value());

       }

}


def get_or_create_user(g, userName, email, createdTime) {

       nv = g.V().has('email', email);

       if( !nv.hasNext())

       {

               ret = g.addV(label,'user', 'userName', userName, 'email', email, 'createdTime', createdTime).iterate();

               g.tx().commit()

       }

       else{

               ret = nv.next();

       }      

       ret = g.V().has('email', email).next()

       ret

}


def get_or_create_device(g, deviceIdentifier, deviceRegistrationStatus, serviceOffering, type, deviceToken){

       nv = g.V().has('deviceIdentifier', deviceIdentifier);

       if( !nv.hasNext())

       {

               ret = g.addV(label, 'device', 'deviceIdentifier', deviceIdentifier, 'deviceRegistrationStatus', deviceRegistrationStatus, 'serviceOffering', serviceOffering, 'type', type, 'deviceToken', deviceToken).iterate();

               g.tx().commit()

       }

       else{

               ret = nv.next();

       }

      ret = g.V().has('deviceIdentifier', deviceIdentifier).next()

      ret

}

def addOwnerEdge(g, email, deviceId, toLabel){

       d = g.V().has("deviceIdentifier",  deviceId).as('x').match(__.as('x').not(__.in(toLabel).has('email', email))).select('x').next()

       e = g.V().has('email', email).next()

       e.addEdge(toLabel, d); null;

       g.tx().commit()

}




Sudha Subramanian

unread,
Jul 24, 2016, 10:54:18 AM7/24/16
to Aurelius
Posted accidentally before I could complete the question. 

If I don't have the print method in storeOrUpdate method, subsequent queries does not seem to find it. Can someone please help? Should I have a single method on the server to create and add an edge in a single transaction? 

Thanks,
Sudha

Stephen Mallette

unread,
Jul 25, 2016, 12:49:09 PM7/25/16
to Aurelius
I think I need more/different information about your problem. How are you constructing the Gremlin Server Cluster/Client instances (i assume we are talking about Gremlin Server and not "Rexster" - your code has something called RexsterClientUtil). 

--
You received this message because you are subscribed to the Google Groups "Aurelius" group.
To unsubscribe from this group and stop receiving emails from it, send an email to aureliusgraph...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/aureliusgraphs/d0ab631c-3315-46e3-bcb5-08ad54a9d82f%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Sudha Subramanian

unread,
Jul 26, 2016, 9:31:56 AM7/26/16
to Aurelius
I'm using Gremlin server now. I've not renamed some of these methods yet.

This is how I construct the Gremlin client instance which is stored inside of a factory. I reuse the same client instance within the application.

GryoMapper mapper = GryoMapper.build().addRegistry(TitanIoRegistry.INSTANCE).create();

cluster = Cluster.build().serializer(new GryoMessageSerializerV1d0(mapper)).create();

client = cluster.connect();

client.init();


 Thanks,
Sudha

Stephen Mallette

unread,
Jul 26, 2016, 9:48:08 AM7/26/16
to Aurelius
Ok - as an aside, I would remove those calls to g.tx().commit() in your server side scripts. Gremlin Server automatically manages the transaction on sessionless requests (which is what you seem to be using given your code that creates the Client). By calling commit() yourself, you are essentially creating two transactions per request and Titan is re-querying the data on the second transaction which is adding some cost to your response time.

I don't think the double commit() is your problem here though. You don't show all of your client-side code but I assume that when you aren't "printing the output" you aren't blocking until the response is received. You might need to do that in this case, because each request is depending on the next and since you are using connection pooling on the client (you didn't configure Client otherwise) i suspect that the three requests are hitting more than one server channel and so the requests may execute out of order at the whim of the executor service on the server. Instead of blocking until you have a response, you might also configure the client-side connection pooling to only use 1 connection. That would then force the requests to a single server side channel and then they should execute in order even if you don't block until you get a response on the client. I think that would work too. 

Sudha Subramanian

unread,
Jul 27, 2016, 10:34:54 AM7/27/16
to Aurelius
Hi Stephen,

Thanks for your detailed response. I will remove the g.tx().commit() in the script. 

This is my complete client side code and I thought that 'submit' method on gremlin client is synchronous ( meaning, it will not return till it completes the write). I'm not using the async submit method. 

GryoMapper mapper = GryoMapper.build().addRegistry(TitanIoRegistry.INSTANCE).create();

cluster = Cluster.build().serializer(new GryoMessageSerializerV1d0(mapper)).create();

client = cluster.connect();

client.init();




ResultSet rs = client.submit("get_or_create_user(g, userName, email, createdTime)", param);

Result result = rs.one();

DetachedVertex dv = (DetachedVertex) result.getObject();

Iterator<VertexProperty<Object>> queryIter = dv.properties();

while( queryIter.hasNext()){

  VertexProperty p = queryIter.next();

  LOGGER.debug("User object : " + p.key() + ":" + p.value());

}


 
Thanks,
Sudha

Stephen Mallette

unread,
Jul 27, 2016, 10:46:40 AM7/27/16
to Aurelius
the API for ResultSet is a little confusing. there is a proposal to make it better but it requires a breaking change so hasn't been implemented yet - maybe for TinkerPop 3.3.x. anyway, submit() on it's own doesnt' block in the way you think it does. it blocks only until the write is complete on the client side. then it returns a ResultSet which you would use to block until you get your response from the server. So basically do this:

submit("...").all().get()

to block until that request is complete.

Sudha Subramanian

unread,
Jul 29, 2016, 9:18:20 AM7/29/16
to Aurelius
Thanks Stephen. I will try this out.
Reply all
Reply to author
Forward
0 new messages