In the following multi-threaded demonstration code I get timeouts and
InsufficientOperationalNodesExceptions even though no node failures have
occurred.
This has occurred in the past on 0.81 and most recently on 0.90
using voldemort-voldemort-46a0ec8.
Any suggestions as to what I am doing wrong would be appreciated.
Sincerely
Jim
-----------------------8<----------------------------
>8-----------------------------------
package com.jims.ObsTest;
import static org.junit.Assert.*;
import java.lang.String;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.StoreClient;
import voldemort.client.StoreClientFactory;
import voldemort.store.InsufficientOperationalNodesException;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Versioned;
/**
 * Multi-threaded demonstration of optimistic read-modify-write against a
 * Voldemort store.
 *
 * <p>A single key holds a comma-separated list of counters, one slot per
 * worker thread. Every worker repeatedly reads the value, increments its own
 * slot and writes the value back, retrying whenever the write is rejected
 * with an {@link ObsoleteVersionException}. Once all workers have finished,
 * each slot is expected to equal {@link #count}.
 */
public class ObsTest {

    /** Number of increments each worker applies to its own slot. */
    static final int count = 127;

    /** Bootstrap URL of the cluster under test. */
    static final String bootstrapUrl = "tcp://localhost:6666";

    static final ClientConfig config = new ClientConfig()
            .setBootstrapUrls(bootstrapUrl);

    /** Factory shared by the constructor and by every worker. */
    static final StoreClientFactory factory = new SocketStoreClientFactory(config);

    /** The single key all workers contend on. */
    static final String key = "1";

    /**
     * Serialises counters as decimal numbers, each followed by a comma
     * (for example {@code {1, 2}} becomes {@code "1,2,"}).
     *
     * @param array the counters to serialise
     * @return the serialised form; the empty string for an empty array
     */
    public String myToString(int[] array) {
        StringBuilder sb = new StringBuilder();
        for (int x : array) {
            // Append the pieces separately instead of building "x," first.
            sb.append(x).append(',');
        }
        return sb.toString();
    }

    /**
     * Allocates a zero-filled counter array.
     *
     * @param num number of slots
     * @return a new array of length {@code num}
     */
    static int[] newIntArray(int num) {
        return new int[num];
    }

    /**
     * Parses the format produced by {@link #myToString(int[])}.
     *
     * @param sb comma-separated decimal integers, optionally ending in a comma
     * @return the parsed counters; an empty array when {@code sb} is empty
     * @throws NumberFormatException if a field is not a decimal integer
     */
    static int[] newIntArray(String sb) {
        // "".split(",") yields a single empty field, which parseInt rejects;
        // an empty string is what myToString emits for zero counters.
        if (sb.length() == 0) {
            return new int[0];
        }
        // split() drops trailing empty strings, so the final comma is harmless.
        String[] fields = sb.split(",");
        int[] array = new int[fields.length];
        for (int i = 0; i < fields.length; i++) {
            array[i] = Integer.parseInt(fields[i]);
        }
        return array;
    }

    /**
     * Thread that increments slot {@link #num} of the shared value
     * {@link ObsTest#count} times.
     */
    class Worker extends Thread {
        final Logger log = Logger.getLogger("testVoldemort");
        final StoreClient<String, String> client = factory
                .getStoreClient("test");
        int num;

        @Override
        public void run() {
            for (int i = 0; i < count; i++) {
                // Repeat the same iteration until it is either applied or
                // deliberately abandoned.
                while (!attemptIncrement(i)) {
                    // retry after a version conflict
                }
            }
        }

        /**
         * Performs one read-increment-write cycle.
         *
         * @param iteration index of the current increment, used for logging
         * @return {@code true} when the caller should move on to the next
         *         iteration, {@code false} when this one must be retried
         */
        private boolean attemptIncrement(int iteration) {
            try {
                Versioned<String> value = client.get(key);
                if (value == null) {
                    // Fail with a clear message instead of a bare NPE below.
                    throw new IllegalStateException("No value stored under key '"
                            + key + "' (worker " + num + ", iteration "
                            + iteration + ")");
                }
                int[] di = newIntArray(value.getValue());
                di[num]++;
                value.setObject(myToString(di));
                client.put(key, value);
                return true;
            } catch (ObsoleteVersionException e) {
                // Another worker wrote first; re-read and try again.
                log.info("Obsolete Version, " + num + ", " + iteration);
                return false;
            } catch (InsufficientOperationalNodesException e) {
                // Pass the exception to the logger so the underlying cause
                // of the failure is visible in the output.
                log.info("Insufficient Nodes, " + num + ", " + iteration, e);
                // Not retried, as before.
                // NOTE(review): it is unclear whether the write was applied
                // when this is thrown; if it was not, this slot ends up below
                // 'count' -- confirm against the store's semantics.
                return true;
            }
        }

        Worker(int num) {
            this.num = num;
        }
    }

    /**
     * Resets the shared value, runs {@code numThreads} workers to completion
     * and asserts that every slot reached {@link #count}.
     *
     * @param numThreads number of concurrent workers (and counter slots)
     * @throws InterruptedException if interrupted while joining a worker
     */
    ObsTest(int numThreads) throws InterruptedException {
        final Logger log = Logger.getLogger("testVoldemort");
        StoreClient<String, String> client = factory.getStoreClient("test");

        // Start from a clean value holding one zeroed slot per worker.
        client.delete(key);
        client.put(key, myToString(new int[numThreads]));

        Worker[] threads = new Worker[numThreads];
        log.info("Alloc " + numThreads);
        for (int w = 0; w < numThreads; w++) {
            threads[w] = new Worker(w);
        }
        log.info("Start " + numThreads);
        for (Worker t : threads) {
            t.start();
        }
        log.info("Join " + numThreads);
        for (Worker t : threads) {
            t.join();
        }

        int[] v = newIntArray(client.getValue(key));
        for (int w = 0; w < numThreads; w++) {
            assertEquals("counter of worker " + w, count, v[w]);
        }
    }

    /**
     * Runs the demonstration with 1 to 9 worker threads in turn.
     *
     * @param args ignored
     * @throws InterruptedException if interrupted while joining a worker
     */
    static public void main(String[] args) throws InterruptedException {
        BasicConfigurator.configure();
        Logger.getRootLogger().setLevel(Level.INFO);
        for (int i = 1; i < 10; i++) {
            new ObsTest(i);
        }
    }
}