Java Thrift Client -- How to set destination_Ids in QueryTerm in call to execute API

112 views
Skip to first unread message

Marcel Becker

unread,
Jun 21, 2012, 2:02:43 PM6/21/12
to flo...@googlegroups.com
Hi, 

I am writing a Java FlockDB client to talk to the scala FlockDB server. 
I have the FockDB up and running. 
After some minor tweaks in the Ruby FlockDB client I can issue commands from the IRB interactive shell and can see the server implements the expected behavior. 

My problem is with the add operation [execute (ExecuteOperationType.add)]. 
More specifically, my question is how to pass the destination ids as a list of numbers to the server using a Java client.  

The IRB client does the right thing. I can pass a list of integers to the add command and I get the desired result. 
I added some print statements to the FlockDB server code and can see the server side code processes the list correctly.   

1.9.3-p194 :001 > require "flockdb"
 => true 
1.9.3-p194 :002 > flock = Flock.new("localhost:7915",{ :graphs => { :follows => 1, :blocks => 2 , :loves => 3, :hates => 4}, :transport_wrapper => Thrift::FramedTransport})
 => #<Flock::Client:0x000000027764e0 @graphs={:follows=>1, :blocks=>2, :loves=>3, :hates=>4}, @service=<Flock::Service(Flock::Edges::FlockDB::Client) @current_server=localhost:7915>> 
1.9.3-p194 :003 > flock.add(1, 2, [2,3,4,5])
 => nil 
1.9.3-p194 :006 > flock.select(1, 2, nil).to_a
 => [3, 5, 2, 4] 
1.9.3-p194 :007 >

I am using the following Java code in my client: 

// Create and open the connection to the server.  
 TTransport transport = new TFramedTransport(new TSocket("localhost", 7915, 10000));
    TProtocol protocol = new TBinaryProtocol(transport);
    FlockDB.Client client = new FlockDB.Client(protocol);
    try {
      transport.open();
    } catch (TException x) {
      x.printStackTrace();
    }

 // Create the query term
 QueryTerm queryTerm = new QueryTerm(1, 1, true);
 // Allocate a ByteBuffer with 4 bytes for an int. 
 ByteBuffer destinationIdsBuffer = ByteBuffer.allocate(4);
 // Use standard putInt api to add an integer to the buffer. 
 destinationIdsBuffer.putInt(5);    
 // set the destination Ids buffer in the query term. 
 queryTerm.setDestination_ids(destinationIdsBuffer);
                
        // Create the necessary objects to wrap the query term. 
        ExecuteOperation exOp = new ExecuteOperation(ExecuteOperationType.Add, queryTerm);
        List<ExecuteOperation> exOperationList = new ArrayList<ExecuteOperation>();
        exOperationList.add(exOp);
        ExecuteOperations exOperations = new ExecuteOperations(exOperationList, Priority.High);

        // Make the call to the thrift API
        try {
          client.execute(exOperations);
        } catch (TException x) {
          x.printStackTrace();
        } catch (FlockException e) {
          e.printStackTrace();
        }

The client code generated by Thrift uses a ByteArray as the class for the destination_Ids field. So I am trying to set the destinationIds of the QueryTerm object using a ByteArray I create in my client code. 

The server, however, does not get the destination_Ids I am passing in the byte array. 
The log below shows the place where the FlockDB server processes the destination ids. 
I can see the server received a non null HeapByteBuffer. The byte buffer position is 69 and its capacity is 80. 
After reading the byte buffer into the WrappedArray, the WrappedArray is empty. 

If I increase the size of the ByteBuffer allocation on the client side to around 20, and add only 5 to the buffer using putInt(5),
then I can see 2 values inside the WrappedArray (see below). But the values are (0,0) and not 5. 
A size of 20 seems to be the threshold -- if the size is less than 20, the server does not see any value. 
If instead of a single value, I tried to add 5 integers to the byte array, then again I get an empty WrappedArray (see below).


I tried several different mechanisms to fill the byte array of the ByteBuffer, but was not able to get the desired result. 
I tried to fill the first 4 bytes of the array directly using different byte ordering (most significant bytes first, least significant bytes first) but it did not chang anything. I was not able to get a single integer value across the wire. 


