@Test
public void concurrenPutPlainSTM()
{
    String key = "1";
    Integer value1 = 1;
    Integer value2 = 2;
    CountDownLatch latch1 = new CountDownLatch(1);
    CountDownLatch latch2 = new CountDownLatch(1);
    CountDownLatch waitTillAllDone = new CountDownLatch(2);
    Map<String, Integer> map = scala.concurrent.stm.japi.STM.newMap();
    Runnable runnable1 = () -> {
        try {
            scala.concurrent.stm.japi.STM.atomic(() ->
            {
                scala.concurrent.stm.japi.STM.afterRollback(() -> {
                    System.out.println("1 rollback");
                    throw new RuntimeException();
                });
                try {
                    latch1.await();
                    map.put(key, value1);
                    latch2.countDown(); // release the latch that blocks the 2nd put
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException in 1");
                }
            });
        } catch (Throwable e) {
            System.out.println("1 exc");
        }
        waitTillAllDone.countDown();
    };
    Runnable runnable2 = () -> {
        try {
            scala.concurrent.stm.japi.STM.atomic(() ->
            {
                scala.concurrent.stm.japi.STM.afterRollback(() -> {
                    System.out.println("2 rollback");
                    throw new RuntimeException();
                });
                try {
                    latch1.countDown(); // let runnable1 proceed to its put
                    latch2.await();
                    map.put(key, value2);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException in 2");
                }
            });
        } catch (Throwable e) {
            System.out.println("2 exc");
        }
        waitTillAllDone.countDown();
    };
    new Thread(runnable1).start();
    new Thread(runnable2).start();
    try {
        waitTillAllDone.await();
    } catch (InterruptedException e) {
        System.out.println("InterruptedException in waitTillAllDone");
    }
    scala.concurrent.stm.japi.STM.atomic(() -> System.out.println("value: " + map.get(key)));
}
2 rollback
java.lang.Exception: status=RolledBack(OptimisticFailureCause('stale_read,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@15bc885e)))
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:12)
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:11)
...
value: Some(2)
1 rollback
1 exc
value: Some(2)
--
---
You received this message because you are subscribed to the Google Groups "Scala STM Discussion" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-stm-expert-...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
In the code below I have constructed a case, using latches, that provokes the atomic block in runnable 1 to lose the race against the atomic block in runnable 2:
Hi,

I assume that you are just mixing latches with STM in order to enforce some interesting execution traces. Otherwise I would follow Alexandar's advice and use modular blocking.

When I run your example I get:

1 rollback
java.lang.Exception: status=RolledBack(OptimisticFailureCause('steal_by_higher_priority,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@54b55e49)))
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:12)
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:11)
...
value: 1 // No Option in your example since your code is using java.util.Map :)
> In the code below I have constructed a case with the use of latches that provokes the atomic block in runnable 1 to lose the race against the atomic block in runnable 2:

This is not guaranteed by your code:

1. runnable1 runs to latch1.await() and is blocked
2. runnable2 runs to latch1.countDown(), which enables runnable1 to finish its transaction

To be sure that runnable1 loses the race you have to ensure that runnable2 leaves its atomic block (commits) before runnable1 leaves its atomic block. Let runnable1 wait on a latch which is released by runnable2 after commit (see below).

But the main issue is that throwing an exception within afterRollback does not abort the transaction. The exception is caught, printed [1] and then the transaction is retried. Check the additional System.out.println("Running transaction 1"); in runnable1. It runs twice when the test is executed:
H, it sounds to me like you are trying to disable the automatic retry feature of the STM. There are a couple of ways you could accomplish this without any new features exposed by ScalaSTM.
First, though, I'm interested in your use case.
ScalaSTM uses information from the previous transaction attempts to make sure performance is good even under high contention, and to speculatively flatten nested transactions (big perf win). It's possible that there's a solution to what you need within the automatic retry framework. If not, we should expose an API that doesn't disable contention management or subsumption (and that is faster and simpler than either of the following hacks, also):
1. Store the failure in a flag, let the transaction retry, do nothing during the second attempt, and then check the flag after atomic() has finished. Something like
2. Use an exception like you're doing, but change the postDecisionFailureHandler of the TxnExecutor you're using to one that rethrows the exception. This is a callback that gets the chance to decide what to do if one of the other callbacks fails after the transaction's outcome is already certain. At the moment the (undocumented) default prints the exception and then keeps going. Sorry about the poor documentation. (Also, if you look at the code you will see TxnExecutor.DefaultPostDecisionExceptionHandler, which despite its name isn't the actual default. Embarrassing.) The actual default is in CCSTMExecutor.scala.
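To make option 1 concrete, here is a minimal sketch of the flag idea in plain Java. It does not call ScalaSTM at all: runWithRetry is a hypothetical stand-in that only mimics the automatic retry of atomic(), and the conflict on the first attempt is simulated with a boolean parameter. The class, method and enum names are made up for illustration. With the real library, the flag would presumably be set inside the afterRollback handler rather than in the body itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RollbackFlagSketch {

    /** Outcome of one attempt of the stand-in transaction body. */
    enum Attempt { COMMITTED, ROLLED_BACK }

    /**
     * Hypothetical stand-in for atomic(): reruns the body until an attempt
     * commits. It only mimics the automatic retry; it is not ScalaSTM.
     * Returns the number of attempts that were made.
     */
    static int runWithRetry(Supplier<Attempt> body) {
        int attempts = 0;
        Attempt outcome;
        do {
            attempts++;
            outcome = body.get();
        } while (outcome == Attempt.ROLLED_BACK);
        return attempts;
    }

    /**
     * Tries to write a value once. Returns true if the write took effect,
     * false if the first attempt was rolled back and the retry became a no-op.
     */
    static boolean putOnce(AtomicInteger target, int value, boolean conflictOnFirstAttempt) {
        // The flag lives outside the transaction, so a rollback cannot undo it.
        AtomicBoolean rolledBack = new AtomicBoolean(false);
        AtomicBoolean firstAttempt = new AtomicBoolean(true);

        runWithRetry(() -> {
            if (rolledBack.get()) {
                return Attempt.COMMITTED;   // retry: do nothing and commit trivially
            }
            if (firstAttempt.getAndSet(false) && conflictOnFirstAttempt) {
                rolledBack.set(true);       // assumption: done in afterRollback with the real STM
                return Attempt.ROLLED_BACK; // simulated conflict, before any write happened
            }
            target.set(value);              // the actual work of the transaction
            return Attempt.COMMITTED;
        });

        // Check the flag only after the atomic block has finished.
        return !rolledBack.get();
    }

    public static void main(String[] args) {
        AtomicInteger cell = new AtomicInteger(0);
        System.out.println("no conflict: applied=" + putOnce(cell, 1, false) + ", cell=" + cell.get());
        System.out.println("conflict: applied=" + putOnce(cell, 2, true) + ", cell=" + cell.get());
    }
}
```

The point of the design is that the retry still happens, so contention management is left alone; the caller simply learns from the flag that the first attempt lost and can decide what to do outside the atomic block.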