[akka-user] Clojure add-watch and Stm.retry


Peter Veentjer

May 19, 2010, 5:35:21 PM
to akka-user
Hi Guys,

Clojure provides an add-watch to listen to writes to specific cells:

http://richhickey.github.com/clojure/clojure.core-api.html#clojure.core/add-watch

Multiverse has provided similar functionality since 0.1 through its retry mechanism.

You can create stuff like:

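class Stack{
    Ref<Node> head = new Ref<Node>();
    final int maxCapacity = 100;
    IntRef size = new IntRef();

    void push(Object item){
        if(size.get()>=maxCapacity){
            retry(); //block until another transaction makes room
        }
        head.set(new Node(head.get(),item));
        size.inc();
    }

    Object pop(){
        if(head.isNull()){
            retry(); //block until another transaction pushes an item
        }

        Node old = head.get();
        head.set(old.tail);
        size.dec(); //keep size in step with push
        return old.value;
    }

    class Node{
        final Node tail;
        final Object value;

        Node(Node tail, Object value){
            this.tail = tail;
            this.value = value;
        }
    }
}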

A retry in the stm literature is the same as an add-watch to all refs that have been read.

You can even compose blocking operations:

Object pop(Stack... stacks){
    for(Stack stack: stacks){
        if(!stack.isEmpty()){
            return stack.pop();
        }
    }

    retry();
    return null;//never executed.
}

Or using the orelse template (in scala/groovy the syntax is a lot less horrible).

Object pop(final Stack stack1, final Stack stack2){
  return new OrElseTemplate<Object>(){
        public Object run(Transaction tx){
            return stack1.pop();
        }
        public Object orelserun(Transaction tx){
            return stack2.pop();
        }
  }.execute();
}


Blocking behavior is not allowed inside a transactor (this can be configured using the explicitRetryAllowed setting on the TransactionFactoryBuilder).

But if you are not using actors, all this behavior is available.

Multiverse automatically learns (within the scope of a transaction factory) when reads should be tracked. So in most cases you don't need to configure anything.

It would be nice if this functionality were exposed in the STM functionality of Akka.

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

Jonas Bonér

May 21, 2010, 3:40:57 AM
to akka...@googlegroups.com
This sounds interesting. The best way to get it into the product is to
create an issue on assembla:
https://www.assembla.com/spaces/akka/tickets/new

Then I will make sure that it gets prioritized and scheduled for a release.

I also need to read up on 'retry' and how it can be used. It's still
not clear to me.
--
Jonas Bonér

work: http://jayway.com
code: http://akkasource.com
blog: http://jonasboner.com
twitter: @jboner

Peter Veentjer

May 21, 2010, 4:16:27 AM
to akka...@googlegroups.com
Hi Jonas,

On Fri, May 21, 2010 at 9:40 AM, Jonas Bonér <jo...@jonasboner.com> wrote:
> This sounds interesting. The best way to get it into the product is to
> create an issue on assembla:
> https://www.assembla.com/spaces/akka/tickets/new

I'll add an issue.

> Then I will make sure that it gets prioritized and scheduled for a release.
>
> I also need to read up on 'retry' and how it can be used. It's still
> not clear to me.

The retry is the stm version of the Object.wait/Condition.await. And instead of dealing
with a notify/signal, the transaction commit will do that for you.
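
For comparison, here is a minimal sketch of the bookkeeping that retry takes off your hands: the same kind of bounded stack written with plain java.util.concurrent locks and conditions. The class name LockBasedStack is made up for this illustration and is not part of Multiverse or Akka. The sketch uses awaitUninterruptibly only to stay free of checked exceptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, for illustration only: a bounded blocking stack built on
// explicit locks and conditions instead of an STM retry.
class LockBasedStack<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final Deque<E> items = new ArrayDeque<E>();
    private final int maxCapacity;

    LockBasedStack(int maxCapacity) {
        this.maxCapacity = maxCapacity;
    }

    void push(E item) {
        lock.lock();
        try {
            while (items.size() >= maxCapacity) {
                notFull.awaitUninterruptibly(); // the manual counterpart of retry()
            }
            items.push(item);
            notEmpty.signal(); // with an STM, the commit does this for you
        } finally {
            lock.unlock();
        }
    }

    E pop() {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.awaitUninterruptibly();
            }
            E item = items.pop();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

Every condition has to be declared, awaited and signalled by hand here, and the two conditions are tied to this one lock, which is why blocking on two such stacks at once is awkward.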

On a more technical level:
A retry in Multiverse works like this:

static void retry(){
    throw new RetryError();
}

This error is caught in the transaction template.

If a transaction automatically tracks all references that have been read (that is, stores
them in the transaction) and a retry is caught in the transaction template, the transaction
template will register a listener (add-watch) on each and every reference that has been
read. Essentially each and every ref can become a condition variable on demand. This is
where Multiverse differs from Clojure: with Clojure this is a manual process and with
Multiverse it is an automatic process (although I'm also thinking about adding it manually).
Essentially, Tim Harris is the one who came up with all this.

After the listener (a latch to be more specific) is registered, the transaction Thread
is put to sleep and will wake up once a write happens on one of the read references.

Once the thread in the transaction template wakes because there was a write, it will
restart the whole transaction and begin executing the atomic block again. That is how
the blocking mechanism in Multiverse works.
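
The steps above can be sketched in a few dozen lines of plain Java. This is a toy under loud assumptions, not the Multiverse implementation: the names MiniStm, Cell, RetrySignal, watch and atomic are all invented here, writes take effect immediately (there is no isolation or rollback), and only the "track reads, register a latch, sleep, restart" part is modelled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

// Hypothetical toy, not Multiverse: models only the blocking side of retry.
class MiniStm {
    // Thrown by retry() and caught by atomic().
    static final class RetrySignal extends Error {}

    // Reads of the atomic block running on this thread: cell -> version seen.
    private static final ThreadLocal<Map<Cell<?>, Long>> READS =
            new ThreadLocal<Map<Cell<?>, Long>>();

    static final class Cell<T> {
        private T value;
        private long version;
        private final Set<CountDownLatch> listeners = new HashSet<CountDownLatch>();

        Cell(T initial) { value = initial; }

        synchronized T get() {
            Map<Cell<?>, Long> reads = READS.get();
            if (reads != null && !reads.containsKey(this)) {
                reads.put(this, version); // automatic read tracking
            }
            return value;
        }

        synchronized void set(T newValue) {
            value = newValue;
            version++;
            for (CountDownLatch latch : listeners) {
                latch.countDown(); // wake every block that is waiting on this cell
            }
            listeners.clear();
        }

        // Registers the latch, or opens it at once if the cell changed since it was read.
        synchronized void watch(CountDownLatch latch, long versionSeen) {
            if (version != versionSeen) {
                latch.countDown();
            } else {
                listeners.add(latch);
            }
        }
    }

    static void retry() { throw new RetrySignal(); }

    static <T> T atomic(Supplier<T> block) {
        while (true) {
            Map<Cell<?>, Long> reads = new HashMap<Cell<?>, Long>();
            READS.set(reads);
            try {
                return block.get();
            } catch (RetrySignal signal) {
                if (reads.isEmpty()) {
                    throw new IllegalStateException("retry without reads would block forever");
                }
                CountDownLatch latch = new CountDownLatch(1);
                for (Map.Entry<Cell<?>, Long> read : reads.entrySet()) {
                    read.getKey().watch(latch, read.getValue());
                }
                try {
                    latch.await(); // sleep until one of the read cells is written
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while blocked", e);
                }
            } finally {
                READS.remove();
            }
        }
    }
}
```

A block such as MiniStm.atomic(() -> { int b = balance.get(); if (b < 50) MiniStm.retry(); return b; }) then sleeps on the latch until some other thread calls balance.set(...), and is run again from the top.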

With the retry you can create a blocking structure like a queue or stack. But the cool thing
is that you can make a blocking structure out of any structure and are not dependent on the
features provided by that structure. E.g.

class Account{
   LongRef money = new LongRef();

   @TransactionalMethod
   long get(){return money.get();}

   @TransactionalMethod
   void set(long newValue){money.set(newValue);}
}

As you can see, there is no blocking behaviour added to this Account object. But I can use
it as a blocking structure.

E.g.

static void steal(int amount, Account account){
     if(account.get()<amount){
           retry();
     }
     account.set(account.get()-amount);
}

In this example, money is stolen from the account only if it contains enough of it.
If there is not enough money, the transaction blocks until enough money becomes available or
until it has retried too many times.

So the cool thing is that you can make any transactional data structure part of a blocking
operation.

Another cool thing is that because each ref becomes a condition variable, you can also
block on multiple resources (so blocking operations are composable). If you have 2 blocking
queues, and you want to wake up when an item becomes available on either of those queues,
how would you do that in Java?

A dirty solution would be forking some threads and letting them block on each queue. Another dirty
solution would be to expose the lock (perhaps by injecting a lock from the outside that is shared
between the 2 queues, but this would introduce unwanted contention).
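
To make the first "dirty" option concrete, here is a rough sketch that uses only java.util.concurrent. EitherQueue and takeFromEither are names invented for this example. It works, but note the flaw called out in the comments.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, for illustration only.
class EitherQueue {
    // Blocks until q1 or q2 yields an item, using one helper thread per queue.
    static <T> T takeFromEither(BlockingQueue<T> q1, BlockingQueue<T> q2) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<T> first = q1::take;
            Callable<T> second = q2::take;
            // invokeAny returns the first result that completes and cancels the other task.
            // Flaw: if both queues hold an item, both helpers may take one,
            // and the item taken by the losing helper is silently dropped.
            return pool.invokeAny(Arrays.asList(first, second));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow(); // interrupts the helper that is still blocked
        }
    }
}
```

Two extra threads per call plus a window in which an item can get lost is a high price for something the STM version expresses as a single atomic block.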

In Stm it can be done like this (pseudo code)

@TransactionalMethod
Object pop(Queue q1, Queue q2){
     either{
        return q1.pop();
    }orelse{
        return q2.pop();
    }
}

or

@TransactionalMethod
Object pop(Queue q1, Queue q2){
      if(!q1.isEmpty()){
            return q1.pop();
      }

      if(!q2.isEmpty()){
           return q2.pop();
      }

      retry();
      return null;//will never be executed
}

If no item is available, the pop will block.

Multiverse also provides support for interruptible and timed blocking (can all be configured
from the transactionfactorybuilder).

Automatic readtracking can be expensive, so if speculative execution is used (the default),
all transactions begin without automatic readtracking. But as soon as a transaction
encounters a retry, it aborts, signals to the transaction factory that it should use
readtracking from now on, and restarts (now with readtracking enabled). So the
system learns (within the scope of a transaction factory, so each transaction factory
can learn different things).
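
A tiny sketch of that learning step, with invented names (SpeculativeFactory, tracksReads and execute are not Multiverse API). The body is told whether reads are tracked during the current attempt. The actual tracking and blocking are left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical sketch; the names do not come from Multiverse.
class SpeculativeFactory {
    static final class RetrySignal extends Error {}

    // Starts out false: transactions from this factory run without read tracking.
    private final AtomicBoolean trackReads = new AtomicBoolean(false);

    boolean tracksReads() {
        return trackReads.get();
    }

    // Runs the body, passing in whether reads are tracked during this attempt.
    <T> T execute(Function<Boolean, T> body) {
        boolean tracking = trackReads.get();
        try {
            return body.apply(tracking);
        } catch (RetrySignal signal) {
            if (!tracking) {
                // First retry seen: remember it for every later transaction of
                // this factory, then restart with tracking switched on.
                trackReads.set(true);
                return execute(body);
            }
            // With tracking on, a real STM would now block on the tracked reads.
            throw signal;
        }
    }
}
```

Because the flag lives in the factory, one factory can pay for read tracking while another, whose transactions never retry, keeps running without it.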


Peter Veentjer

May 21, 2010, 12:13:53 PM
to akka...@googlegroups.com
Hi Jonas, can you add me to the akka space on assembla?

My account name is pveentjer.

I would also like to hear your thoughts about the previous mail. Blocking transactions
can be useful and are important to understand, IMHO.

Peter Veentjer

May 26, 2010, 5:11:50 PM
to akka...@googlegroups.com
*Kick*

Did you have time to look at the blocking stuff?

And when are you expecting to release the next Akka version? It would be nice if we can integrate Multiverse 0.6:

- better scalable clock
- transactional treeset/treemap.. and we can also expose the transactionallinkedlist (that can be used as blockingqueue and deque), transactionalreference array and transactionalarraylist
- improved transaction template that makes storing it locally easier (no need to subclass, but just pass a transactional callable)
- more performance improvements in the bytecode
- more performance/scalability improvements in the stm itself (less pressure on the memory bus for example)
- sensors so you have some feedback what is happening inside the stm.
- logging on transaction level (for debugging)
- scala compatible bytecode instrumentation so Akka can also be used with object granularity instead of only field granularity (also a good performance boost if you have more complex datastructures)

Jonas Bonér

Jun 10, 2010, 5:59:03 AM
to akka...@googlegroups.com
On 26 May 2010 23:11, Peter Veentjer <alarm...@gmail.com> wrote:
> *Kick*
>
> Did you have time to look at the blocking stuff?
>

I would love to get the blocking stuff in Akka. It looks very useful.
This ticket is scheduled for 0.10.
https://www.assembla.com/spaces/akka/tickets/232-blocking-transactions-
I hope to get it in.
Perhaps Peter Vlugter is planning on doing it as part of the STM refactoring.

> And when are you expecting to release the next Akka version? It would be
> nice if we can integrate Multiverse 0.6:
>
> - better scalable clock

This is the highest priority in my opinion.

Peter Veentjer

Jun 10, 2010, 4:04:34 PM
to akka...@googlegroups.com
Hi Jonas,

getting it in is no problem (I had to do special work disabling it for Akka).

The 3 things that need to be done are:
1) expose the retry method (org.multiverse.api.StmUtils.retry())
2) make sure that the transactionfactory for non actor transactions is configured with setExplicitRetryAllowed(true).
3) expose the OrElseTemplate. One of my committers currently is working on it to let it work with a TransactionalCallable instead of relying on subclassing.

So something that can be done in an hour. All the references will keep working like expected (blocking behavior is available for all of them).

What is important to realize, however, is that blocking doesn't rely on spinning. Under the hood a normal waitset is used.

Jonas Bonér

Jun 10, 2010, 4:07:52 PM
to akka...@googlegroups.com
On 10 June 2010 22:04, Peter Veentjer <alarm...@gmail.com> wrote:
> Hi Jonas,
>
> getting it in is no problem (I had to do special work disabling it for
> Akka).
>
> The 3 things that need to be done are:
> 1) expose the retry method (org.multiverse.api.StmUtils.retry())

One minute.

> 2) make sure that the transactionfactory for non actor transactions is
> configured with setExplicitRetryAllowed(true).

Ok. Also simple.

> 3) expose the OrElseTemplate. One of my committers currently is working on

That is done already:

atomically {
..
} orElse {
..
}

Peter Veentjer

Jun 10, 2010, 4:17:59 PM
to akka...@googlegroups.com
It isn't complicated :)

And the cool thing is that you can even wait on multiple transactional references:

void transfer(Account account1, Account account2, int amount){
   atomic{
       if(account1.get()+account2.get()<amount){
           retry();
       }

       account1.dec(amount);
   }
}

Imho very cool :)

What also can be added is a timeout and being interruptible (all configurable from the transactionfactory).

so something like:

void transfer(Account account1, Account account2, int amount){
   atomic(interruptible=true, timeout = 10, timeoutUnit = TimeUnit.SECONDS){
       if(account1.get()+account2.get()<amount){
           retry();
       }

       account1.dec(amount);
   }
}

In most cases this behavior is added at the top level of the transaction, and often you create multiple signatures whose logic is all the same. This prevents the need to copy the same method more than once for different needs.

so something like:

@TransactionalMethod
public void transfer(Account account1, Account account2, int amount){
     transferLogic(account1, account2, amount);
}

@TransactionalMethod //in this case the interruptible behavior is inferred based on the signature
public void transferInterruptible(Account account1, Account account2, int amount)throws InterruptedException{
     transferLogic(account1, account2, amount);
}

//throws TimeoutException when timeout happens.
@TransactionalMethod(timeout = 10, timeoutUnit = TimeUnit.SECONDS)
public void tryTransferInterruptible(Account account1, Account account2, int amount){
     transferLogic(account1, account2, amount);
}

private void transferLogic(Account account1, Account account2, int amount){
       if(account1.get()+account2.get()<amount){
           retry();
       }

       account1.dec(amount);
}

Peter Veentjer

Jun 10, 2010, 4:39:50 PM
to akka...@googlegroups.com
Last comment:

For the retry to work, reads need to be tracked. If speculative configuration is enabled on the transactionfactorybuilder (which it is by default), a transaction starts without readtracking. But once it hits a retry, it is aborted and restarted but now with automatic readtracking enabled.

So if you ever get the question, you know how this works.

Peter Vlugter

Jun 10, 2010, 5:21:04 PM
to akka...@googlegroups.com
Hi there,

I've already got retry and configurable transaction factory on a private branch. I'm still to test it properly with the ants demo and other things but simple tests all work fine as expected.

I've moved the atomically-orElse to sit with retry under Transaction.Util and I've also renamed it to either-orElse to line up with Multiverse and so that its usage is not confused with a compensating task.

And I have deferred and compensating methods in there as well (just mapping to the Multiverse StmUtils as well).

The main thing I was going to work on before pushing this branch up was trying to remove the Akka specific transaction class altogether. And there are some other smaller changes too, like trying out package objects. I'm also still to look at exposing the Multiverse transactional objects and refs and looking at extending BasicRef.

Anyway, here's how using retry (and configuring) works at the moment (I just run this in sbt console):

import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.stm.Transaction.Local._
import se.scalablesolutions.akka.stm.Transaction.Util._
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util.Logging

type Account = Ref[Double]

val account1 = Ref(100.0)
val account2 = Ref(100.0)

case class Transfer(from: Account, to: Account, amount: Double)

class Transferer extends Actor with Logging {
  implicit val txFactory = TransactionFactory(trackReads = true, explicitRetries = true)
  def receive = {
    case Transfer(from, to, amount) =>
      atomic {
        if (from.getOrElse(0) < amount) {
          log.info("not enough money - retrying")
          retry
        }
        log.info("transferring")
        from alter (_ - amount)
        to alter (_ + amount)
      }
  }
}

val transferer = actorOf(new Transferer).start

transferer ! Transfer(account1, account2, 500.0)
// INF [20100611-09:06:11.478] Transferer: not enough money - retrying

atomic { account1 alter (_ + 2000) }
// INF [20100611-09:06:35.346] Transferer: transferring

atomic { account1.get }
// Option[Double] = Some(1600.0)

atomic { account2.get }
// Option[Double] = Some(600.0)

transferer.stop

Peter Veentjer

Jun 10, 2010, 5:34:07 PM
to akka...@googlegroups.com
What is the meaning of 'getOrElse'?

Peter Vlugter

Jun 10, 2010, 5:48:11 PM
to akka...@googlegroups.com

On 11/06/2010, at 9:34 AM, Peter Veentjer wrote:

> What is the meaning of 'getOrElse'?

It means get the value or otherwise give me a default value. At the moment ref.get returns an option type. So it would return None or Some(100) depending on whether a value is defined. Like the Maybe type in Haskell. It's possible to use ref.get.get to access the value but if the value was undefined then it would throw an exception (just like the underlying ref.get).
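
For readers coming from Java: java.util.Optional (added in Java 8, so well after this thread) has the same shape, which may make the naming easier to place. This is only an analogy and not the Akka Ref API. GetOrElseDemo and valueOrZero are made-up names.

```java
import java.util.Optional;

// Analogy only: Optional.orElse plays the role of Scala's getOrElse.
class GetOrElseDemo {
    // Returns the contained value, or 0.0 when nothing is defined.
    static double valueOrZero(Optional<Double> value) {
        return value.orElse(0.0);
    }
}
```

Calling get() on an empty Optional throws NoSuchElementException, which mirrors the ref.get.get case described above.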

Peter Veentjer

Jun 10, 2010, 5:51:37 PM
to akka...@googlegroups.com
Aha, OK. I don't know if that is very clear, since the orelse construct is also used for blocking operations (part of the STM literature). So it wrong-footed me.

Perhaps getOrDefaultTo would be less confusing.

Peter Vlugter

Jun 10, 2010, 6:01:19 PM
to akka...@googlegroups.com
It's following the convention of the Option type which also has getOrElse. Option also has an orElse to give a default option (rather than value). And orElse is also defined on partial functions to provide fallbacks. So it should be okay for Scala users...

Peter Veentjer

Jun 10, 2010, 6:07:46 PM
to akka...@googlegroups.com
Ok.. perhaps we can use a different name for the orelse mechanism then. Either/or e.g.

Jonas Bonér

Jun 11, 2010, 4:44:50 AM
to akka...@googlegroups.com

Looking good. This is cool stuff. :-)

--
Jonas Bonér
http://jayway.com
http://akkasource.com
twitter: jboner


Viktor Klang

Jun 11, 2010, 9:00:39 AM
to akka...@googlegroups.com
On Fri, Jun 11, 2010 at 10:44 AM, Jonas Bonér <jo...@jonasboner.com> wrote:

> Looking good. This is cool stuff. :-)


Agreed! :)
 



--
Viktor Klang
| "A complex system that works is invariably
| found to have evolved from a simple system
| that worked." - John Gall

Akka - the Actor Kernel: Akkasource.org
Twttr: twitter.com/viktorklang