INF [20120621-10:24:53.228] flockdb: execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120621-10:24:53.228] flockdb: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120621-10:24:53.229] flockdb: EdgesService: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High)
INF [20120621-10:24:53.229] queries: ExecuteCompiler.apply ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High) Program.size = 1
INF [20120621-10:24:53.229] queries: ExecuteCompiler.apply op = ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None) Term = QueryTerm(1,1,true,Some(WrappedArray()),List())

INF [20120621-10:24:53.229] queries: ExecuteCompiler.processDestinations source = 1 destinationIds.isDefined true
INF [20120621-10:24:53.229] queries: ExecuteCompiler.processDestinations destinationIds = WrappedArray()

INF [20120621-10:24:53.230] flockdb: wrapRPC rpcName = execute fPromise@1561232967(ivar=Ivar@1687277323(state=Done(Return(()))), cancelled=Ivar@548409026(state=Linked(Ivar@1580755471(state=Waiting(List(),List())))))
INF [20120621-10:24:53.230] flockdb: EdgesService: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High)
INF [20120621-10:24:53.230] queries: ExecuteCompiler.apply ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High) Program.size = 1


Using:  

 ByteBuffer buffer = ByteBuffer.allocate(5*4);
 buffer.putInt(5);    


I get the following log: 


INF [20120621-10:31:35.644] flockdb: execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=85 cap=96]),None),None)),None,High)
INF [20120621-10:31:35.645] flockdb: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=85 cap=96]),None),None)),None,High)
INF [20120621-10:31:35.645] flockdb: EdgesService: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray(0, 0)),List()),None)),None,High)
INF [20120621-10:31:35.645] queries: ExecuteCompiler.apply ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray(0, 0)),List()),None)),None,High) Program.size = 1
INF [20120621-10:31:35.645] queries: ExecuteCompiler.apply op = ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray(0, 0)),List()),None) Term = QueryTerm(1,1,true,Some(WrappedArray(0, 0)),List())

INF [20120621-10:31:35.645] queries: ExecuteCompiler.processDestinations source = 1 destinationIds.isDefined true
INF [20120621-10:31:35.645] queries: ExecuteCompiler.processDestinations destinationIds = WrappedArray(0, 0)


 USING: 
  ByteBuffer buffer = ByteBuffer.allocate(5*4);
        buffer.putInt(5);    
        buffer.putInt(6);
        buffer.putInt(7);
        buffer.putInt(8);
        buffer.putInt(9);

INF [20120621-10:41:54.508] flockdb: execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120621-10:41:54.509] flockdb: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120621-10:41:54.509] flockdb: EdgesService: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High)
INF [20120621-10:41:54.509] queries: ExecuteCompiler.apply ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None)),None,High) Program.size = 1
INF [20120621-10:41:54.509] queries: ExecuteCompiler.apply op = ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray()),List()),None) Term = QueryTerm(1,1,true,Some(WrappedArray()),List())
INF [20120621-10:41:54.509] queries: ExecuteCompiler.processDestinations source = 1 destinationIds.isDefined true
INF [20120621-10:41:54.509] queries: ExecuteCompiler.processDestinations destinationIds = WrappedArray()




Thank you, 
Marcel

ksauzz

unread,
Jun 21, 2012, 8:48:21 PM6/21/12
to flo...@googlegroups.com
HI,

I had faced the same issue when I had written a java client.
The solution is to use little endian to talk to the FlockDB as following.

long[] destination_ids = ...;
ByteBuffer buf = ByteBuffer.allocate(8 * destination_ids.length);
buf.order(ByteOrder.LITTLE_ENDIAN);
for (long id : destination_ids) {
buf.putLong(item);
}

2012/6/22 Marcel Becker <beckera...@gmail.com>:

Marcel Becker

unread,
Jun 22, 2012, 1:46:59 PM6/22/12
to flo...@googlegroups.com
Hi, 

Thanks for the answer, but that still does not make it work. 

The weird thing is that the ByteBuffer received by the server has, in my example, position=limit=69. (position is the next index to be read, and limit is the latest index it will read). 
Inside FlockDB.unpackLongs, when we "convert" the HeapByteBuffer to a ByteBufferAsLongBuffer, the resulting buffer of longs is empty -- capacity of the long buffer is 0. 
If I rewind the buffer inside FlockDB.unpackLongs before converting to longs, I get a buffer of longs with 8 longs, while on the client side I sent 5 longs. None of the 8 long values retrieved match the numbers I sent. I tried using both BiG and LITTLE ENDIAN order. 


