Question about rollback on concurrent Map.put


Haddock

Dec 3, 2014, 3:48:53 PM
to scala-stm-e...@googlegroups.com
Hello,

I have a question concerning concurrent puts on a Map. In the code below I have used latches to construct a case that makes the atomic block in runnable 1 lose the race against the atomic block in runnable 2:

    @Test
    public void concurrenPutPlainSTM()
    {
        String key = "1";
        Integer value1 = 1;
        Integer value2 = 2;
        CountDownLatch latch1 = new CountDownLatch(1);
        CountDownLatch latch2 = new CountDownLatch(1);
        CountDownLatch waitTillAllDone = new CountDownLatch(2);

        Map<String, Integer> map = scala.concurrent.stm.japi.STM.newMap();

        Runnable runnable1 = () -> {
            try {
                scala.concurrent.stm.japi.STM.atomic(() ->
                {
                    scala.concurrent.stm.japi.STM.afterRollback(() -> {
                        System.out.println("1 rollback");
                        throw new RuntimeException();
                    });
                    try {
                        latch1.await();
                        map.put(key, value1);
                        latch2.countDown(); // release the latch that blocks the 2nd put
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException in 1");

                    }
                });
            } catch (Throwable e) {
                System.out.println("1 exc");
            }
            waitTillAllDone.countDown();
        };

        Runnable runnable2 = () -> {
            try {
                scala.concurrent.stm.japi.STM.atomic(() ->
                {
                    scala.concurrent.stm.japi.STM.afterRollback(() -> {
                        System.out.println("2 rollback");
                        throw new RuntimeException();
                    });
                    try {
                        latch1.countDown();
                        latch2.await();
                        map.put(key, value2);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException in 2");
                    }
                });
            } catch (Throwable e) {
                System.out.println("2 exc");
            }
            waitTillAllDone.countDown();
        };

        new Thread(runnable1).start();
        new Thread(runnable2).start();

        try {
            waitTillAllDone.await();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException in waitTillAllDone");
        }

        scala.concurrent.stm.japi.STM.atomic(() -> System.out.println("value: " + map.get(key)));
    }

The code above prints this to the console, which confuses me a bit:

2 rollback
java.lang.Exception: status=RolledBack(OptimisticFailureCause('stale_read,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@15bc885e)))
    at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:12)
    at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:11)
    ...
value: Some(2)

What I want to achieve is for it to print this to the console:

1 rollback
1 exc
value: Some(2)

This way runnable 1 can find out that it lost the race against some other transaction and needs to re-read values and retry the transaction. Basically, my question is how to achieve this. It's not that I just want other people to do my work; I really tried for hours and couldn't find the problem.

Thanks for any hints.
Regards, H.



 

Aleksandar Prokopec

Dec 3, 2014, 4:01:31 PM
to scala-stm-e...@googlegroups.com
Before drilling down into this problem, a question: did you consider using the retry functionality of ScalaSTM, instead of using countdown latches, which are an external synchronization primitive?

http://nbronson.github.io/scala-stm/modular_blocking.html

--

---
You received this message because you are subscribed to the Google Groups "Scala STM Discussion" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scala-stm-expert-...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

daniel kroeni

Dec 4, 2014, 3:44:07 AM
to scala-stm-e...@googlegroups.com
Hi

I assume that you are just mixing latches with STM in order to enforce some interesting execution traces. Otherwise I would follow Aleksandar's advice and use modular blocking.

When I run your example I get:

1 rollback
java.lang.Exception: status=RolledBack(OptimisticFailureCause('steal_by_higher_priority,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@54b55e49)))
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:12)
at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:11)
        ...
value: 1  // No Option in your example since your code is using java.util.Map

:)

> In the code below I have constructed a case with the use of latches that provokes the atomic block in runnable 1 to lose the race against the atomic block in runnable 2:

This is not guaranteed by your code:

runnable1 runs to latch1.await() and is blocked
runnable2 runs to latch1.countDown() which enables runnable1 to finish its transaction

To be sure that runnable1 loses the race, you have to ensure that runnable2 leaves its atomic block (commits) before runnable1 leaves its atomic block. Let runnable1 wait on a latch that runnable2 releases after its commit (see below).
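Stripped of ScalaSTM, the ordering guarantee can be checked with plain threads. In this sketch (the class name and log strings are hypothetical) "committing" is only a log entry and no STM conflict is modelled. What it shows is the latch property the fix relies on: everything runnable2 does before `countDown()` happens-before runnable1's `await()` returns, so runnable1 cannot get past that point before runnable2 has committed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: ScalaSTM is left out, "committing" is just a log entry.
class LatchOrderingSketch {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch loser = new CountDownLatch(1); // opened by runnable2 after its commit

        Thread runnable1 = new Thread(() -> {
            log.add("1 wrote");
            try {
                loser.await();      // runnable1 may not leave before runnable2 has committed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            log.add("1 leaving");
        }, "runnable1");

        Thread runnable2 = new Thread(() -> {
            log.add("2 committed");
            loser.countDown();      // actions before this happen-before runnable1's await() returning
        }, "runnable2");

        runnable1.start();
        runnable2.start();
        try {
            runnable1.join();
            runnable2.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // "2 committed" always precedes "1 leaving"
    }
}
```

Only the relative order of "2 committed" and "1 leaving" is fixed; "1 wrote" may appear before or after "2 committed".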

But the main issue is that throwing an exception within afterRollback does not abort the transaction. The exception is caught, printed [1], and then the transaction is retried. Check the additional System.out.println("Running transaction 1"); in runnable1. It runs twice when the test is executed:

   // assumes: import static scala.concurrent.stm.japi.STM.*;  (atomic, afterRollback, newMap)
   @Test
   public void concurrenPutPlainSTMFixed() {
      String key = "1";
      Integer value1 = 1;
      Integer value2 = 2;
      CountDownLatch latch1 = new CountDownLatch(1);
      CountDownLatch latch2 = new CountDownLatch(1);
      CountDownLatch looser = new CountDownLatch(1);
      CountDownLatch waitTillAllDone = new CountDownLatch(2);

      Map<String, Integer> map = newMap();

      Runnable runnable1 = () -> {
         try {
            atomic(() -> {
               afterRollback(() -> {
                  System.out.println("1 rollback");
                  throw new RuntimeException();
               });

               System.out.println("Running transaction 1"); // runs twice

               try {
                  latch1.await();
                  map.put(key, value1);
                  latch2.countDown(); // release the latch that blocks the 2nd put
                  looser.await();
               } catch (InterruptedException e) {
                  System.out.println("InterruptedException in 1");
               }
            });
         } catch (Exception e) {
            System.out.println("1 exc");
         }
         waitTillAllDone.countDown();
      };

      Runnable runnable2 = () -> {
         try {
            atomic(() -> {
               afterRollback(() -> {
                  System.out.println("2 rollback");
                  throw new RuntimeException();
               });
               try {
                  latch1.countDown();
                  latch2.await();
                  map.put(key, value2);
               } catch (InterruptedException e) {
                  System.out.println("InterruptedException in 2");
               }
            });
            looser.countDown();
         } catch (Throwable e) {
            System.out.println("2 exc");
         }
         waitTillAllDone.countDown();
      };

      new Thread(runnable1, "runnable1").start();
      new Thread(runnable2, "runnable2").start();

      try {
         waitTillAllDone.await();
      } catch (InterruptedException e) {
         System.out.println("InterruptedException in waitTillAllDone");
      }

      atomic(() -> System.out.println("value: " + map.get(key)));
   }

I skimmed through the documentation but did not find anything describing this behavior.

Cheers
Daniel


Nathan Bronson

Dec 4, 2014, 1:35:12 PM
to scala-stm-e...@googlegroups.com
H, it sounds to me like you are trying to disable the automatic retry feature of the STM. There are a couple of ways you could accomplish this without any new features exposed by ScalaSTM.

First, though, I'm interested in your use case. ScalaSTM uses information from the previous transaction attempts to make sure performance is good even under high contention, and to speculatively flatten nested transactions (big perf win). It's possible that there's a solution to what you need within the automatic retry framework. If not, we should expose an API that doesn't disable contention management or subsumption (and that is faster and simpler than either of the following hacks, also):

1. Store the failure in a flag, let the transaction retry, do nothing during the second attempt, and then check the flag after atomic() has finished. Something like

  final boolean[] manualRetryRequired = new boolean[1];
  atomic(() -> {
    if (!manualRetryRequired[0]) {
      afterRollback(() -> {
        System.out.println("2 rollback");
        manualRetryRequired[0] = true;
      });
      try {
        latch1.countDown();
        latch2.await();
        map.put(key, value2);
      } catch (InterruptedException e) {
        System.out.println("InterruptedException in 2");
      }
    }
  });
  if (manualRetryRequired[0]) {
    System.out.println("2 exc");
  }

2. Use an exception like you're doing, but change the postDecisionFailureHandler of the TxnExecutor you're using to one that rethrows the exception. This is a callback that gets the chance to decide what to do if one of the other callbacks fails after the transaction's outcome is already certain. At the moment the (undocumented) default prints the exception and then keeps going. Sorry about the poor documentation. (Also, if you look at the code you will see TxnExecutor.DefaultPostDecisionExceptionHandler, which despite its name isn't the actual default. Embarrassing.) The actual default is in CCSTMExecutor.scala.

The easiest way is probably to override the postDecisionFailureHandler of the default TxnExecutor sometime during program startup, at which point it will be used from then on:
 
  import scala.concurrent.stm._

  TxnExecutor.transformDefault { e =>
    e withPostDecisionFailureHandler { (status: Txn.Status, x: Throwable) =>
      throw x
    }
  }

I don't know which will be faster.
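Neither workaround can be exercised without ScalaSTM itself, so the following is a plain-Java toy model of the behaviour discussed in this thread, not ScalaSTM's implementation. Everything in it is hypothetical: `RollbackWorkaroundsModel`, its `Txn` class, and the `conflictOnAttempt` predicate that decides which attempts "lose the race". Writes are buffered per attempt and dropped on rollback, rollback callbacks run, and the body is retried automatically. The handler is simplified to `(String, RuntimeException)` instead of ScalaSTM's `(Txn.Status, Throwable) => Unit`. `PRINT_AND_CONTINUE` mirrors the default described above (and the retry Daniel observed); `RETHROW` plays the role of the replacement handler. Contention management and nesting are not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntPredicate;

// Toy model only, NOT ScalaSTM: an attempt for which conflictOnAttempt returns true
// is rolled back (its buffered writes are dropped, its rollback callbacks run) and retried.
class RollbackWorkaroundsModel {
    static final class Txn {
        final List<String> writes = new ArrayList<>();        // applied to the store only on commit
        final List<Runnable> onRollback = new ArrayList<>();  // stand-in for afterRollback callbacks
    }

    // Stand-ins for a post-decision failure handler (status simplified to a String).
    static final BiConsumer<String, RuntimeException> PRINT_AND_CONTINUE =
            (status, x) -> System.out.println("ignored after " + status + ": " + x);
    static final BiConsumer<String, RuntimeException> RETHROW = (status, x) -> { throw x; };

    static void atomic(List<String> store, Consumer<Txn> body, IntPredicate conflictOnAttempt,
                       BiConsumer<String, RuntimeException> postDecisionFailureHandler) {
        for (int attempt = 1; ; attempt++) {
            Txn txn = new Txn();
            body.accept(txn);
            if (!conflictOnAttempt.test(attempt)) {
                store.addAll(txn.writes);                 // commit
                return;
            }
            for (Runnable callback : txn.onRollback) {   // rolled back: txn.writes is discarded
                try {
                    callback.run();
                } catch (RuntimeException x) {
                    postDecisionFailureHandler.accept("RolledBack", x); // may rethrow
                }
            }
            // loop: the body is retried automatically
        }
    }

    // Workaround 1: remember the rollback in a flag, make later attempts no-ops,
    // and check the flag after atomic() returns. True means "re-read and retry by hand".
    static boolean putWithFlag(List<String> store, IntPredicate conflictOnAttempt) {
        final boolean[] manualRetryRequired = new boolean[1];
        atomic(store, txn -> {
            if (!manualRetryRequired[0]) {
                txn.onRollback.add(() -> manualRetryRequired[0] = true);
                txn.writes.add("put(key, value2)");
            }
        }, conflictOnAttempt, PRINT_AND_CONTINUE);
        return manualRetryRequired[0];
    }

    // Workaround 2 (and the original attempt): throw from the rollback callback.
    // Whether the caller sees the exception depends only on the handler.
    static void putThrowingOnRollback(List<String> store, IntPredicate conflictOnAttempt,
                                      BiConsumer<String, RuntimeException> handler) {
        atomic(store, txn -> {
            txn.onRollback.add(() -> { throw new IllegalStateException("lost the race"); });
            txn.writes.add("put(key, value2)");
        }, conflictOnAttempt, handler);
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        System.out.println("manual retry required: " + putWithFlag(store, attempt -> attempt == 1));
        try {
            putThrowingOnRollback(store, attempt -> attempt == 1, RETHROW);
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
        System.out.println("store: " + store);
    }
}
```

With `PRINT_AND_CONTINUE`, `putThrowingOnRollback` behaves like the original test: the exception is printed, the body runs a second time, and that second write commits. With `RETHROW` the exception escapes from `atomic()` after the first attempt and nothing is written.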

Regards,
  Nathan




--
Nathan Grasso Bronson
ngbr...@gmail.com

Haddock

Dec 5, 2014, 12:17:11 PM
to scala-stm-e...@googlegroups.com
Hi Daniel,

thanks for your reply. My comments below.


On Thursday, December 4, 2014 09:44:07 UTC+1, daniel kroeni wrote:
> Hi
>
> I assume that you are just mixing latches with STM in order to enforce some interesting execution traces. Otherwise I would follow Aleksandar's advice and use modular blocking.
>
> When I run your example I get:
>
> 1 rollback
> java.lang.Exception: status=RolledBack(OptimisticFailureCause('steal_by_higher_priority,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@54b55e49)))
> at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:12)
> at scala.concurrent.stm.ccstm.CCSTMExecutor$$anonfun$2.apply(CCSTMExecutor.scala:11)
>         ...
> value: 1  // No Option in your example since your code is using java.util.Map
>
> :)

This is interesting. I get something different (no matter whether I use IDEA or Eclipse, and no matter how often I re-run the test case):

java.lang.Exception: status=RolledBack(OptimisticFailureCause('stale_read,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@f5c44e25)   
2 rollback
value: 2

I copied the code I posted in this thread to make sure I ran the same code. It looks like, when both threads are still inside their atomic blocks, which one wins is undefined and may depend on machine parameters (performance, etc.).


>> In the code below I have constructed a case with the use of latches that provokes the atomic block in runnable 1 to lose the race against the atomic block in runnable 2:
>
> This is not guaranteed by your code:
>
> runnable1 runs to latch1.await() and is blocked
> runnable2 runs to latch1.countDown() which enables runnable1 to finish its transaction
>
> To be sure that runnable1 loses the race, you have to ensure that runnable2 leaves its atomic block (commits) before runnable1 leaves its atomic block. Let runnable1 wait on a latch that runnable2 releases after its commit (see below).
>
> But the main issue is that throwing an exception within afterRollback does not abort the transaction. The exception is caught, printed [1], and then the transaction is retried. Check the additional System.out.println("Running transaction 1"); in runnable1. It runs twice when the test is executed:

I understand what you mean. When I run your concurrenPutPlainSTMFixed method, the thread running it locks up, but it works when I step through it in the debugger. Concurrent programming is sometimes not easy to grasp ... I also get this:

Running transaction 1
1 rollback
java.lang.Exception: status=RolledBack(OptimisticFailureCause('steal_by_higher_priority,Some(scala.concurrent.stm.ccstm.CCSTMRefs$GenericRef@e1d02e56)))
Running transaction 1
value: 1

The problem is that runnable2 apparently has no way to find out that it got rolled back, as the output contains neither "2 rollback" nor "2 exc". That is what I'm looking for. Oh my ... ;-).

Regards, H.


Haddock

Dec 5, 2014, 1:19:41 PM
to scala-stm-e...@googlegroups.com
Hi Nathan,

thanks for looking into this. Please find my comments below.


On Thursday, December 4, 2014 19:35:12 UTC+1, Nathan Bronson wrote:
> H, it sounds to me like you are trying to disable the automatic retry feature of the STM. There are a couple of ways you could accomplish this without any new features exposed by ScalaSTM.
>
> First, though, I'm interested in your use case.

I'm working on a listenable transactional map. It's just a fun & leisure project with ScalaSTM. I have a class ListenableAtomicMap with a map-style API which has a scala.concurrent.stm.japi.STM.newMap-based map to store the map's associations. Then there are 2 additional scala.concurrent.stm.japi.STM.newMap-based maps to define listeners (one map for put listeners, and one for remove listeners). Here is some sample code to see what I mean:

boolean listenerCalled[] = new boolean[] { false }; 
ListenableAtomicMap<String, Integer> map = new ListenableAtomicMap<>();

atomic(() ->
{            
    map.addListener("1", (PutEvent<Integer> event) -> {
        listenerCalled[0] = true;
    });

    map.put("1", 1);
    Assert.assertTrue(listenerCalled[0]);            
});
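For readers who want a picture of that structure, here is a rough, non-transactional sketch: one map for the associations plus one listener map each for puts and removes. It is not Haddock's class. The `PutEvent` shape (old and new value) and the separate `addRemoveListener` method are assumptions, since the real classes are not shown, and plain `ConcurrentHashMap`s stand in for the `STM.newMap()` instances, so rollback is not modelled at all.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, non-transactional sketch of a map with per-key put and remove listeners.
class ListenableMapSketch<K, V> {
    // Assumed event shape; the real PutEvent<V> is not shown in the thread.
    static final class PutEvent<T> {
        final T oldValue;   // null if the key was absent
        final T newValue;
        PutEvent(T oldValue, T newValue) { this.oldValue = oldValue; this.newValue = newValue; }
    }

    private final Map<K, V> entries = new ConcurrentHashMap<>();                           // the associations
    private final Map<K, List<Consumer<PutEvent<V>>>> putListeners = new ConcurrentHashMap<>(); // per-key put listeners
    private final Map<K, List<Consumer<V>>> removeListeners = new ConcurrentHashMap<>();        // per-key remove listeners

    void addListener(K key, Consumer<PutEvent<V>> listener) {
        putListeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void addRemoveListener(K key, Consumer<V> listener) {
        removeListeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    V put(K key, V value) {
        V old = entries.put(key, value);
        PutEvent<V> event = new PutEvent<>(old, value);
        for (Consumer<PutEvent<V>> listener : putListeners.getOrDefault(key, List.of())) {
            listener.accept(event);   // listeners run synchronously on the calling thread
        }
        return old;
    }

    V remove(K key) {
        V old = entries.remove(key);
        if (old != null) {            // only notify if something was actually removed
            for (Consumer<V> listener : removeListeners.getOrDefault(key, List.of())) {
                listener.accept(old);
            }
        }
        return old;
    }

    V get(K key) {
        return entries.get(key);
    }
}
```

Usage mirrors the snippet above, minus the `atomic` block: `map.addListener("1", event -> listenerCalled[0] = true); map.put("1", 1);`.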

Now, as in the code snippet in my first post, I have a ListenableAtomicMap that is accessed concurrently "at the same time" from within 2 atomic blocks. By "at the same time" I mean a situation where each thread (runnable1 and runnable2) is inside an atomic block, as constructed in my initial post. As a result, one atomic block loses the race and is therefore rolled back. My idea is to let the user of the rolled-back atomic block know about the rollback by throwing an exception. Without the exception, the user would assume that the value put into the map was stored, but this is not the case.
 
> ScalaSTM uses information from the previous transaction attempts to make sure performance is good even under high contention, and to speculatively flatten nested transactions (big perf win). It's possible that there's a solution to what you need within the automatic retry framework. If not, we should expose an API that doesn't disable contention management or subsumption (and that is faster and simpler than either of the following hacks, also):
>
> 1. Store the failure in a flag, let the transaction retry, do nothing during the second attempt, and then check the flag after atomic() has finished. Something like
>
> 2. Use an exception like you're doing, but change the postDecisionFailureHandler of the TxnExecutor you're using to one that rethrows the exception. This is a callback that gets the chance to decide what to do if one of the other callbacks fails after the transaction's outcome is already certain. At the moment the (undocumented) default prints the exception and then keeps going. Sorry about the poor documentation. (Also, if you look at the code you will see TxnExecutor.DefaultPostDecisionExceptionHandler, which despite its name isn't the actual default. Embarrassing.) The actual default is in CCSTMExecutor.scala.

All right, I will give this a try. Thanks for your input :-).

Cheers, Haddock