The new version, with import and export to binary files, works with 10
million key-value pairs.
package com.components.aif2loader.repository;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OptionalDataException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.protocol.pb.ProtoUtils;
import voldemort.client.protocol.pb.VAdminProto;
import voldemort.cluster.Node;
import voldemort.store.socket.SocketAndStreams;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.SocketPool;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
import voldemort.utils.Pair;
import voldemort.versioning.Versioned;
import com.components.aif2loader.util.Config;
public class VoldemortAdmin {
public static final int DEFAULT_MAX_CONNECTIONS_PER_NODE = 10;
public static final int DEFAULT_MAX_CONNECTIONS = 50;
public static final int DEFAULT_SOCKET_BUFFER_SIZE = 32 * 1024;
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 1000000;
public static final int DEFAULT_SO_TIMEOUT_MS = 1000000;
private SocketPool pool;
private AdminClient adminClient;
private List<String> storeNames;
private VoldemortClient voldemortClient;
private List<Integer> partitionIdList;
private String directoryPath = "c://voldemortKeys/";
public VoldemortAdmin() {
super();
init();
}
/**
 * Creates an admin helper around an externally supplied admin client.
 * A new socket pool is built from the DEFAULT_* constants.
 *
 * @param storeNamesExtern  names of the stores the bulk operations work on
 * @param adminClientExtern admin client used for cluster lookups and imports
 */
public VoldemortAdmin(List<String> storeNamesExtern, AdminClient adminClientExtern) {
    super();
    setVoldemortClient(new VoldemortClient());
    adminClient = adminClientExtern;
    // The scan/export/import methods call partitionIdList.clear() and
    // iterate the list, so it must exist; without this they fail with a
    // NullPointerException on instances built by this constructor.
    partitionIdList = new ArrayList<Integer>();
    setPool(new SocketPool(DEFAULT_MAX_CONNECTIONS_PER_NODE,
            DEFAULT_CONNECTION_TIMEOUT_MS, DEFAULT_SO_TIMEOUT_MS,
            DEFAULT_SOCKET_BUFFER_SIZE));
    setStoreNames(storeNamesExtern);
}
/**
 * Creates an admin helper that uses an externally supplied socket pool.
 * The admin client is bootstrapped from the first host listed in
 * {@link Config}.
 *
 * @param poolExtern       socket pool used for admin-protocol connections
 * @param storeNamesExtern names of the stores the bulk operations work on
 */
public VoldemortAdmin(SocketPool poolExtern, List<String> storeNamesExtern) {
    super();
    setVoldemortClient(new VoldemortClient());
    adminClient = new AdminClient(Config.getInstance().getVoldemortHost().get(0),
            new AdminClientConfig());
    // The scan/export/import methods call partitionIdList.clear() and
    // iterate the list, so it must exist; without this they fail with a
    // NullPointerException on instances built by this constructor.
    partitionIdList = new ArrayList<Integer>();
    setPool(poolExtern);
    setStoreNames(storeNamesExtern);
}
/**
 * Closes the given socket, printing (not propagating) any I/O failure.
 *
 * @param socket socket to close
 */
private void close(Socket socket) {
    try {
        socket.close();
    } catch (IOException closeFailure) {
        // Closing is best effort; report the problem and carry on.
        closeFailure.printStackTrace();
    }
}
/**
 * Deletes all keys of all configured stores.
 *
 * For every store and every node of the cluster, the keys of each partition
 * are streamed over the admin protocol (keys only, no values) and deleted
 * one by one through the regular client. Error responses from the server
 * are printed and the stream keeps being read.
 *
 * @throws VoldemortException if sending a request or reading the response
 *         stream fails with an I/O error
 */
public void deleteAllKeys() {
    for (String storeName : storeNames) {
        Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
        for (Node node : nodes) {
            if (node.getPartitionIds() != null) {
                partitionIdList.clear();
                partitionIdList.addAll(node.getPartitionIds());
            }
            final SocketDestination destination = new SocketDestination(node.getHost(),
                    node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
            final SocketAndStreams sands = pool.checkout(destination);
            // The connection serves every partition of this node, so it is
            // handed back exactly once, in the finally block, rather than
            // after the first partition or on each error response.
            boolean completed = false;
            try {
                DataOutputStream outputStream = sands.getOutputStream();
                DataInputStream inputStream = sands.getInputStream();
                for (Integer partitionId : partitionIdList) {
                    List<Integer> subList = new ArrayList<Integer>();
                    subList.add(partitionId);
                    initiateFetchRequest(outputStream, storeName, subList, false);
                    while (true) {
                        int size = inputStream.readInt();
                        if (size == -1) {
                            // -1 marks the end of this partition's stream.
                            break;
                        }
                        VAdminProto.FetchPartitionEntriesResponse response =
                                responseFromStream(inputStream, size);
                        if (response.hasError()) {
                            System.out.println(response.getError().getErrorMessage());
                        } else {
                            voldemortClient.delete(response.getKey().toStringUtf8());
                        }
                    }
                }
                completed = true;
            } catch (IOException e) {
                throw new VoldemortException(e);
            } finally {
                if (!completed) {
                    // The stream may hold unread data; close the socket
                    // before returning it, as the I/O error path always did.
                    close(sands.getSocket());
                }
                pool.checkin(destination, sands);
            }
        }
    }
}
/**
 * Deletes all keys that start with the given prefix, in all configured
 * stores.
 *
 * For every store and every node of the cluster, the keys of each partition
 * are streamed over the admin protocol (keys only, no values); matching keys
 * are deleted through the regular client. After each partition the number
 * of deletions is printed. Error responses from the server are printed and
 * the stream keeps being read.
 *
 * @param prefix prefix a key must start with to be deleted
 * @throws VoldemortException if sending a request or reading the response
 *         stream fails with an I/O error
 */
public void deleteAllKeysByPrefix(String prefix) {
    for (String storeName : storeNames) {
        Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
        for (Node node : nodes) {
            if (node.getPartitionIds() != null) {
                partitionIdList.clear();
                partitionIdList.addAll(node.getPartitionIds());
            }
            final SocketDestination destination = new SocketDestination(node.getHost(),
                    node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
            final SocketAndStreams sands = pool.checkout(destination);
            // The connection serves every partition of this node, so it is
            // handed back exactly once, in the finally block, rather than
            // after the first partition or on each error response.
            boolean completed = false;
            try {
                DataOutputStream outputStream = sands.getOutputStream();
                DataInputStream inputStream = sands.getInputStream();
                for (Integer partitionId : partitionIdList) {
                    List<Integer> subList = new ArrayList<Integer>();
                    subList.add(partitionId);
                    initiateFetchRequest(outputStream, storeName, subList, false);
                    int deletes = 0;
                    while (true) {
                        int size = inputStream.readInt();
                        if (size == -1) {
                            // -1 marks the end of this partition's stream.
                            break;
                        }
                        VAdminProto.FetchPartitionEntriesResponse response =
                                responseFromStream(inputStream, size);
                        if (response.hasError()) {
                            System.out.println(response.getError().getErrorMessage());
                        } else {
                            // Decode once; used for both the match and the delete.
                            String key = response.getKey().toStringUtf8();
                            if (key.startsWith(prefix)) {
                                voldemortClient.delete(key);
                                deletes++;
                            }
                        }
                    }
                    // The list of deleted keys is intentionally not collected,
                    // so the "Keys:" part of the message stays empty.
                    System.out.println("Objects deleted:" + deletes + " Keys: ");
                }
                completed = true;
            } catch (IOException e) {
                throw new VoldemortException(e);
            } finally {
                if (!completed) {
                    // The stream may hold unread data; close the socket
                    // before returning it, as the I/O error path always did.
                    close(sands.getSocket());
                }
                pool.checkin(destination, sands);
            }
        }
    }
}
/**
 * Exports all entries of all configured stores to serialized files, without
 * any filter.
 *
 * Nothing is exported when the output directory neither exists nor can be
 * created. The number of exported entries and the elapsed time are printed.
 *
 * @return total number of entries written
 * @throws VoldemortException if fetching or writing fails with an I/O error
 */
public int exportAllKeys() {
    long time0 = System.nanoTime();
    File directory = new File(directoryPath);
    int total = 0;
    if (directory.exists() || directory.mkdir()) {
        for (String storeName : storeNames) {
            Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
            for (Node node : nodes) {
                final SocketDestination destination = new SocketDestination(node.getHost(),
                        node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
                if (node.getPartitionIds() != null) {
                    partitionIdList.clear();
                    partitionIdList.addAll(node.getPartitionIds());
                }
                for (Integer partitionId : partitionIdList) {
                    // processGet() checks the connection back in before it
                    // returns or throws, so each partition gets its own
                    // checkout instead of reusing a returned connection.
                    final SocketAndStreams sands = pool.checkout(destination);
                    total = total + processGet(partitionId, sands.getOutputStream(), storeName,
                            sands, destination, sands.getInputStream(), directory, node);
                }
            }
        }
    }
    System.out.println("keys: " + total + ", total time: "
            + (System.nanoTime() - time0) / 1000000 + " ms.");
    return total;
}
/**
 * Exports all entries of all configured stores held by one node to
 * serialized files.
 *
 * The number of exported entries and the elapsed time are printed.
 *
 * @param node node whose partitions are exported
 * @return total number of entries written
 * @throws VoldemortException if fetching or writing fails with an I/O error
 */
public int exportAllKeys(Node node) {
    long time0 = System.nanoTime();
    File directory = new File(directoryPath);
    if (!directory.exists()) {
        // Best effort, in line with the unfiltered export. If creation
        // fails, the first file write fails and surfaces as a
        // VoldemortException.
        directory.mkdir();
    }
    int total = 0;
    for (String storeName : storeNames) {
        final SocketDestination destination = new SocketDestination(node.getHost(),
                node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        if (node.getPartitionIds() != null) {
            partitionIdList.clear();
            partitionIdList.addAll(node.getPartitionIds());
        }
        for (Integer partitionId : partitionIdList) {
            // processGet() checks the connection back in before it returns
            // or throws, so each partition gets its own checkout instead of
            // reusing a returned connection.
            final SocketAndStreams sands = pool.checkout(destination);
            total = total + processGet(partitionId, sands.getOutputStream(), storeName,
                    sands, destination, sands.getInputStream(), directory, node);
        }
    }
    System.out.println("keys: " + total + ", total time: "
            + (System.nanoTime() - time0) / 1000000 + " ms.");
    return total;
}
/**
 * Exports the entries of one partition of one node, for all configured
 * stores, to serialized files.
 *
 * The number of exported entries and the elapsed time are printed.
 *
 * @param node        node to fetch from
 * @param partitionId partition to export
 * @return total number of entries written
 * @throws VoldemortException if fetching or writing fails with an I/O error
 */
public int exportAllKeys(Node node, Integer partitionId) {
    long time0 = System.nanoTime();
    File directory = new File(directoryPath);
    if (!directory.exists()) {
        // Best effort, in line with the unfiltered export. If creation
        // fails, the first file write fails and surfaces as a
        // VoldemortException.
        directory.mkdir();
    }
    int total = 0;
    for (String storeName : storeNames) {
        final SocketDestination destination = new SocketDestination(node.getHost(),
                node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
        final SocketAndStreams sands = pool.checkout(destination);
        if (node.getPartitionIds() != null) {
            partitionIdList.clear();
            partitionIdList.add(partitionId);
        }
        // processGet() sends the fetch request itself. Sending one here as
        // well would queue a second response that nobody reads and leave it
        // on the pooled connection.
        total = total + processGet(partitionId, sands.getOutputStream(), storeName,
                sands, destination, sands.getInputStream(), directory, node);
    }
    System.out.println("keys: " + total + ", total time: "
            + (System.nanoTime() - time0) / 1000000 + " ms.");
    return total;
}
/**
 * Exports the entries of one partition of one node, for a single store, to
 * serialized files.
 *
 * The number of exported entries and the elapsed time are printed.
 *
 * @param node        node to fetch from
 * @param partitionId partition to export
 * @param storeName   store to export
 * @return total number of entries written
 * @throws VoldemortException if fetching or writing fails with an I/O error
 */
public int exportAllKeys(Node node, Integer partitionId, String storeName) {
    long time0 = System.nanoTime();
    File directory = new File(directoryPath);
    if (!directory.exists()) {
        // Best effort, in line with the unfiltered export. If creation
        // fails, the first file write fails and surfaces as a
        // VoldemortException.
        directory.mkdir();
    }
    int total = 0;
    final SocketDestination destination = new SocketDestination(node.getHost(),
            node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
    final SocketAndStreams sands = pool.checkout(destination);
    if (node.getPartitionIds() != null) {
        partitionIdList.clear();
        partitionIdList.add(partitionId);
    }
    // processGet() sends the fetch request itself. Sending one here as well
    // would queue a second response that nobody reads and leave it on the
    // pooled connection.
    total = total + processGet(partitionId, sands.getOutputStream(), storeName,
            sands, destination, sands.getInputStream(), directory, node);
    System.out.println("keys: " + total + ", total time: "
            + (System.nanoTime() - time0) / 1000000 + " ms.");
    return total;
}
/**
 * @return the admin client used for cluster lookups and imports
 */
public AdminClient getAdminClient() {
    return this.adminClient;
}
/**
 * Collects all keys that start with the given prefix, across all configured
 * stores and all nodes of the cluster.
 *
 * Keys are streamed per partition over the admin protocol (keys only, no
 * values). Error responses from the server are printed and the stream keeps
 * being read.
 *
 * @param prefix prefix a key must start with to be included
 * @return the set of matching keys, decoded as UTF-8 strings
 * @throws VoldemortException if sending a request or reading the response
 *         stream fails with an I/O error
 */
public Set<String> getAllKeysByPrefix(String prefix) {
    Set<String> keys = new HashSet<String>();
    for (String storeName : storeNames) {
        Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
        for (Node node : nodes) {
            if (node.getPartitionIds() != null) {
                partitionIdList.clear();
                partitionIdList.addAll(node.getPartitionIds());
            }
            final SocketDestination destination = new SocketDestination(node.getHost(),
                    node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
            final SocketAndStreams sands = pool.checkout(destination);
            // The connection serves every partition of this node, so it is
            // handed back exactly once, in the finally block, rather than
            // after the first partition or on each error response.
            boolean completed = false;
            try {
                DataOutputStream outputStream = sands.getOutputStream();
                DataInputStream inputStream = sands.getInputStream();
                for (Integer partitionId : partitionIdList) {
                    List<Integer> subList = new ArrayList<Integer>();
                    subList.add(partitionId);
                    initiateFetchRequest(outputStream, storeName, subList, false);
                    while (true) {
                        int size = inputStream.readInt();
                        if (size == -1) {
                            // -1 marks the end of this partition's stream.
                            break;
                        }
                        VAdminProto.FetchPartitionEntriesResponse response =
                                responseFromStream(inputStream, size);
                        if (response.hasError()) {
                            System.out.println(response.getError().getErrorMessage());
                        } else {
                            // Decode once; used for both the match and the result.
                            String key = response.getKey().toStringUtf8();
                            if (key.startsWith(prefix)) {
                                keys.add(key);
                            }
                        }
                    }
                }
                completed = true;
            } catch (IOException e) {
                throw new VoldemortException(e);
            } finally {
                if (!completed) {
                    // The stream may hold unread data; close the socket
                    // before returning it, as the I/O error path always did.
                    close(sands.getSocket());
                }
                pool.checkin(destination, sands);
            }
        }
    }
    return keys;
}
/**
 * @return the nodes of the cluster as reported by the admin client
 */
public Collection<Node> getNodes() {
    final Collection<Node> clusterNodes = adminClient.getAdminClientCluster().getNodes();
    return clusterNodes;
}
/**
 * @return the internal partition id list (the live list, not a copy)
 */
public List<Integer> getPartitionIdList() {
    return this.partitionIdList;
}
/**
 * @return the socket pool used for admin-protocol connections
 */
public SocketPool getPool() {
    return this.pool;
}
/**
 * @return the names of the stores the bulk operations work on (the live
 *         list, not a copy)
 */
public List<String> getStoreNames() {
    return this.storeNames;
}
/**
 * @return the regular client used for single-key deletes
 */
public VoldemortClient getVoldemortClient() {
    return this.voldemortClient;
}
/**
 * Imports all objects from the serialized files in the export directory.
 *
 * For every store, node and partition, {@code processSet} is invoked to
 * push file contents to the node. Nothing is imported when the directory
 * neither exists nor can be created. The elapsed time is printed.
 */
public void importAllObjects() {
    long time0 = System.nanoTime();
    File directory = new File(directoryPath);
    if (directory.exists() || directory.mkdir()) {
        for (String storeName : storeNames) {
            Collection<Node> nodes = adminClient.getAdminClientCluster().getNodes();
            for (Node node : nodes) {
                if (node.getPartitionIds() != null) {
                    partitionIdList.clear();
                    partitionIdList.addAll(node.getPartitionIds());
                }
                final SocketDestination destination = new SocketDestination(node.getHost(),
                        node.getAdminPort(), RequestFormatType.ADMIN_PROTOCOL_BUFFERS);
                final SocketAndStreams sands = pool.checkout(destination);
                try {
                    DataOutputStream outputStream = sands.getOutputStream();
                    final DataInputStream inputStream = sands.getInputStream();
                    for (Integer partitionId : partitionIdList) {
                        processSet(partitionId, outputStream, storeName, sands,
                                destination, inputStream, directory, node);
                    }
                } finally {
                    // Hand the connection back on every path so it does not
                    // leak from the pool.
                    pool.checkin(destination, sands);
                }
            }
        }
    }
    System.out.println("Total time: " + (System.nanoTime() - time0) / 1000000 + " ms.");
}
/**
 * Wires up the default collaborators: a new regular client, an admin client
 * bootstrapped from the first configured host, the configured store name,
 * an empty partition id list and a socket pool built from the DEFAULT_*
 * constants.
 */
private void init() {
    setVoldemortClient(new VoldemortClient());
    this.adminClient = new AdminClient(
            Config.getInstance().getVoldemortHost().get(0),
            new AdminClientConfig());

    this.storeNames = new ArrayList<String>();
    this.partitionIdList = new ArrayList<Integer>();
    this.storeNames.add(Config.getInstance().getVoldemortStore());

    final SocketPool defaultPool = new SocketPool(
            DEFAULT_MAX_CONNECTIONS_PER_NODE,
            DEFAULT_CONNECTION_TIMEOUT_MS,
            DEFAULT_SO_TIMEOUT_MS,
            DEFAULT_SOCKET_BUFFER_SIZE);
    setPool(defaultPool);
}
/**
 * Sends a FETCH_PARTITION_ENTRIES admin request and flushes the stream.
 *
 * @param outputStream  stream of the admin connection to write to
 * @param storeName     store to fetch from
 * @param partitionList partitions to fetch
 * @param fetchValues   value of the request's fetch-values flag
 * @throws IOException if writing or flushing fails
 */
private void initiateFetchRequest(DataOutputStream outputStream,
        String storeName, List<Integer> partitionList, boolean fetchValues)
        throws IOException {
    // Build the inner fetch request step by step.
    VAdminProto.FetchPartitionEntriesRequest.Builder fetchBuilder =
            VAdminProto.FetchPartitionEntriesRequest.newBuilder();
    fetchBuilder.addAllPartitions(partitionList);
    fetchBuilder.setFetchValues(fetchValues);
    fetchBuilder.setStore(storeName);

    // Wrap it in the generic admin request envelope.
    VAdminProto.VoldemortAdminRequest.Builder envelope =
            VAdminProto.VoldemortAdminRequest.newBuilder();
    envelope.setType(VAdminProto.AdminRequestType.FETCH_PARTITION_ENTRIES);
    envelope.setFetchPartitionEntries(fetchBuilder);

    ProtoUtils.writeMessage(outputStream, envelope.build());
    outputStream.flush();
}
/**
 * Fetches all entries (keys and values) of one partition from a node and
 * writes them to serialized files in batches.
 *
 * Files are named {@code <store>-<nodeId>-<partitionId>-<n>.entries} inside
 * {@code directory}. The connection is checked back into the pool exactly
 * once before this method returns or throws; it is closed first when the
 * response stream was not read to its end. Error responses from the server
 * are printed and the stream keeps being read.
 *
 * @param partitionId  partition to fetch
 * @param outputStream output stream of {@code sands}
 * @param storeName    store to fetch from
 * @param sands        checked-out connection; checked in by this method
 * @param destination  pool destination {@code sands} belongs to
 * @param inputStream  input stream of {@code sands}
 * @param directory    directory the files are written to
 * @param node         node being exported; its id is part of the file name
 * @return number of entries written
 * @throws VoldemortException if the request, the stream or a file write
 *         fails with an I/O error
 */
private int processGet(Integer partitionId, DataOutputStream outputStream,
        String storeName, SocketAndStreams sands, SocketDestination destination,
        DataInputStream inputStream, File directory, Node node) {
    // Entries per file.
    final int batchSize = 10000;
    Set<Pair<ByteArray, Versioned<byte[]>>> entries =
            new HashSet<Pair<ByteArray, Versioned<byte[]>>>();
    int files = 0;
    int total = 0;
    boolean streamDrained = false;
    try {
        List<Integer> subList = new ArrayList<Integer>();
        subList.add(partitionId);
        initiateFetchRequest(outputStream, storeName, subList, true);
        while (true) {
            int size = inputStream.readInt();
            if (size == -1) {
                // -1 marks the end of the stream.
                break;
            }
            VAdminProto.FetchPartitionEntriesResponse response =
                    responseFromStream(inputStream, size);
            if (response.hasError()) {
                System.out.println(response.getError().getErrorMessage());
                continue;
            }
            VAdminProto.PartitionEntry partitionEntry = response.getPartitionEntry();
            entries.add(Pair.create(ProtoUtils.decodeBytes(partitionEntry.getKey()),
                    ProtoUtils.decodeVersioned(partitionEntry.getVersioned())));
            // Flush on the number of buffered entries, i.e. on what is
            // actually written. NOTE(review): the response-level key is
            // presumably unset when values are fetched, so it is not a
            // reliable counter - confirm against the server.
            if (entries.size() >= batchSize) {
                writeEntriesBinary(entries,
                        entriesFile(directory, storeName, node, partitionId, files));
                total = total + entries.size();
                files++;
                entries.clear();
            }
        }
        streamDrained = true;
        if (!entries.isEmpty()) {
            writeEntriesBinary(entries,
                    entriesFile(directory, storeName, node, partitionId, files));
            total = total + entries.size();
            entries.clear();
        }
    } catch (IOException e) {
        throw new VoldemortException(e);
    } finally {
        if (!streamDrained) {
            // Unread data may remain on the connection; close it so the
            // pool does not hand it out again in that state.
            close(sands.getSocket());
        }
        pool.checkin(destination, sands);
    }
    return total;
}

/** Builds the export file for one batch, using the platform's path separator. */
private File entriesFile(File directory, String storeName, Node node,
        Integer partitionId, int index) {
    return new File(directory, storeName + "-" + node.getId() + "-" + partitionId
            + "-" + index + ".entries");
}
/**
 * Reads the export files that belong to the given store, node and partition
 * and pushes their entries to that node.
 *
 * Only files named {@code <store>-<nodeId>-<partitionId>-<n>.entries} are
 * considered, which is the naming used by the export. A file that cannot be
 * read is reported and skipped.
 *
 * @param partitionId  partition whose files are imported
 * @param outputStream unused
 * @param storeName    store the entries are written to
 * @param sands        unused
 * @param destination  unused
 * @param inputStream  unused
 * @param directory    directory containing the export files
 * @param node         node the entries are sent to
 */
private void processSet(Integer partitionId, DataOutputStream outputStream,
        String storeName, SocketAndStreams sands, SocketDestination destination,
        DataInputStream inputStream, File directory, Node node) {
    File[] files = directory.listFiles();
    if (files == null) {
        // listFiles() returns null when the path is not a directory or an
        // I/O error occurs.
        System.out.println("Cannot list files in " + directory.getAbsolutePath());
        return;
    }
    // Restrict the import to this store/node/partition. Otherwise every
    // file would be sent to every node once per partition.
    final String prefix = storeName + "-" + node.getId() + "-" + partitionId + "-";
    for (File file : files) {
        String name = file.getName();
        if (!file.isFile() || !name.startsWith(prefix) || !name.endsWith(".entries")) {
            continue;
        }
        try {
            Iterator<Pair<ByteArray, Versioned<byte[]>>> iterator =
                    readEntriesBinary(directory, file);
            adminClient.updateEntries(node.getId(), storeName, iterator, null);
        } catch (IOException e) {
            // Best effort: report the unreadable file and go on to the next.
            e.printStackTrace();
        }
    }
}
/**
 * Reads the Pair objects stored in a binary (Java-serialized) file.
 *
 * Objects are read until end of file; anything that is not a {@code Pair}
 * is skipped. The whole file is loaded into memory before the iterator is
 * returned.
 *
 * NOTE(review): this uses Java native deserialization; only feed it files
 * produced by this tool, never untrusted input.
 *
 * @param inputDir  unused
 * @param inputFile file to read
 * @return iterator over the entries read from the file
 * @throws FileNotFoundException if the file does not exist
 * @throws IOException if the file cannot be read or contains an object of
 *         an unknown class
 */
@SuppressWarnings("unchecked")
private Iterator<Pair<ByteArray, Versioned<byte[]>>> readEntriesBinary(File inputDir,
        File inputFile) throws IOException {
    if (!inputFile.exists()) {
        throw new FileNotFoundException("File " + inputFile.getAbsolutePath()
                + " does not exist!");
    }
    List<Pair<ByteArray, Versioned<byte[]>>> list =
            new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
    FileInputStream fileStream = new FileInputStream(inputFile);
    try {
        ObjectInputStream dis = new ObjectInputStream(fileStream);
        while (true) {
            Object aux;
            try {
                aux = dis.readObject();
            } catch (EOFException eof) {
                // End of file is the normal way out of the loop.
                System.out.println("eof encountered" + eof.getMessage());
                break;
            }
            if (aux instanceof Pair) {
                list.add((Pair<ByteArray, Versioned<byte[]>>) aux);
            }
        }
    } catch (ClassNotFoundException cnf) {
        // Any other read failure propagates as IOException. Retrying on a
        // broken stream would loop forever and re-add the last object.
        throw new IOException("Unknown class in " + inputFile.getAbsolutePath(), cnf);
    } finally {
        // Closes the file exactly once, also when the stream header is bad.
        fileStream.close();
    }
    return list.iterator();
}
/**
 * Reads {@code size} bytes from the stream and parses them as a
 * FetchPartitionEntriesResponse message.
 *
 * @param inputStream stream to read from
 * @param size        number of bytes making up the message
 * @return the parsed response
 * @throws IOException if reading or parsing fails
 */
private VAdminProto.FetchPartitionEntriesResponse responseFromStream(
        DataInputStream inputStream, int size) throws IOException {
    final byte[] payload = new byte[size];
    ByteUtils.read(inputStream, payload);

    final VAdminProto.FetchPartitionEntriesResponse.Builder builder =
            VAdminProto.FetchPartitionEntriesResponse.newBuilder();
    builder.mergeFrom(payload);
    return builder.build();
}
/**
 * @param adminClient admin client to use for cluster lookups and imports
 */
public void setAdminClient(AdminClient adminClient) {
    final AdminClient replacement = adminClient;
    this.adminClient = replacement;
}
/**
 * @param partitionIdList list to use as the internal partition id list (it
 *        is stored as is, not copied)
 */
public void setPartitionIdList(List<Integer> partitionIdList) {
    final List<Integer> replacement = partitionIdList;
    this.partitionIdList = replacement;
}
/**
 * @param pool socket pool to use for admin-protocol connections
 */
public void setPool(SocketPool pool) {
    final SocketPool replacement = pool;
    this.pool = replacement;
}
/**
 * @param storeNames names of the stores the bulk operations work on (the
 *        list is stored as is, not copied)
 */
public void setStoreNames(List<String> storeNames) {
    final List<String> replacement = storeNames;
    this.storeNames = replacement;
}
/**
 * @param voldemortClient regular client to use for single-key deletes
 */
public void setVoldemortClient(VoldemortClient voldemortClient) {
    final VoldemortClient replacement = voldemortClient;
    this.voldemortClient = replacement;
}
/**
 * Writes Pair objects to a binary (Java-serialized) file, one object after
 * another, and prints how many were written.
 *
 * @param entries    entries to write
 * @param outputFile file to create or overwrite
 * @throws IOException if the file cannot be opened or written
 */
private void writeEntriesBinary(Set<Pair<ByteArray, Versioned<byte[]>>> entries,
        File outputFile) throws IOException {
    final ObjectOutputStream objectStream =
            new ObjectOutputStream(new FileOutputStream(outputFile));
    try {
        final Iterator<Pair<ByteArray, Versioned<byte[]>>> cursor = entries.iterator();
        while (cursor.hasNext()) {
            objectStream.writeObject(cursor.next());
        }
        System.out.println("writeEntriesBinary:" + entries.size());
    } finally {
        // Always release the file handle, even when a write fails.
        objectStream.close();
    }
}
}