Hi Guys,
I'm currently running into a problem with the IMap.replace method and
I'm sure I have messed up somewhere, but it isn't obvious to me.
It is an example that shows the optimistic locking behavior of the
Hazelcast IMap. This is done by concurrently incrementing a counter.
The problem
is that the example shows that there are lost updates, although the
code should have been protected against it because of the optimistic
locking.
So if someone can have a look at it and point out the error.
example:
import java.io.Serializable;
import java.util.UUID;
public class Counter implements Serializable {
private final String id;
private long version;
private long value;
public Counter(long value) {
this(UUID.randomUUID().toString(), value, 0);
}
public Counter(String id, long value, long version) {
this.value = value;
this.id = id;
this.version = version;
}
public String getId() {
return id;
}
public void inc() {
value++;
}
public long getValue() {
return value;
}
public void incVersion() {
version++;
}
public long getVersion() {
return version;
}
public boolean equals(Object thatObject) {
if (thatObject == this) return true;
if (!(thatObject instanceof Counter)) return false;
Counter that = (Counter) thatObject;
if(!(that.id.equals(
this.id)))return false;
if(that.version!=this.version)return false;
if(that.value!=this.value)return false;
return true;
}
public int hashCode() {
int hash = id.hashCode();
hash = 31*hash + hashCode(version);
hash = 31*hash + hashCode(value);
return hash;
}
static int hashCode(long l){
return ((int)(l ^ (l >>> 32)));
}
}
---------------------------------------------------
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.IMap;
import java.util.concurrent.atomic.AtomicLong;
public class CounterService {
private final IMap<String, Counter> counterMap =
Hazelcast.getMap("counters");
public static final AtomicLong COLLISION_COUNTER = new AtomicLong();
public String create(int value) {
Counter counter = new Counter(value);
counterMap.put(counter.getId(), counter);
return counter.getId();
}
public long count(String id) {
return counterMap.get(id).getValue();
}
public Counter get(String id) {
return counterMap.get(id);
}
public void increment(String counterId) {
for (; ; ) {
Counter oldCounter = counterMap.get(counterId);
if (oldCounter == null) throw new IllegalArgumentException();
Counter newCounter = new Counter(counterId,
oldCounter.getValue(), oldCounter.getVersion());
newCounter.inc();
newCounter.incVersion();
if (counterMap.replace(counterId, oldCounter, newCounter)) {
return;
} else {
COLLISION_COUNTER.incrementAndGet();
}
}
}
}
---------------------------------------------------
public class Main {
public static final int INCREMENT_COUNT_PER_THREAD = 1000 * 1000;
public static void main(String[] args) throws InterruptedException {
CounterService counterService = new CounterService();
String id = counterService.create(0);
Thread[] threads = new Thread[2];
for (int k = 0; k < threads.length; k++)
threads[k] = new IncrementThread(counterService, id);
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
long actualCount = counterService.count(id);
long expectedCount = INCREMENT_COUNT_PER_THREAD * threads.length;
System.out.printf("Lost update detected: %s\n", (actualCount
!= expectedCount));
System.out.printf("Expected count: %s, actual count: %s\n",
expectedCount, actualCount);
System.out.printf("Collision count: %s\n",
CounterService.COLLISION_COUNTER);
System.out.printf("Version: %s",counterService.get(id).getVersion());
}
static class IncrementThread extends Thread {
private final CounterService counterService;
private final String counterId;
IncrementThread(CounterService counterService, String counterId) {
this.counterService = counterService;
this.counterId = counterId;
}
public void run() {
for (int k = 0; k < INCREMENT_COUNT_PER_THREAD; k++)
counterService.increment(counterId);
}
}
}