Couchbase Sqoop Data Locality question

Corey Nolet

Mar 13, 2014, 12:57:29 AM
to couc...@googlegroups.com
Hello,

I'm looking through the source code on GitHub for the Couchbase Hadoop connector. If I'm understanding correctly, the code that generates the splits takes all the possible vbuckets and breaks them into groups based on the number of mappers Sqoop expects. This means that even if a mapper is scheduled on a Couchbase node, the reads from the dump are ALWAYS sent over the network instead of being pulled from the local node's memory and funneled straight into the mapper sitting on that node.

Looking further into the code in the Java Couchbase client, I see a class called "VBucketNodeLocator" which has a method getServerByIndex(int k). If I understand this method correctly, it lets me look up the server that holds vbucket k. Is that right? If it is, would it make sense to use it in the getSplits() method of CouchbaseInputFormat so that the splits can be grouped by the server on which their vbuckets live? I realize this may not matter for people who run their Couchbase cluster separately from their Hadoop cluster, but it's a SIGNIFICANT optimization for those who have the two co-located.
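
To make that concrete, here's a rough, untested sketch of the grouping I'm imagining inside getSplits(). The vbucketToServer() lookup is hypothetical (it stands in for whatever per-vbucket lookup the client actually exposes), as is the idea of turning each group into a split that reports its server as a location:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitGroupingSketch {

  // Hypothetical lookup: whatever client call maps a vbucket to its master server.
  static String vbucketToServer(int vb) {
    throw new UnsupportedOperationException("needs a real client lookup");
  }

  // Group vbuckets by the server that owns them. Each server's group would
  // become one split (or a few, to satisfy the requested mapper count),
  // with the server reported as the split's location for Hadoop scheduling.
  static Map<String, List<Integer>> groupVBucketsByServer(int numVBuckets) {
    Map<String, List<Integer>> byServer = new HashMap<String, List<Integer>>();
    for (int vb = 0; vb < numVBuckets; vb++) {
      String server = vbucketToServer(vb);
      List<Integer> group = byServer.get(server);
      if (group == null) {
        group = new ArrayList<Integer>();
        byServer.put(server, group);
      }
      group.add(vb);
    }
    return byServer;
  }
}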

Any thoughts?


Thanks!

Corey Nolet

Mar 13, 2014, 7:42:46 AM
to couc...@googlegroups.com
It appears that method only returns the server at a given index in the array. Is there any way to find out which server is responsible for a given vbucket?
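
The closest thing I've found so far is the bucket details REST endpoint on port 8091, which does expose the mapping; worst case I could parse it myself. Something like this (error handling omitted; hostnames and bucket name are placeholders for my setup):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class VBucketMapFetch {
  public static void main(String[] args) throws Exception {
    // The bucket details response includes a "vBucketServerMap" object:
    //   "serverList": ["hostA:11210", "hostB:11210", ...]
    //   "vBucketMap": [[0,1], [1,0], ...]  // entry k = [masterIdx, replicaIdx...]
    // so the master server for vbucket k is serverList[vBucketMap[k][0]].
    URL url = new URL("http://hostA:8091/pools/default/buckets/default");
    BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()));
    StringBuilder json = new StringBuilder();
    for (String line; (line = in.readLine()) != null; ) {
      json.append(line);
    }
    in.close();
    // Parse vBucketServerMap out of 'json' with whatever JSON library is handy.
    System.out.println(json.length() + " bytes of bucket config fetched");
  }
}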

Matt Ingenthron

Mar 13, 2014, 1:38:39 PM
to couc...@googlegroups.com
Hi Corey,

From: Corey Nolet <cjn...@gmail.com>
Reply-To: "couc...@googlegroups.com" <couc...@googlegroups.com>
Date: Wednesday, March 12, 2014 8:57 PM
To: "couc...@googlegroups.com" <couc...@googlegroups.com>
Subject: Couchbase Sqoop Data Locality question

Hello,

I'm looking through the source code on GitHub for the Couchbase Hadoop connector. If I'm understanding correctly, the code that generates the splits takes all the possible vbuckets and breaks them into groups based on the number of mappers Sqoop expects. This means that even if a mapper is scheduled on a Couchbase node, the reads from the dump are ALWAYS sent over the network instead of being pulled from the local node's memory and funneled straight into the mapper sitting on that node.

This is true, however…

Hadoop is designed to run across a cluster of systems distributed on a network. Couchbase, similarly, is designed to run distributed across systems. So while you're describing a possible optimization, it's not something that's part of either one right now.

Actually, the way Sqoop runs is ideal for most deployments, since splitting and distributing the job gives us great throughput.


Looking further into the code in the Java Couchbase client, I see a class called "VBucketNodeLocator" which has a method getServerByIndex(int k). If I understand this method correctly, it lets me look up the server that holds vbucket k. Is that right? If it is, would it make sense to use it in the getSplits() method of CouchbaseInputFormat so that the splits can be grouped by the server on which their vbuckets live? I realize this may not matter for people who run their Couchbase cluster separately from their Hadoop cluster, but it's a SIGNIFICANT optimization for those who have the two co-located.

Yes, enhancements to how the splitting is done using that method (which isn't really considered public) would be an optimization. If you want to try building that optimization, the repo is here:

Our code review is here:

Soon we'll be doing some updates there too.

Thanks,

Matt

-- 
Matt Ingenthron
Couchbase, Inc.

Corey Nolet

Mar 18, 2014, 11:06:19 PM
to couc...@googlegroups.com
Matt,

Sure, your point is definitely valid when moving data from one completely separate distributed system to another. But it's definitely not optimal when I'm using Couchbase as a distributed cache on the same nodes as my Hadoop cluster. In fact, one of the main strengths of Hadoop is its ability to track locality and pass that information down to the map/reduce layer so that mappers can be scheduled on the nodes closest to the data. The network is worlds slower than memory: if I can have a mapper on each node pulling data from its local Couchbase TAP instead of hitting the network at all, I'm in a much better position.
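
And Hadoop only needs the split to report which host it wants. Something as minimal as this (the class and its names are mine, not from the connector) is enough for the scheduler to try to place the mapper on that node:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Minimal locality-aware split: Hadoop reads getLocations() when scheduling
// and tries to place the mapper on (or near) one of the returned hosts.
public class LocalityAwareSplit extends InputSplit implements Writable {
  private String host = "";

  public LocalityAwareSplit() { }                    // required for deserialization
  public LocalityAwareSplit(String host) { this.host = host; }

  @Override public long getLength() { return 0; }   // unknown up front for a TAP stream
  @Override public String[] getLocations() { return new String[] { host }; }

  @Override public void write(DataOutput out) throws IOException { out.writeUTF(host); }
  @Override public void readFields(DataInput in) throws IOException { host = in.readUTF(); }
}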

I'd say the same for Couchbase proper: if I could hash the data my way, so that I control which node a key ends up on, I'd be in a better position with my use of the distributed cache. I want to do stream processing but give my users the ability to query the cache through Elasticsearch. From what I've seen in the Couchbase Java client, I can implement an interface to determine which vbucket a key should land in, but I'd have to recompile the client to use my specialized hashing function. I don't mind doing that, but again, I have no way to find out which node will host that vbucket. Your hashing solution works when I always want an even distribution, but I don't always want that (or maybe, based on my domain's use cases, I know more about what a useful distribution looks like than Couchbase's auto-sharding does).
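
For reference, the key-to-vbucket mapping itself is tiny, which is part of why plugging in my own version is tempting. As I understand it from reading the client source (so treat this as my reading, not gospel), it's CRC32-based:

import java.util.zip.CRC32;

public class VBucketHashSketch {
  // My reading of the client's key -> vbucket mapping: CRC32 over the key
  // bytes, keep bits 16..30, then mask down to the vbucket count
  // (1024 vbuckets on Couchbase Server, so the mask is 0x3ff).
  static int vbucketForKey(String key, int numVBuckets) {
    CRC32 crc = new CRC32();
    crc.update(key.getBytes());
    return (int) ((crc.getValue() >> 16) & 0x7fff) & (numVBuckets - 1);
  }

  public static void main(String[] args) {
    System.out.println(vbucketForKey("some-document-key", 1024));
  }
}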

In my environment, I'm using Couchbase as a mutability layer on top of Hadoop, because my data can change quite frequently for a period of time until it's considered immutable and I can vet it into Accumulo via a map/reduce job. For this use case, the Sqoop plugin just adds the extra step of writing a file to HDFS and then map/reducing over that file to put the data somewhere else, and it adds storage overhead.

I'm not an Erlang developer, so pointing me directly at an Erlang method wouldn't help much, though I know from past experience with Couchbase that some methods are exposed via a remote "eval" function (or maybe Erlang does this automatically?). Is it possible to use that eval to ask Couchbase which node hosts a vbucket? It's a function I'd only need to call once, during InputFormat initialization.

In the meantime, I ripped the CouchbaseInputFormat out of the Sqoop plugin's GitHub project. I don't know why the version of the plugin that works with CDH3 uses the Membase client to perform the TAP, but I couldn't get that to work against Couchbase 2.x, so I changed it to use CouchbaseClient instead, and it works fine. I now have an InputFormat that pushes data directly into Accumulo, but it's based entirely on the network; it would definitely benefit from locality instead of wasting precious network bandwidth.
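
In case it helps anyone else who hits the same wall, the heart of my replacement record reader looks roughly like this (simplified, no error handling; hostnames, bucket, and the dump name are placeholders for my setup):

import java.net.URI;
import java.util.Arrays;
import com.couchbase.client.TapClient;
import net.spy.memcached.tapmessage.ResponseMessage;

public class TapDumpSketch {
  public static void main(String[] args) throws Exception {
    // Open a TAP dump against the bucket using the 1.x TapClient
    // (the CouchbaseClient-era replacement for the old Membase client).
    TapClient tap = new TapClient(
        Arrays.asList(new URI("http://hostA:8091/pools")), "default", "");
    tap.tapDump("sqoop-dump");

    // Drain key/value messages; in my record reader these feed the map task.
    while (tap.hasMoreMessages()) {
      ResponseMessage m = tap.getNextMessage();
      if (m == null) {
        continue; // the stream can return null between messages
      }
      System.out.println(m.getKey() + " -> " + m.getValue().length + " bytes");
    }
    tap.shutdown();
  }
}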


Thanks again Matt!