FULL implementation of distributed Semaphore, CountDownLatch, and re-implementation of AtomicLong

EZ Comet

Aug 4, 2011, 12:29:26 AM
to Hazelcast
This is to let you know that I uploaded a patch with a fully working,
properly implemented distributed Semaphore and CountDownLatch, and a
re-implemented AtomicLong.

I did everything from A to Z: client, JMX, Spring, configuration,
tests (could always use more), statistics, interface Javadocs,
backups conditional on modification, async functions, and everything
in between. I cannot find anything left to do or fix.

I streamlined everything as much as I could. Some tweaks may be
possible, but I doubt much, if anything, will need improvement.

You can find the patch at:

http://code.google.com/p/hazelcast/issues/detail?id=596

Enjoy!

Tom C

Talip Ozturk

Aug 4, 2011, 12:56:56 AM
to haze...@googlegroups.com
Amazing!!! Why did you re-implement AtomicNumber? How does the
new implementation work?

http://twitter.com/oztalip


EZ Comet

Aug 4, 2011, 4:34:40 AM
to Hazelcast
When I started looking through the code, I noticed immediately that
the Semaphore implementation was based on the AtomicNumber
implementation. Looking at that, I could tell that it was not done by
the same people who did the rest of the coding, so I had to assume it
was someone else's patch. It was doing things like calculating the new
value twice: once on the member holding the data, and then again before
the backup, which was always performed regardless of whether the value
had changed. It also had its operations in the CMap.java file instead of
ConcurrentMapManager.java, where all the other ops are.

ORIGINAL

ConcurrentMapManager.java

class MAtomic extends MBackupAndMigrationAwareOp {
    final Data nameAsKey;
    final ClusterOperation op;
    final long expected;
    final long value;
    final boolean ignoreExpected;

    MAtomic(Data nameAsKey, ClusterOperation op, long value, long expected, boolean ignoreExpected) {
        this.nameAsKey = nameAsKey;
        this.op = op;
        this.value = value;
        this.expected = expected;
        this.ignoreExpected = ignoreExpected;
    }

    MAtomic(Data nameAsKey, ClusterOperation op, long value, long expected) {
        this(nameAsKey, op, value, expected, false);
    }

    MAtomic(Data nameAsKey, ClusterOperation op, long value) {
        this(nameAsKey, op, value, 0, true);
    }

    boolean doBooleanAtomic() {
        Data expectedData = (ignoreExpected) ? null : toData(expected);
        setLocal(op, FactoryImpl.ATOMIC_NUMBER_MAP_NAME, nameAsKey, expectedData, 0, 0);
        request.longValue = value;
        request.setBooleanRequest();
        doOp();
        Object returnObject = getResultAsBoolean();
        if (returnObject instanceof AddressAwareException) {
            rethrowException(op, (AddressAwareException) returnObject);
        }
        return !Boolean.FALSE.equals(returnObject);
    }

    long doLongAtomic() {
        setLocal(op, FactoryImpl.ATOMIC_NUMBER_MAP_NAME, nameAsKey, null, 0, 0);
        request.longValue = value;
        doOp();
        Object returnObject = getResultAsObject(false);
        return (Long) returnObject;
    }

    void backup(Long value) {
        request.value = toData(value);
        backup(CONCURRENT_MAP_BACKUP_PUT);
    }
}

CMap.java

public void doAtomic(Request req) {
    Record record = getRecord(req);
    if (record == null) {
        record = createNewRecord(req.key, toData(0L));
        mapRecords.put(req.key, record);
    } else if (record.getValue() == null) {
        record.setValue(toData(0L));
    }
    if (req.operation == ATOMIC_NUMBER_GET_AND_SET) {
        req.response = record.getValueData();
        record.setValue(toData(req.longValue));
    } else if (req.operation == ATOMIC_NUMBER_ADD_AND_GET) {
        record.setValue(IOUtil.addDelta(record.getValueData(), req.longValue));
        req.response = record.getValueData();
    } else if (req.operation == ATOMIC_NUMBER_GET_AND_ADD) {
        req.response = record.getValueData();
        record.setValue(IOUtil.addDelta(record.getValueData(), req.longValue));
    } else if (req.operation == ATOMIC_NUMBER_COMPARE_AND_SET) {
        if (record.getValueData().equals(req.value)) {
            record.setValue(toData(req.longValue));
            req.response = Boolean.TRUE;
        } else {
            req.response = Boolean.FALSE;
        }
        req.value = null;
    }
    // Update request's version to record's.
    // It is required on backupOneValue version check.
    req.version = record.getVersion();
}


CHANGED:

ConcurrentMapManager.java

abstract class AtomicLongOperationHandler extends MTargetAwareOperationHandler {
    abstract long getNewValue(long oldValue, long value);

    abstract long getResponseValue(long oldValue, long value);

    @Override
    void doOperation(Request request) {
        final Record record = ensureRecord(request, AtomicLongProxy.DATA_LONG_ZERO);
        final Data oldValueData = record.getValueData();
        final Data expectedValue = request.value;
        final long value = request.longValue;
        request.clearForResponse();
        if (expectedValue == null || expectedValue.equals(oldValueData)) {
            final long oldValue = (Long) toObject(oldValueData);
            final long newValue = getNewValue(oldValue, value);
            request.longValue = getResponseValue(oldValue, value);
            if (oldValue != newValue) {
                record.setValue(toData(newValue));
                record.incrementVersion();
                request.version = record.getVersion();
                request.response = record.getValueData();
            }
        } else {
            request.longValue = 0L;
        }
    }
}

class AtomicLongAddAndGetOperationHandler extends AtomicLongOperationHandler {
    long getNewValue(long oldValue, long value) {
        return oldValue + value;
    }

    long getResponseValue(long oldValue, long value) {
        return oldValue + value;
    }
}

class AtomicLongGetAndAddOperationHandler extends AtomicLongOperationHandler {
    long getNewValue(long oldValue, long value) {
        return oldValue + value;
    }

    long getResponseValue(long oldValue, long value) {
        return oldValue;
    }
}

class AtomicLongGetAndSetOperationHandler extends AtomicLongOperationHandler {
    long getNewValue(long oldValue, long value) {
        return value;
    }

    long getResponseValue(long oldValue, long value) {
        return oldValue;
    }
}

class AtomicLongCompareAndSetOperationHandler extends AtomicLongOperationHandler {
    long getNewValue(long oldValue, long value) {
        return value;
    }

    long getResponseValue(long oldValue, long value) {
        return 1L;
    }
}

....

class MAtomicLong extends MDefaultBackupAndMigrationAwareOp {
    AtomicLongOperationsCounter operationsCounter;

    public long addAndGet(Data name, long delta) {
        return doAtomicOp(ATOMIC_LONG_ADD_AND_GET, name, delta, null);
    }

    public boolean compareAndSet(Data name, long expectedValue, long newValue) {
        return doAtomicOp(ATOMIC_LONG_COMPARE_AND_SET, name, newValue, toData(expectedValue)) == 1;
    }

    public long getAndAdd(Data name, long delta) {
        return doAtomicOp(ATOMIC_LONG_GET_AND_ADD, name, delta, null);
    }

    public long getAndSet(Data name, long newValue) {
        return doAtomicOp(ATOMIC_LONG_GET_AND_SET, name, newValue, null);
    }

    public void destroy(Data name) {
        new MRemove().remove(MapConfig.ATOMIC_LONG_MAP_NAME, name, -1);
    }

    void setOperationsCounter(AtomicLongOperationsCounter operationsCounter) {
        this.operationsCounter = operationsCounter;
    }

    private long doAtomicOp(ClusterOperation op, Data name, long value, Data expected) {
        long begin = currentTimeMillis();
        setLocal(op, MapConfig.ATOMIC_LONG_MAP_NAME, name, expected, 0, 0);
        request.longValue = value;
        doOp();
        Data backup = (Data) getResultAsIs();
        long responseValue = request.longValue;
        if (backup != null) {
            request.value = backup;
            request.longValue = 0L;
            backup(CONCURRENT_MAP_BACKUP_PUT);
            operationsCounter.incrementModified(currentTimeMillis() - begin);
        } else {
            operationsCounter.incrementNonModified(currentTimeMillis() - begin);
        }
        return responseValue;
    }
}

... MDefaultBackupAndMigrationAwareOp is the same as
MBackupAndMigrationAwareOp, but it limits the number of backups to the
default map backup count. That is what the old code was effectively
doing; this just skips having to get the value from the backing map.


abstract class MDefaultBackupAndMigrationAwareOp extends MBackupAndMigrationAwareOp {
    @Override
    void prepareForBackup() {
        backupCount = Math.min(MapConfig.DEFAULT_BACKUP_COUNT, lsMembers.size() - 1);
    }
}
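
To make the backup-count rule concrete, here is a minimal standalone sketch of the same Math.min expression. The class name and the value 1 for the default backup count are assumptions made for illustration; the post does not show what MapConfig.DEFAULT_BACKUP_COUNT is set to.

```java
final class BackupCountSketch {
    // Assumed stand-in for MapConfig.DEFAULT_BACKUP_COUNT (actual value not shown in the post).
    static final int DEFAULT_BACKUP_COUNT = 1;

    // Same expression as prepareForBackup(): never ask for more backups
    // than there are other members in the cluster.
    static int backupCount(int memberCount) {
        return Math.min(DEFAULT_BACKUP_COUNT, memberCount - 1);
    }

    public static void main(String[] args) {
        System.out.println(backupCount(1)); // single member: nobody to back up to
        System.out.println(backupCount(2));
        System.out.println(backupCount(5)); // capped at the default
    }
}
```

Under that assumption, a single-member cluster makes no backups, and larger clusters are capped at the default count.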


You can see that the backups are now conditional upon a value being
returned from the op. I also added local statistics counters, which
track modified ops (which do backups) and non-modified ops separately,
so the latency of the backup can be seen (it should be very similar,
but why not, right?). I also overrode the ensureRecord method so that a
default initial value can be specified instead of just the request
value; all three implementations use it.
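
The conditional-backup idea can be sketched in isolation, outside Hazelcast. Everything below is a simplified, single-threaded illustration with hypothetical names: two plain fields stand in for the owning member and the backup member, and the counters only count operations, whereas the counters in the patch also record latency.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ConditionalBackupSketch {
    long primary;  // stand-in for the value on the owning member
    long backup;   // stand-in for the copy on the backup member

    final AtomicLong modified = new AtomicLong();     // ops that changed the value
    final AtomicLong nonModified = new AtomicLong();  // ops that left it unchanged

    long addAndGet(long delta) {
        long oldValue = primary;
        long newValue = oldValue + delta;
        if (newValue != oldValue) {
            primary = newValue;
            backup = newValue;  // back up only when the value actually changed
            modified.incrementAndGet();
        } else {
            nonModified.incrementAndGet();  // no change, so no backup is sent
        }
        return newValue;
    }
}
```

With this shape, an addAndGet(0) is counted as non-modified and never touches the backup copy.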

Now it fits the other internals, and I used this to make the Semaphore
and CountDownLatch, which just add a little more complexity.

Anyone who wants to make some other implementation can use these three
as a basis instead of having to try to figure it all out.
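
As a rough illustration of building on the handler pattern, the sketch below strips it down to the two abstract methods and adds one extra operation. All class names here are hypothetical, and the multiply operation is invented for the example; it is not part of the patch. The cluster plumbing (Request, Record, versions, backups) is left out entirely.

```java
abstract class OpSketch {
    abstract long getNewValue(long oldValue, long value);

    abstract long getResponseValue(long oldValue, long value);

    // Returns { response sent to the caller, value to store }.
    final long[] apply(long oldValue, long value) {
        return new long[] { getResponseValue(oldValue, value), getNewValue(oldValue, value) };
    }
}

// Mirrors the add-and-get handler: respond with the updated value.
final class AddAndGetSketch extends OpSketch {
    long getNewValue(long oldValue, long value) { return oldValue + value; }
    long getResponseValue(long oldValue, long value) { return oldValue + value; }
}

// Mirrors the get-and-add handler: respond with the previous value.
final class GetAndAddSketch extends OpSketch {
    long getNewValue(long oldValue, long value) { return oldValue + value; }
    long getResponseValue(long oldValue, long value) { return oldValue; }
}

// Hypothetical new operation, shown only to suggest how little a new handler needs.
final class GetAndMultiplySketch extends OpSketch {
    long getNewValue(long oldValue, long value) { return oldValue * value; }
    long getResponseValue(long oldValue, long value) { return oldValue; }
}
```

In this reduced form, a new operation only has to say what gets stored and what gets returned; the shared apply step stays untouched.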

Tom