I need "Admin Tools"

72 views
Skip to first unread message

raul villalba

unread,
May 30, 2011, 5:00:24 AM5/30/11
to project-voldemort
Hi,

I am a newbie with Voldemort, and I need some tools to control Voldemort's
objects.
My objectives are methods to:

get a list with all keys from a node.
get a list with all keys from all nodes.
get a list of filtered keys from a node. I filter on the start of the
key, since all my keys have a prefix.
get a list with filter keys from all nodes.
delete all keys from a node.
delete all keys from all nodes.
delete with prefix filter keys from a node.
delete with prefix filter keys from all nodes.
migrate all objects from voldemort A to voldemort B. Using my
getterAllKeys for example.
compare all objects from voldemort A to voldemort B.

At this point, I have some of these methods, with some problems:

get a list with all keys from a node. I return a List<String> with all
keys correctly, but I need a lot of memory when I return 10 million
keys. I need some ideas for this problem.
get a list with all keys from all nodes. The problem is the same.
get a list with filter keys from a node. Is ok.
get a list with filter keys from all nodes. Not yet.
delete all keys from a node. Is ok.
delete all keys from all nodes. Not yet.
delete with prefix filter keys from a node. Is ok.
delete with prefix filter keys from all nodes. Not yet.
migrate all objects from voldemort A to voldemort B. Not yet.
compare all objects from voldemort A to voldemort B. Not yet.

I want to upload my code, but I do not know where I can do it.

Thanks and good day!
Message has been deleted

raul villalba

unread,
Jun 8, 2011, 11:37:17 AM6/8/11
to project-voldemort
Good day,

I wrote a tools class to find objects in Voldemort and to perform bulk
operations.

Bye

package com.components.repository;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Node;
import voldemort.store.socket.SocketAndStreams;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.utils.ByteUtils;

import com.components.util.Config;