I do not know if this has any effect, but how are setting up your java client? 

I am using: 

    TTransport transport = new TFramedTransport(new TSocket("localhost", 7915, 10000));
    TProtocol protocol = new TBinaryProtocol(transport);
    FlockDB.Client client = new FlockDB.Client(protocol);



INF [20120622-10:03:53.147] flockdb: execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120622-10:03:53.148] flockdb: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120622-10:03:53.157] flockdb: In FlockDB.unpackLongs Buffer = java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]
INF [20120622-10:03:53.158] flockdb: In FlockDB.unpackLongs Buffer as Long Buffer = java.nio.ByteBufferAsLongBufferL[pos=0 lim=0 cap=0]
INF [20120622-10:03:53.158] flockdb: In FlockDB.unpackLongs Results = [J@30fd981a



Log results when rewining the ByteBuffer inside FlockDB.unpackLongs

execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120622-10:27:02.828] flockdb: Call to Execute ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]),None),None)),None,High)
INF [20120622-10:27:02.829] flockdb: In FlockDB.unpackLongs Buffer = java.nio.HeapByteBuffer[pos=69 lim=69 cap=80]

INF [20120622-10:27:02.829] flockdb: In FlockDB.unpackLongs Buffer as Long Buffer = java.nio.ByteBufferAsLongBufferL[pos=0 lim=8 cap=8]
 ExecuteCompiler.apply ExecuteOperations(ArrayBuffer(ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray(504403158282273152, 28557020175366245, 4223224363679744, 2252899325316097, 144128386510356481, 65546, 2199157538816, 3097337140478208)),List()),None)),None,High) Program.size = 1
INF [20120622-10:27:02.830] queries: ExecuteCompiler.apply op = ExecuteOperation(Add,QueryTerm(1,1,true,Some(WrappedArray(504403158282273152, 28557020175366245, 4223224363679744, 2252899325316097, 144128386510356481, 65546, 2199157538816, 3097337140478208)),List()),None) Term = QueryTerm(1,1,true,Some(WrappedArray(504403158282273152, 28557020175366245, 4223224363679744, 2252899325316097, 144128386510356481, 65546, 2199157538816, 3097337140478208)),List())
INF [20120622-10:27:02.830] queries: ExecuteCompiler.processDestinations source = 1 destinationIds.isDefined true
INF [20120622-10:27:02.830] queries: ExecuteCompiler.processDestinations destinationIds = WrappedArray(504403158282273152, 28557020175366245, 4223224363679744, 2252899325316097, 144128386510356481, 65546, 2199157538816, 3097337140478208)
INF [20120622-10:27:02.830] queries: ExecuteCompiler.processDestinations source = 1 destination = 504403158282273152
INF [20120622-10:27:02.830] queries: ExecuteCompiler.processDestinations source = 1 destination = 28557020175366245
INF [20120622-10:27:02.830] queries: ExecuteCompiler.processDestinations source = 1 destination = 4223224363679744
INF [20120622-10:27:02.831] queries: ExecuteCompiler.processDestinations source = 1 destination = 2252899325316097
INF [20120622-10:27:02.831] queries: ExecuteCompiler.processDestinations source = 1 destination = 144128386510356481
INF [20120622-10:27:02.831] queries: ExecuteCompiler.processDestinations source = 1 destination = 65546
INF [20120622-10:27:02.831] queries: ExecuteCompiler.processDestinations source = 1 destination = 2199157538816
INF [20120622-10:27:02.831] queries: ExecuteCompiler.processDestinations source = 1 destination = 3097337140478208



On Thursday, June 21, 2012 5:48:21 PM UTC-7, ksauzz wrote:
HI,

I had faced the same issue when I had written a java client.
The solution is to use little endian to talk to the FlockDB as following.

long[] destination_ids = ...;
ByteBuffer buf = ByteBuffer.allocate(8 * destination_ids.length);
buf.order(ByteOrder.LITTLE_ENDIAN);
for (long id : destination_ids) {
  buf.putLong(item);
}

Reply all
Reply to author
Forward
0 new messages