public class VoldemortAdmin {
public static final int DEFAULT_MAX_CONNECTIONS_PER_NODE = 10;
public static final int DEFAULT_MAX_CONNECTIONS = 50;
public static final int DEFAULT_SOCKET_BUFFER_SIZE = 32 * 1024;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 1000000;
public static final int DEFAULT_SO_TIMEOUT_MS = 1000000;
private SocketPool pool;
private AdminClient adminClient;
private List<String> storeNames;
private VoldemortClient voldemortClient;
private List<Integer> partitionIdList;

public VoldemortAdmin() {
super();
init();
}

private void init() {
setVoldemortClient(new VoldemortClient());
adminClient = new
AdminClient(Config.getInstance().getDataRepositoryUrl(), new
AdminClientConfig());
storeNames = new ArrayList<String>();
storeNames.add(Config.getInstance().getDataRepositoryStoreName());
partitionIdList = new ArrayList<Integer>();
partitionIdList.add(0);
partitionIdList.add(1);
setPool(new SocketPool(DEFAULT_MAX_CONNECTIONS_PER_NODE,
DEFAULT_CONNECTION_TIMEOUT_MS, DEFAULT_SO_TIMEOUT_MS,
DEFAULT_SOCKET_BUFFER_SIZE));
}

public VoldemortAdmin(SocketPool poolExtern, List<String>
storeNamesExtern) {
super();
setVoldemortClient(new VoldemortClient());
adminClient = new
AdminClient(Config.getInstance().getDataRepositoryUrl(), new
AdminClientConfig());
partitionIdList = new ArrayList<Integer>();
partitionIdList.add(0);
partitionIdList.add(1);
setPool(poolExtern);
setStoreNames(storeNamesExtern);
}
public VoldemortAdmin(List<String> storeNamesExtern, AdminClient
adminClientExtern) {
super();
setVoldemortClient(new VoldemortClient());
adminClient = adminClientExtern;
partitionIdList = new ArrayList<Integer>();
partitionIdList.add(0);
partitionIdList.add(1);
setPool(new SocketPool(DEFAULT_MAX_CONNECTIONS_PER_NODE,
DEFAULT_CONNECTION_TIMEOUT_MS, DEFAULT_SO_TIMEOUT_MS,
DEFAULT_SOCKET_BUFFER_SIZE));
setStoreNames(storeNamesExtern);
}
public Set<String> getAllKeysByNode(int nodeId) {
List<String> stores = storeNames;
Set<String> keys = new HashSet<String>();
for (String storeName : stores) {
Node node =
adminClient.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new
SocketDestination(node.getHost(), node.getAdminPort(),
RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
final SocketAndStreams sands = pool.checkout(destination);
DataOutputStream outputStream = sands.getOutputStream();
final DataInputStream inputStream = sands.getInputStream();
try {
initiateFetchRequest(outputStream, storeName, partitionIdList);
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
try {
while (true) {
int size = inputStream.readInt();
if (size == -1) {
pool.checkin(destination, sands);
break;
}
VAdminProto.FetchPartitionEntriesResponse response =
responseFromStream(inputStream, size);
if (response.hasError()) {
pool.checkin(destination, sands);
System.out.println(response.getError().getErrorMessage());
} else {
keys.add(response.getKey().toStringUtf8());
}
}
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
}
return keys;
}

public void deleteAllKeysByNode(int nodeId) {
List<String> stores = storeNames;
for (String storeName : stores) {
Node node =
adminClient.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new
SocketDestination(node.getHost(), node.getAdminPort(),
RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
final SocketAndStreams sands = pool.checkout(destination);
DataOutputStream outputStream = sands.getOutputStream();
final DataInputStream inputStream = sands.getInputStream();
try {
initiateFetchRequest(outputStream, storeName, partitionIdList);
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
try {
while (true) {
int size = inputStream.readInt();
if (size == -1) {
pool.checkin(destination, sands);
break;
}
VAdminProto.FetchPartitionEntriesResponse response =
responseFromStream(inputStream, size);
if (response.hasError()) {
pool.checkin(destination, sands);
System.out.println(response.getError().getErrorMessage());
} else {
voldemortClient.delete(response.getKey().toStringUtf8());
}
}
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
}
}

public Set<String> getAllKeysByPrefixAndByNode(int nodeId, String
prefix) {
List<String> stores = storeNames;
Set<String> keys = new HashSet<String>();
for (String storeName : stores) {
Node node =
adminClient.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new
SocketDestination(node.getHost(), node.getAdminPort(),
RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
final SocketAndStreams sands = pool.checkout(destination);
DataOutputStream outputStream = sands.getOutputStream();
final DataInputStream inputStream = sands.getInputStream();
try {
initiateFetchRequest(outputStream, storeName, partitionIdList);
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
try {
while (true) {
int size = inputStream.readInt();
if (size == -1) {
pool.checkin(destination, sands);
break;
}
VAdminProto.FetchPartitionEntriesResponse response =
responseFromStream(inputStream, size);
if (response.hasError()) {
pool.checkin(destination, sands);
System.out.println(response.getError().getErrorMessage());
} else {
if (response.getKey().toStringUtf8().startsWith(prefix)) {
keys.add(response.getKey().toStringUtf8());
}
}
}
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
}
return keys;
}

public void deleteAllKeysByPrefixAndByNode(int nodeId, String prefix)
{
List<String> stores = storeNames;
for (String storeName : stores) {
Node node =
adminClient.getAdminClientCluster().getNodeById(nodeId);
final SocketDestination destination = new
SocketDestination(node.getHost(), node.getAdminPort(),
RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
final SocketAndStreams sands = pool.checkout(destination);
DataOutputStream outputStream = sands.getOutputStream();
final DataInputStream inputStream = sands.getInputStream();
try {
initiateFetchRequest(outputStream, storeName, partitionIdList);
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
try {
while (true) {
int size = inputStream.readInt();
if (size == -1) {
pool.checkin(destination, sands);
break;
}
VAdminProto.FetchPartitionEntriesResponse response =
responseFromStream(inputStream, size);
if (response.hasError()) {
pool.checkin(destination, sands);
System.out.println(response.getError().getErrorMessage());
} else {
if (response.getKey().toStringUtf8().startsWith(prefix)) {
voldemortClient.delete(response.getKey().toStringUtf8());
}
}
}
} catch (IOException e) {
close(sands.getSocket());
pool.checkin(destination, sands);
throw new VoldemortException(e);
}
}
}

public Set<String> getAllKeysByNodes(List<Integer> nodesId) {
Set<String> listAll = new HashSet<String>();
for (Integer nodeId : nodesId) {
listAll.addAll(getAllKeysByNode(nodeId));
}
return listAll;
}

private void initiateFetchRequest(DataOutputStream outputStream,
String storeName, List<Integer> partitionList) throws IOException {
VAdminProto.FetchPartitionEntriesRequest.Builder fetchRequest =
VAdminProto.FetchPartitionEntriesRequest.newBuilder().addAllPartitions(partitionList).setStore(storeName);
VAdminProto.VoldemortAdminRequest request =
VAdminProto.VoldemortAdminRequest.newBuilder().setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES).setFetchPartitionEntries(fetchRequest).build();
ProtoUtils.writeMessage(outputStream, request);
outputStream.flush();
}

private void close(Socket socket) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private VAdminProto.FetchPartitionEntriesResponse
responseFromStream(DataInputStream inputStream, int size) throws
IOException {
byte[] input = new byte[size];
ByteUtils.read(inputStream, input);
VAdminProto.FetchPartitionEntriesResponse.Builder response =
VAdminProto.FetchPartitionEntriesResponse.newBuilder();
response.mergeFrom(input);
return response.build();
}

public void setPool(SocketPool pool) {
this.pool = pool;
}

public SocketPool getPool() {
return pool;
}

public void setAdminClient(AdminClient adminClient) {
this.adminClient = adminClient;
}

public AdminClient getAdminClient() {
return adminClient;
}

public void setPartitionIdList(List<Integer> partitionIdList) {
this.partitionIdList = partitionIdList;
}

public List<Integer> getPartitionIdList() {
return partitionIdList;
}

public List<String> getStoreNames() {
return storeNames;
}

public void setStoreNames(List<String> storeNames) {
this.storeNames = storeNames;
}

public void setVoldemortClient(VoldemortClient voldemortClient) {
this.voldemortClient = voldemortClient;
}

public VoldemortClient getVoldemortClient() {
return voldemortClient;
}

}

OG

unread,
Jun 12, 2011, 7:30:59 AM6/12/11
to project-voldemort
I wonder if it would make sense to include such tools in the V
distribution?

Otis
--
We're hiring: http://sematext.com/about/jobs.html

On Jun 8, 11:37 am, raul villalba <raulvillalbamed...@gmail.com>
wrote:

> ...
>
> read more »

raul villalba

unread,
Jun 12, 2011, 7:42:46 AM6/12/11
to project-voldemort
Now I use these "tools" for development; in my special case all the keys in
Voldemort have a prefix, and we have several groups of key prefixes.
These methods help me to test all the operations.
Now I have one store with 100000 keys and these utilities work perfectly.
In our production system we have 20 million keys and we have
problems because the machine needs more disk space or more memory. If anyone
finds bugs in my code or possible improvements to my methods,
please send them to me. This code uses the same classes as
voldemort-admin-tool.sh, and I do not know whether there is a better way to
handle these cases.

bye
> ...
>
> read more »

raul villalba

unread,
Jul 4, 2011, 4:41:28 AM7/4/11
to project-voldemort
The new version, with import and export to binary files, works with 10
million key-value pairs.
package com.components.aif2loader.repository;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OptionalDataException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Node;
import voldemort.store.socket.SocketAndStreams;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;

import com.components.aif2loader.util.Config;

/**
 * Bulk utilities for a Voldemort cluster: delete or list keys (optionally by
 * prefix), export entries to serialized files and import them again.
 *
 * <p>Entries are streamed from each node's admin port with
 * {@code FETCH_PARTITION_ENTRIES} requests, one partition at a time. For every
 * node the partitions reported by {@link Node#getPartitionIds()} are used; the
 * list from {@link #getPartitionIdList()} is only a fallback for nodes that
 * report {@code null}. Deletions go through the regular {@link VoldemortClient}.
 *
 * <p>Export files are named
 * {@code <store>-<nodeId>-<partitionId>-<index>.entries} and hold Java-serialized
 * {@code Pair<ByteArray, Versioned<byte[]>>} objects.
 * SECURITY: importing deserializes those files with Java serialization, so the
 * export directory must only ever contain trusted files.
 */
public class VoldemortAdmin {
    public static final int DEFAULT_MAX_CONNECTIONS_PER_NODE = 10;
    public static final int DEFAULT_MAX_CONNECTIONS = 50;
    public static final int DEFAULT_SOCKET_BUFFER_SIZE = 32 * 1024;
    public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 1000000;
    public static final int DEFAULT_SO_TIMEOUT_MS = 1000000;

    /** Maximum number of entries written to one export file. */
    private static final int ENTRIES_PER_FILE = 10000;
    /** File-name suffix of export files. */
    private static final String ENTRIES_SUFFIX = ".entries";

    private SocketPool pool;
    private AdminClient adminClient;
    private List<String> storeNames;
    private VoldemortClient voldemortClient;
    // Initialised here so that every constructor leaves it non-null.
    private List<Integer> partitionIdList = new ArrayList<Integer>();

    private String directoryPath = "c://voldemortKeys/";

    /** Callback invoked once for every response streamed back by a node. */
    private interface ResponseHandler {
        void handle(VAdminProto.FetchPartitionEntriesResponse response) throws IOException;
    }

    /**
     * Creates an instance configured entirely from {@link Config}: the admin
     * client connects to the first configured host and the single configured
     * store is used.
     */
    public VoldemortAdmin() {
        this.voldemortClient = new VoldemortClient();
        this.adminClient = newDefaultAdminClient();
        this.storeNames = new ArrayList<String>();
        this.storeNames.add(Config.getInstance().getVoldemortStore());
        this.pool = newDefaultPool();
    }

    /**
     * Creates an instance that uses a caller-supplied admin client and store list.
     *
     * @param storeNamesExtern names of the stores to operate on
     * @param adminClientExtern admin client used for cluster lookups and imports
     */
    public VoldemortAdmin(List<String> storeNamesExtern, AdminClient adminClientExtern) {
        this.voldemortClient = new VoldemortClient();
        this.adminClient = adminClientExtern;
        this.pool = newDefaultPool();
        this.storeNames = storeNamesExtern;
    }

    /**
     * Creates an instance that uses a caller-supplied socket pool and store list.
     *
     * @param poolExtern socket pool used for the admin connections
     * @param storeNamesExtern names of the stores to operate on
     */
    public VoldemortAdmin(SocketPool poolExtern, List<String> storeNamesExtern) {
        this.voldemortClient = new VoldemortClient();
        this.adminClient = newDefaultAdminClient();
        this.pool = poolExtern;
        this.storeNames = storeNamesExtern;
    }

    private static AdminClient newDefaultAdminClient() {
        return new AdminClient(Config.getInstance().getVoldemortHost().get(0),
                new AdminClientConfig());
    }

    private static SocketPool newDefaultPool() {
        return new SocketPool(DEFAULT_MAX_CONNECTIONS_PER_NODE,
                DEFAULT_CONNECTION_TIMEOUT_MS, DEFAULT_SO_TIMEOUT_MS,
                DEFAULT_SOCKET_BUFFER_SIZE);
    }

    // ------------------------------------------------------------------
    // Delete / list
    // ------------------------------------------------------------------

    /**
     * Deletes every key of every configured store, on all nodes.
     *
     * @throws VoldemortException if a connection fails or a node reports an error
     */
    public void deleteAllKeys() {
        for (String storeName : storeNames) {
            for (Node node : getNodes()) {
                for (Integer partitionId : partitionsOf(node)) {
                    fetchPartition(node, storeName, partitionId, false, new ResponseHandler() {
                        public void handle(VAdminProto.FetchPartitionEntriesResponse response) {
                            voldemortClient.delete(response.getKey().toStringUtf8());
                        }
                    });
                }
            }
        }
    }

    /**
     * Deletes every key starting with {@code prefix}, in every configured store
     * and on all nodes. Prints the number of deletions per partition.
     *
     * @param prefix required key prefix
     * @throws VoldemortException if a connection fails or a node reports an error
     */
    public void deleteAllKeysByPrefix(final String prefix) {
        for (String storeName : storeNames) {
            for (Node node : getNodes()) {
                for (Integer partitionId : partitionsOf(node)) {
                    // One-element array: a counter the anonymous class can update.
                    final int[] deletes = { 0 };
                    fetchPartition(node, storeName, partitionId, false, new ResponseHandler() {
                        public void handle(VAdminProto.FetchPartitionEntriesResponse response) {
                            String key = response.getKey().toStringUtf8();
                            if (key.startsWith(prefix)) {
                                voldemortClient.delete(key);
                                deletes[0]++;
                            }
                        }
                    });
                    System.out.println("Objects deleted:" + deletes[0] + " Keys: ");
                }
            }
        }
    }

    /**
     * Returns the keys starting with {@code prefix}, over every configured store
     * and all nodes. All matches are held in memory.
     *
     * @param prefix required key prefix
     * @return the matching keys, decoded as UTF-8 strings
     * @throws VoldemortException if a connection fails or a node reports an error
     */
    public Set<String> getAllKeysByPrefix(final String prefix) {
        final Set<String> keys = new HashSet<String>();
        for (String storeName : storeNames) {
            for (Node node : getNodes()) {
                for (Integer partitionId : partitionsOf(node)) {
                    fetchPartition(node, storeName, partitionId, false, new ResponseHandler() {
                        public void handle(VAdminProto.FetchPartitionEntriesResponse response) {
                            String key = response.getKey().toStringUtf8();
                            if (key.startsWith(prefix)) {
                                keys.add(key);
                            }
                        }
                    });
                }
            }
        }
        return keys;
    }

    // ------------------------------------------------------------------
    // Export
    // ------------------------------------------------------------------

    /**
     * Exports every entry of every configured store, from all nodes, to files in
     * the export directory. Nothing is exported (and 0 is returned) when the
     * directory does not exist and cannot be created.
     *
     * @return number of entries written
     * @throws VoldemortException if a connection, a node or a file write fails
     */
    public int exportAllKeys() {
        long time0 = System.nanoTime();
        File directory = new File(directoryPath);
        int total = 0;
        if (ensureDirectory(directory)) {
            for (String storeName : storeNames) {
                for (Node node : getNodes()) {
                    total += exportNode(node, storeName, directory);
                }
            }
        }
        reportExport(total, time0);
        return total;
    }

    /**
     * Exports every entry of every configured store held by one node.
     *
     * @param node node to export from
     * @return number of entries written
     * @throws VoldemortException if the connection, the node or a file write fails
     */
    public int exportAllKeys(Node node) {
        long time0 = System.nanoTime();
        File directory = new File(directoryPath);
        // Best effort: a missing directory still surfaces as a VoldemortException
        // on the first file write.
        ensureDirectory(directory);
        int total = 0;
        for (String storeName : storeNames) {
            total += exportNode(node, storeName, directory);
        }
        reportExport(total, time0);
        return total;
    }

    /**
     * Exports one partition of one node, for every configured store.
     *
     * @param node node to export from
     * @param partitionId partition to export
     * @return number of entries written
     * @throws VoldemortException if the connection, the node or a file write fails
     */
    public int exportAllKeys(Node node, Integer partitionId) {
        long time0 = System.nanoTime();
        File directory = new File(directoryPath);
        ensureDirectory(directory);
        int total = 0;
        for (String storeName : storeNames) {
            total += processGet(partitionId, storeName, directory, node);
        }
        reportExport(total, time0);
        return total;
    }

    /**
     * Exports one partition of one node for a single store.
     *
     * @param node node to export from
     * @param partitionId partition to export
     * @param storeName store to export
     * @return number of entries written
     * @throws VoldemortException if the connection, the node or a file write fails
     */
    public int exportAllKeys(Node node, Integer partitionId, String storeName) {
        long time0 = System.nanoTime();
        File directory = new File(directoryPath);
        ensureDirectory(directory);
        int total = processGet(partitionId, storeName, directory, node);
        reportExport(total, time0);
        return total;
    }

    /** Exports all partitions of one node for one store; returns the entry count. */
    private int exportNode(Node node, String storeName, File directory) {
        int total = 0;
        for (Integer partitionId : partitionsOf(node)) {
            total += processGet(partitionId, storeName, directory, node);
        }
        return total;
    }

    private void reportExport(int total, long startNanos) {
        System.out.println("keys: " + total + ", total time: "
                + (System.nanoTime() - startNanos) / 1000000 + " ms.");
    }

    /**
     * Streams one partition (keys and values) from a node and writes it to
     * export files of at most {@link #ENTRIES_PER_FILE} entries each.
     *
     * @return number of entries written
     */
    private int processGet(Integer partitionId, String storeName, File directory, Node node) {
        EntryExporter exporter = new EntryExporter(directory,
                storeName + "-" + node.getId() + "-" + partitionId);
        fetchPartition(node, storeName, partitionId, true, exporter);
        try {
            exporter.flush(); // remainder that did not fill a whole file
        } catch (IOException e) {
            throw new VoldemortException(e);
        }
        return exporter.total;
    }

    /** Buffers streamed entries and spills them to numbered export files. */
    private final class EntryExporter implements ResponseHandler {
        private final File directory;
        private final String filePrefix;
        private final List<Pair<ByteArray, Versioned<byte[]>>> batch =
                new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
        private int files;
        private int total;

        EntryExporter(File directory, String filePrefix) {
            this.directory = directory;
            this.filePrefix = filePrefix;
        }

        public void handle(VAdminProto.FetchPartitionEntriesResponse response)
                throws IOException {
            VAdminProto.PartitionEntry partitionEntry = response.getPartitionEntry();
            batch.add(Pair.create(ProtoUtils.decodeBytes(partitionEntry.getKey()),
                    ProtoUtils.decodeVersioned(partitionEntry.getVersioned())));
            // Batch on the buffered entries themselves, so memory use stays bounded.
            if (batch.size() >= ENTRIES_PER_FILE) {
                flush();
            }
        }

        void flush() throws IOException {
            if (batch.isEmpty()) {
                return;
            }
            // File(parent, child) uses the platform separator.
            File outputFile = new File(directory, filePrefix + "-" + files + ENTRIES_SUFFIX);
            writeEntriesBinary(batch, outputFile);
            total += batch.size();
            files++;
            batch.clear();
        }
    }

    /** Serializes the entries, one object per entry, into {@code outputFile}. */
    private void writeEntriesBinary(Collection<Pair<ByteArray, Versioned<byte[]>>> entries,
            File outputFile) throws IOException {
        FileOutputStream fileStream = new FileOutputStream(outputFile);
        try {
            ObjectOutputStream objectStream = new ObjectOutputStream(fileStream);
            for (Pair<ByteArray, Versioned<byte[]>> pair : entries) {
                objectStream.writeObject(pair);
            }
            objectStream.flush();
            System.out.println("writeEntriesBinary:" + entries.size());
        } finally {
            // Closing the underlying stream also covers a failed header write.
            fileStream.close();
        }
    }

    // ------------------------------------------------------------------
    // Import
    // ------------------------------------------------------------------

    /**
     * Imports the export files found in the export directory. For every
     * configured store, each file whose name matches
     * {@code <store>-<n>-<n>-<n>.entries} is read once and sent to every node
     * that has at least one partition. A file that cannot be read is reported
     * and skipped.
     *
     * <p>NOTE(review): like the previous implementation, every file is sent to
     * every node rather than only to the node named in the file; confirm that
     * this is the intended migration behaviour.
     */
    public void importAllObjects() {
        long time0 = System.nanoTime();
        File directory = new File(directoryPath);
        if (ensureDirectory(directory)) {
            File[] files = directory.listFiles();
            if (files != null) { // null when the path cannot be listed
                for (String storeName : storeNames) {
                    processSet(storeName, files);
                }
            }
        }
        System.out.println("Total time: " + (System.nanoTime() - time0) / 1000000 + " ms.");
    }

    /** Reads each export file of the store once and pushes it to the nodes. */
    private void processSet(String storeName, File[] files) {
        Collection<Node> nodes = getNodes();
        for (File file : files) {
            if (!isExportFileOf(file, storeName)) {
                continue;
            }
            List<Pair<ByteArray, Versioned<byte[]>>> entries;
            try {
                entries = readEntriesBinary(file);
            } catch (IOException e) {
                e.printStackTrace();
                continue;
            }
            for (Node node : nodes) {
                if (!partitionsOf(node).isEmpty()) {
                    adminClient.updateEntries(node.getId(), storeName, entries.iterator(), null);
                }
            }
        }
    }

    /** True when the file is a regular file named like an export of the store. */
    private static boolean isExportFileOf(File file, String storeName) {
        String name = file.getName();
        String prefix = storeName + "-";
        if (!file.isFile() || !name.startsWith(prefix) || !name.endsWith(ENTRIES_SUFFIX)
                || name.length() < prefix.length() + ENTRIES_SUFFIX.length()) {
            return false;
        }
        String ids = name.substring(prefix.length(), name.length() - ENTRIES_SUFFIX.length());
        // nodeId-partitionId-fileIndex
        return ids.matches("\\d+-\\d+-\\d+");
    }

    /**
     * Reads all {@code Pair} objects from an export file. Objects of any other
     * type are ignored.
     *
     * @throws IOException if the file is missing, truncated mid-object, corrupt,
     *         or refers to a class that cannot be loaded
     */
    @SuppressWarnings("unchecked")
    private List<Pair<ByteArray, Versioned<byte[]>>> readEntriesBinary(File inputFile)
            throws IOException {
        if (!inputFile.exists()) {
            throw new FileNotFoundException("File " + inputFile.getAbsolutePath()
                    + " does not exist!");
        }
        List<Pair<ByteArray, Versioned<byte[]>>> list =
                new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
        FileInputStream fileStream = new FileInputStream(inputFile);
        try {
            ObjectInputStream objectStream = new ObjectInputStream(fileStream);
            while (true) {
                Object next;
                try {
                    next = objectStream.readObject();
                } catch (EOFException endOfFile) {
                    break; // the file carries no explicit end marker
                }
                if (next instanceof Pair) {
                    list.add((Pair<ByteArray, Versioned<byte[]>>) next);
                }
            }
        } catch (ClassNotFoundException e) {
            IOException wrapped = new IOException("Unknown class in "
                    + inputFile.getAbsolutePath());
            wrapped.initCause(e);
            throw wrapped;
        } finally {
            fileStream.close();
        }
        return list;
    }

    // ------------------------------------------------------------------
    // Streaming plumbing
    // ------------------------------------------------------------------

    /**
     * Streams one partition of one store from a node and passes every response
     * to {@code handler}.
     *
     * <p>A socket is checked out for this single request and returned to the
     * pool exactly once. If the stream was not read to its end marker (I/O
     * failure, error response, or an exception from the handler) the socket is
     * closed first, because unread response data may still be pending on it.
     *
     * @throws VoldemortException if the connection fails, the node reports an
     *         error, or the handler throws an {@link IOException}
     */
    private void fetchPartition(Node node, String storeName, Integer partitionId,
            boolean fetchValues, ResponseHandler handler) {
        SocketDestination destination = new SocketDestination(node.getHost(),
                node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        SocketAndStreams sands = pool.checkout(destination);
        boolean streamDrained = false;
        try {
            List<Integer> partitions = new ArrayList<Integer>();
            partitions.add(partitionId);
            initiateFetchRequest(sands.getOutputStream(), storeName, partitions, fetchValues);
            DataInputStream inputStream = sands.getInputStream();
            // A length of -1 is the end-of-stream marker.
            for (int size = inputStream.readInt(); size != -1; size = inputStream.readInt()) {
                VAdminProto.FetchPartitionEntriesResponse response =
                        responseFromStream(inputStream, size);
                if (response.hasError()) {
                    throw new VoldemortException(response.getError().getErrorMessage());
                }
                handler.handle(response);
            }
            streamDrained = true;
        } catch (IOException e) {
            throw new VoldemortException(e);
        } finally {
            if (!streamDrained) {
                // NOTE(review): presumably the pool discards closed sockets on
                // check-in; verify against SocketPool.
                close(sands.getSocket());
            }
            pool.checkin(destination, sands);
        }
    }

    /** Writes a FETCH_PARTITION_ENTRIES admin request for the store and partitions. */
    private void initiateFetchRequest(DataOutputStream outputStream, String storeName,
            List<Integer> partitionList, boolean fetchValues) throws IOException {
        VAdminProto.FetchPartitionEntriesRequest.Builder fetchRequest =
                VAdminProto.FetchPartitionEntriesRequest.newBuilder()
                        .addAllPartitions(partitionList)
                        .setFetchValues(fetchValues)
                        .setStore(storeName);
        VAdminProto.VoldemortAdminRequest request =
                VAdminProto.VoldemortAdminRequest.newBuilder()
                        .setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES)
                        .setFetchPartitionEntries(fetchRequest)
                        .build();
        ProtoUtils.writeMessage(outputStream, request);
        outputStream.flush();
    }

    /** Reads {@code size} bytes from the stream and parses them as one fetch response. */
    private VAdminProto.FetchPartitionEntriesResponse responseFromStream(
            DataInputStream inputStream, int size) throws IOException {
        byte[] input = new byte[size];
        ByteUtils.read(inputStream, input);
        VAdminProto.FetchPartitionEntriesResponse.Builder response =
                VAdminProto.FetchPartitionEntriesResponse.newBuilder();
        response.mergeFrom(input);
        return response.build();
    }

    /** Closes the socket, reporting (but not propagating) a failure to do so. */
    private void close(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Partitions to scan on a node: the node's own list, or the configured
     * fallback list when the node reports {@code null}. Never returns null.
     */
    private List<Integer> partitionsOf(Node node) {
        List<Integer> own = node.getPartitionIds();
        if (own != null) {
            return own;
        }
        if (partitionIdList != null) {
            return partitionIdList;
        }
        return new ArrayList<Integer>();
    }

    /** True when the directory exists or could be created. */
    private static boolean ensureDirectory(File directory) {
        return directory.exists() || directory.mkdirs();
    }

    // ------------------------------------------------------------------
    // Accessors
    // ------------------------------------------------------------------

    public AdminClient getAdminClient() {
        return adminClient;
    }

    public void setAdminClient(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    /** Returns the nodes of the cluster known to the admin client. */
    public Collection<Node> getNodes() {
        return adminClient.getAdminClientCluster().getNodes();
    }

    public List<Integer> getPartitionIdList() {
        return partitionIdList;
    }

    public void setPartitionIdList(List<Integer> partitionIdList) {
        this.partitionIdList = partitionIdList;
    }

    public SocketPool getPool() {
        return pool;
    }

    public void setPool(SocketPool pool) {
        this.pool = pool;
    }

    public List<String> getStoreNames() {
        return storeNames;
    }

    public void setStoreNames(List<String> storeNames) {
        this.storeNames = storeNames;
    }

    public VoldemortClient getVoldemortClient() {
        return voldemortClient;
    }

    public void setVoldemortClient(VoldemortClient voldemortClient) {
        this.voldemortClient = voldemortClient;
    }

    /** Returns the directory export files are written to and imported from. */
    public String getDirectoryPath() {
        return directoryPath;
    }

    /** Sets the directory export files are written to and imported from. */
    public void setDirectoryPath(String directoryPath) {
        this.directoryPath = directoryPath;
    }

}
Reply all
Reply to author
Forward
0 new messages