Safe Publisher-Subscriber Pattern with STM


Sciss

Jul 2, 2010, 12:38:51 PM
to scala...@googlegroups.com
hi nathan,

(this is me brainstorming, so i'd appreciate your opinion, but if you have little time, just skip the mail :-)

i am wrapping my head around the question of how to correctly implement the publisher/subscriber pattern inside the STM. basically i have container objects, and i need to track object creation and deletion and be able to safely add observers to newly created children. safely means that i can query their current state and register the observer at the same time, such that it won't miss any notification in between, nor receive a notification about objects it already saw in the initial query.

i have come up with the following approach. sorry that this is quite verbose... but it should be easy to grasp.

import collection.immutable.{ Queue => IQueue }
import edu.stanford.ppl.ccstm.{ Ref, STM, Txn, TxnLocal }

sealed abstract class Action
case object Added extends Action
case object Removed extends Action

trait Subscriber[ -T ] {
  def changed( changes: (Action, T)* ) : Unit
}

trait Container[ T ] {
  val children = Ref( Set.empty[ T ])
  // log of the modifications made in the current transaction, in order
  protected val mod = new TxnLocal[ IQueue[ (Action, T )]] { override def initialValue( tx: Txn ) = IQueue.empty }
  def add( child: T )( implicit txn: Txn ) : Unit
  def remove( child: T )( implicit txn: Txn ) : Unit
}

trait Publisher[ T ] {
  protected val subscribers = Ref( Set.empty[ Subscriber[ T ]])
  def addSubscriber( s: Subscriber[ T ])( implicit tx: Txn ) : Unit
  def removeSubscriber( s: Subscriber[ T ])( implicit tx: Txn ) : Unit
}

trait ContainerPublisher[ T ] extends Container[ T ] with Publisher[ T ] {
  private val touched = new TxnLocal[ Boolean ] { override def initialValue( tx: Txn ) = false }
  // for subscribers added in the current transaction: size of the mod log when they subscribed
  private val subscriptionTime = new TxnLocal[ Map[ Subscriber[ T ], Int ]] { override def initialValue( tx: Txn ) = Map.empty }

  def add( child: T )( implicit txn: Txn ) {
    touch
    children.transform( _ + child )
    // mod.transform( _ enqueue (Added, child) )
    mod.set( mod.get enqueue (Added, child) )
  }

  def remove( child: T )( implicit txn: Txn ) {
    touch
    children.transform( _ - child )
    // mod.transform( _ enqueue (Removed, child) )
    mod.set( mod.get enqueue (Removed, child) )
  }

  def addSubscriber( s: Subscriber[ T ])( implicit tx: Txn ) {
    subscribers.transform( _ + s )
    // subscriptionTime.transform( _ + (s -> mod().size) )
    subscriptionTime.set( subscriptionTime.get + (s -> mod.get.size) )
  }

  def removeSubscriber( s: Subscriber[ T ])( implicit tx: Txn ) {
    subscribers.transform( _ - s )
    // subscriptionTime.transform( ??? )
  }

  // registers the dispatch callback on the first modification of each transaction
  private def touch( implicit tx: Txn ) {
    val wasTouched = touched.get
    if( !wasTouched ) {
      touched.set( true )
      tx.beforeCommit( tx0 => {
        val modRes = mod.get( tx0 )
        val timeRes = subscriptionTime.get( tx0 )
        val subscrRes = subscribers.get( tx0 )
        // the subscribers are only called once the transaction has committed
        tx0.afterCommit { _ =>
          subscrRes.foreach( sub => {
            // skip the modifications made before sub subscribed within this transaction
            val from = timeRes.getOrElse( sub, 0 )
            val unseen = modRes.drop( from )
            sub.changed( unseen: _* )
          })
        }
      }, Int.MaxValue )
    }
  }
}

case class Connection( name: String )
case class Process( name: String ) extends ContainerPublisher[ Connection ]
object World extends ContainerPublisher[ Process ]

abstract class ContainerObserver[ C, P <: ContainerPublisher[ C ]]( parent: P ) extends Subscriber[ C ] {
  def changed( changes: (Action, C)* ) {
    changes.foreach( println( _ ))
  }

  def start {
    STM.atomic { implicit t =>
      // read the current children and subscribe in the same transaction
      val current = parent.children().toSeq
      parent.addSubscriber( this )
      changed( current.map( obj => Added -> obj ): _* )
    }
  }

  def stop { STM.atomic { implicit t => parent.removeSubscriber( this )}}
}

class ProcessObserver( parent: Process ) extends Subscriber[ Connection ] {
  me =>

  def changed( changes: (Action, Connection)* ) {
    changes.foreach( println( _ ))
  }

  def start {
    STM.atomic { implicit t =>
      val current = parent.children().toSeq
      parent.addSubscriber( me )
      changed( current.map( obj => Added -> obj ): _* )
    }
  }

  def stop { STM.atomic { implicit t => parent.removeSubscriber( me )}}
}

object WorldObserver extends Subscriber[ Process ] {
  me =>

  private var obs = Map.empty[ Process, ProcessObserver ]

  def changed( changes: (Action, Process)* ) {
    changes.foreach( tup => {
      println( tup )
      tup match {
        case (Added, p) => defer {
          val ob = new ProcessObserver( p )
          obs += p -> ob
          ob.start
        }
        case (Removed, p) => defer {
          obs.get( p ).foreach( ob => {
            obs -= p
            ob.stop
          })
        }
      }
    })
  }

  // the observer is doing some GUI work, so we defer all changes to the event thread
  // (obs is then only touched from that one thread, which makes it thread-safe)
  private def defer( thunk: => Unit ) =
    java.awt.EventQueue.invokeLater( new Runnable {
      def run = thunk
    })

  def start {
    STM.atomic { implicit t =>
      val current = World.children().toSeq
      World.addSubscriber( me )
      // runs inside the txn: if the txn is retried, this initial notification is repeated
      changed( current.map( obj => Added -> obj ): _* )
    }
  }

  def stop { STM.atomic { implicit t => World.removeSubscriber( me )}}
}

WorldObserver.start

// seems to work:
val p1 = Process( "p1" )
STM.atomic { implicit t => World.add( p1 )}
STM.atomic { implicit t => World.remove( p1 )}
STM.atomic { implicit t => p1.add( Connection( "c1" )); p1.add( Connection( "c2" ))}
STM.atomic { implicit t => World.add( p1 )}
STM.atomic { implicit t => World.remove( p1 )}

STM.atomic { implicit t =>
  p1.add( Connection( "c1" ))
  World.add( p1 )
  p1.add( Connection( "c2" ))
}
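A minimal sketch of the bookkeeping inside ContainerPublisher, written in plain Scala 2.13/3 without CCSTM so it can run on its own. It keeps one log of (action, child) pairs per transaction, plus the log length at the moment a subscriber joins mid-transaction, and at commit hands each subscriber only the tail of the log it has not yet seen. TxnBook, record, subscribe and deliveries are hypothetical names for illustration, and the actions are plain strings instead of the Added/Removed objects.

```scala
// Plain-Scala model of the per-transaction bookkeeping (no CCSTM involved):
// a log of (action, child) pairs plus, for each subscriber registered in this
// transaction, the log length at the moment it subscribed.
final case class TxnBook[T](
    log: Vector[(String, T)] = Vector.empty[(String, T)],
    subscribedAt: Map[String, Int] = Map.empty[String, Int]
) {
  def record(action: String, child: T): TxnBook[T] =
    copy(log = log :+ (action -> child))

  def subscribe(name: String): TxnBook[T] =
    copy(subscribedAt = subscribedAt + (name -> log.size))

  // What each subscriber receives at commit: everything logged after it subscribed.
  // Subscribers that existed before the transaction default to offset 0.
  def deliveries(allSubscribers: Seq[String]): Map[String, Vector[(String, T)]] =
    allSubscribers.map(s => s -> log.drop(subscribedAt.getOrElse(s, 0))).toMap
}

val book = TxnBook[String]()
  .record("Added", "c1")
  .subscribe("late")      // joins after c1 was added, so its snapshot already contains c1
  .record("Added", "c2")

// "early" sees both entries; "late" only sees c2
val out = book.deliveries(Seq("early", "late"))
```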

the question is whether i'm handling the tracking of subscription changes during a transaction correctly. maybe i am doing something completely stupid, there isn't any conflict to look out for, and the whole thing is in fact much easier to achieve?

thanks for insights!

best, -sciss-

Sciss

Jul 2, 2010, 12:45:39 PM
to scala...@googlegroups.com
for example i wonder if with

STM.atomic { implicit t =>
  publisher.addSubscriber( s )
  // XXX
  val currentState = publisher.children()
}

someone else could call publisher.add( Z ) in between (at position XXX), which would result in the subscriber getting a notification about the added Z even though it has already seen Z in currentState?
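The invariant behind this question can be sketched without an STM. The class below uses a plain lock (synchronized) as a stand-in for STM.atomic: this is mutual exclusion, not transactional memory, and LockedPublisher and subscribeWithSnapshot are hypothetical names. Reading the children and registering the subscriber happen in one critical section, and add reads the subscriber list in the same critical section as its write, so every concurrently added element ends up either in the snapshot or in a notification, never both and never neither. An STM has to provide the same guarantee, by rolling back or blocking one of the two conflicting transactions instead of holding a lock.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Lock-based stand-in for STM.atomic (not an STM), used only to show the invariant:
// "snapshot + subscribe" and "add + read subscribers" must not interleave.
final class LockedPublisher[T] {
  private var children    = Set.empty[T]
  private var subscribers = List.empty[T => Unit]

  def add(child: T): Unit = {
    val toNotify = synchronized {
      children += child
      subscribers // exactly the subscribers registered before this add
    }
    toNotify.foreach(f => f(child)) // notify outside the critical section
  }

  // Registration and reading the current state happen in one critical section.
  def subscribeWithSnapshot(onAdded: T => Unit): Set[T] = synchronized {
    subscribers ::= onAdded
    children
  }
}

val pub      = new LockedPublisher[Int]
val notified = ConcurrentHashMap.newKeySet[Int]()
val writer   = new Thread(() => (1 to 1000).foreach(pub.add))
writer.start()
val snapshot = pub.subscribeWithSnapshot { i => notified.add(i); () }
writer.join()
val seen = notified.asScala.toSet
```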

i have the feeling i should read more about the logic of STMs... can anyone recommend a particular article, or a book (chapter)?

thanks, -sciss-

Sciss

Jul 2, 2010, 2:16:20 PM
to scala...@googlegroups.com
here is another try, which looks less tricky: the idea is to bundle the modifications into add and remove sets, since the order of actions within the txn is not relevant. instead of receiving an update per operation, subscribers just get one update "sheet". subscribers added during a transaction are treated differently: they don't take part in the "partial" update sheet of that transaction, but are instead delivered a separate "full" sheet containing all objects in the container. that way they don't need to (= must not) call objects() at all; they just wait for the automatic full update.

best, -sciss-


import edu.stanford.ppl.ccstm._

trait Obj

case class Update( added: Set[ Obj ], removed: Set[ Obj ])

trait Subscriber {
  def update( u: Update )
}

class Publisher {
  private val update = new TxnLocal[ Update ] {
    override def initialValue( t: Txn ) = Update( Set.empty, Set.empty )
  }
  private val touched = new TxnLocal[ Boolean ] {
    override def initialValue( t: Txn ) = false
  }
  private val newSubs = new TxnLocal[ Set[ Subscriber ]] {
    override def initialValue( t: Txn ) = Set.empty
  }
  private val subs = Ref( Set.empty[ Subscriber ])

  val objects = Ref( Set.empty[ Obj ])

  def add( obj: Obj )( implicit t: Txn ) {
    touch
    val objs = objects()
    if( objs.contains( obj )) return
    objects.set( objs + obj )
    val upd = update.get
    // adding an object that was removed earlier in this txn cancels the removal
    update.set( if( upd.removed.contains( obj )) {
      upd.copy( removed = upd.removed - obj )
    } else {
      upd.copy( added = upd.added + obj )
    })
  }

  def remove( obj: Obj )( implicit t: Txn ) {
    touch
    val objs = objects()
    if( !objs.contains( obj )) return
    objects.set( objs - obj )
    val upd = update.get
    // removing an object that was added earlier in this txn cancels the addition
    update.set( if( upd.added.contains( obj )) {
      upd.copy( added = upd.added - obj )
    } else {
      upd.copy( removed = upd.removed + obj )
    })
  }

  def addSubscriber( s: Subscriber )( implicit t: Txn ) {
    val old = newSubs.get
    newSubs.set( old + s )
    if( old.isEmpty ) { // i.e. first new subscriber in this txn: register the callback once
      t.beforeCommit( tx0 => {
        val these = newSubs.get( tx0 )
        subs.transform( _ ++ these )( tx0 )
        val upd = Update( objects.get( tx0 ), Set.empty ) // full update
        tx0.afterCommit { tx1 =>
          these.foreach( _.update( upd ))
        }
      }, Int.MaxValue ) // i.e. after the partial dispatch preparation
    }
  }

  private def touch( implicit t: Txn ) {
    val old = touched.get
    if( !old ) {
      touched.set( true )
      t.beforeCommit( tx0 => { // dispatch preparation
        val these = subs.get( tx0 )
        val upd = update.get( tx0 ) // partial update
        tx0.afterCommit { tx1 =>
          these.foreach( _.update( upd ))
        }
      }, Int.MaxValue - 1 )
    }
  }
}

val p = new Publisher

case class Sub( name: String ) extends Subscriber {
  def update( u: Update ) {
    println( "----- " + name )
    println( "ADDED:" )
    u.added.foreach( o => println( " " + o ))
    println( "REMOVED:" )
    u.removed.foreach( o => println( " " + o ))
  }
}

val s1 = Sub( "s1" )
val s2 = Sub( "s2" )
val s3 = Sub( "s3" )

case class Ob( name: String ) extends Obj

STM.atomic { implicit t =>
  p.add( Ob( "1" ))
  p.addSubscriber( s1 )
  p.add( Ob( "2" ))
  p.add( Ob( "3" ))
  p.addSubscriber( s2 )
}

STM.atomic { implicit t =>
  p.add( Ob( "4" ))
  p.add( Ob( "5" ))
  p.remove( Ob( "3" ))
  p.remove( Ob( "4" ))
}

STM.atomic { implicit t =>
  p.add( Ob( "4" ))
  p.add( Ob( "5" ))
  p.addSubscriber( s3 )
  p.remove( Ob( "3" ))
  p.remove( Ob( "4" ))
}

p.objects.single()
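A pure sketch of the update-sheet bookkeeping above, replaying the second and third transactions of the example with Ints standing in for Ob( "1" ), Ob( "2" ), and so on (the first transaction leaves the objects 1, 2 and 3). Update mirrors the class above, made generic; Op, Add, Remove and replay are hypothetical helpers. Operations that do not change the object set are skipped, as with the contains() checks in add and remove.

```scala
// Generic version of the Update sheet: an add cancels an earlier remove of the
// same object within the txn, and vice versa.
final case class Update[A](added: Set[A], removed: Set[A]) {
  def add(a: A): Update[A] =
    if (removed.contains(a)) copy(removed = removed - a) else copy(added = added + a)
  def remove(a: A): Update[A] =
    if (added.contains(a)) copy(added = added - a) else copy(removed = removed + a)
}

sealed trait Op[A]
final case class Add[A](a: A) extends Op[A]
final case class Remove[A](a: A) extends Op[A]

// Replays one transaction: returns the new object set and the partial update sheet.
def replay[A](start: Set[A], ops: Seq[Op[A]]): (Set[A], Update[A]) =
  ops.foldLeft((start, Update(Set.empty[A], Set.empty[A]))) {
    case ((objs, upd), Add(a)) if !objs.contains(a)   => (objs + a, upd.add(a))
    case ((objs, upd), Remove(a)) if objs.contains(a) => (objs - a, upd.remove(a))
    case (state, _)                                   => state // no change: skipped
  }

// second transaction: add 4, add 5, remove 3, remove 4
val ops: Seq[Op[Int]] = Seq(Add(4), Add(5), Remove(3), Remove(4))
val (objs2, upd2) = replay(Set(1, 2, 3), ops)

// third transaction performs the same operations on the result
val (objs3, upd3) = replay(objs2, ops)

// a subscriber registered during the third transaction gets the full sheet instead
val fullForS3 = Update(objs3, Set.empty[Int])
```

In the third transaction, 4 is added and removed again while 5 and 3 are no-ops, so the partial sheet for s1 and s2 comes out empty while s3 receives the complete set.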

Sciss

Jul 2, 2010, 2:19:26 PM
to scala...@googlegroups.com
(of course, "touch" should go after the check:

def add( obj: Obj )( implicit t: Txn ) {
  val objs = objects()
  if( objs.contains( obj )) return
  touch
  objects.set( objs + obj )
  val upd = update.get
  update.set( if( upd.removed.contains( obj )) {
    upd.copy( removed = upd.removed - obj )
  } else {
    upd.copy( added = upd.added + obj )
  })
}

the same goes for remove.)

Nathan Bronson

Jul 6, 2010, 7:34:38 PM
to scala...@googlegroups.com
Sorry for the delayed response.

Daniel Spiewak's blog discusses building a simple STM from scratch; it's
a good place to start. CCSTM's syntax is descended from his STM:

http://www.codecommit.com/blog/scala/software-transactional-memory-in-scala

There's good info in the Wikipedia article, although it uses a lot of
jargon

http://en.wikipedia.org/wiki/Software_transactional_memory

The Wikipedia article on optimistic concurrency control might actually
be more helpful, because virtually all STMs use some variant of OCC in
their algorithm:

http://en.wikipedia.org/wiki/Optimistic_concurrency_control


To answer your question, the STM will guarantee that if another
transaction (call it B) calls publisher.add at XXX, that either this txn
or txn B will be rolled back, or that B will be blocked until after this
txn has completed. The STM tries to prevent inconsistent states as it
goes forward. Whenever it discovers a conflict between two
transactions, it picks one of them to roll back and retry.
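Optimistic concurrency control in its smallest form is a single-location compare-and-set retry loop: read without locking, compute a new value, and commit only if nobody else committed in between. The loser of a race throws its work away and starts over, which is the "roll back and retry" case described above. This sketch uses java.util.concurrent.atomic.AtomicReference; OptimisticRef is a hypothetical name, and a real STM typically tracks and validates every location a transaction reads and writes, not just one.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Single-location optimistic update: read, compute speculatively, then publish
// only if the cell still holds the value we read; otherwise retry from scratch.
final class OptimisticRef[A <: AnyRef](init: A) {
  private val cell = new AtomicReference[A](init)
  val conflicts = new AtomicInteger(0) // how many times a commit attempt lost the race

  def get: A = cell.get()

  @annotation.tailrec
  final def update(f: A => A): A = {
    val seen = cell.get()  // optimistic read, no lock taken
    val next = f(seen)     // speculative work that may be thrown away
    if (cell.compareAndSet(seen, next)) next
    else { conflicts.incrementAndGet(); update(f) } // someone else committed first
  }
}

val names = new OptimisticRef(Set.empty[String])
val writers = (1 to 4).map { t =>
  new Thread(() => (1 to 500).foreach(i => names.update(_ + s"t$t-$i")))
}
writers.foreach(_.start())
writers.foreach(_.join())
```

No update is lost even though the threads never lock: each conflicting attempt is simply recomputed against the newer value.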

- Nathan

Nathan Bronson

Jul 6, 2010, 8:46:58 PM
to scala...@googlegroups.com
Sciss,

I've gotten a chance to look over your pub-sub stuff, and it looks
pretty reasonable. I've simplified your last version and checked it
into CCSTM's github at

src/test/scala/edu/stanford/ppl/ccstm/examples/PubSub.scala

Instead of keeping multiple TxnLocal-s, I just created a Changes
instance that tracks all of the pending changes. I also simplified it
slightly by registering the transaction callback from inside that
TxnLocal's initialValue method, since it is guaranteed to be called
exactly once per Txn.

I should probably draw a finite-state diagram, but is it clear that
transactions that get to the beforeCommit stage might not commit? If
the actions taken by the subscriber are themselves using the STM, then
this is not a problem (because everything can be rolled back and
retried). If the subscribers are doing other things then this might not
be okay.
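To make the beforeCommit/afterCommit distinction concrete, here is a toy transaction with just those two phases and a TxnLocal-like slot, plus a publisher with the structure described above: the pending-changes buffer is created once per transaction, and creating it registers the dispatch hook. ToyTxn, ToyPublisher, local and commit(fail = ...) are hypothetical and are not CCSTM's API; commit(fail = true) simulates a transaction that gets through beforeCommit but still does not commit.

```scala
import scala.collection.mutable

// Hypothetical toy lifecycle (not CCSTM): beforeCommit hooks run while the txn
// can still fail; afterCommit hooks run only if it actually commits.
final class ToyTxn {
  private val before = mutable.Buffer.empty[() => Unit]
  private val after  = mutable.Buffer.empty[() => Unit]
  private val locals = mutable.Map.empty[AnyRef, Any]

  def beforeCommit(f: => Unit): Unit = before += (() => f)
  def afterCommit(f: => Unit): Unit  = after += (() => f)

  // TxnLocal-like slot: `init` runs at most once per transaction and key
  def local[V](key: AnyRef)(init: => V): V =
    locals.getOrElseUpdate(key, init).asInstanceOf[V]

  // fail = true simulates a conflict detected after the beforeCommit phase
  def commit(fail: Boolean = false): Boolean = {
    before.foreach(h => h())
    if (fail) false else { after.foreach(h => h()); true }
  }
}

final class ToyPublisher[A] {
  private var subscribers = List.empty[Seq[A] => Unit]
  def subscribe(f: Seq[A] => Unit): Unit = subscribers ::= f

  // One pending-changes buffer per txn; creating it registers the dispatch hook once.
  private def changes(txn: ToyTxn): mutable.Buffer[A] = txn.local(this) {
    val pending = mutable.Buffer.empty[A]
    txn.beforeCommit {
      val subs     = subscribers    // capture while the txn is still valid...
      val snapshot = pending.toList
      txn.afterCommit(subs.foreach(s => s(snapshot))) // ...deliver only once committed
    }
    pending
  }

  def add(a: A)(txn: ToyTxn): Unit = changes(txn) += a
}

val pub      = new ToyPublisher[String]
val received = mutable.Buffer.empty[Seq[String]]
pub.subscribe(batch => received += batch)

val t1 = new ToyTxn
pub.add("a")(t1)
pub.add("b")(t1)
t1.commit()            // committed: one batch containing both additions

val t2 = new ToyTxn
pub.add("c")(t2)
t2.commit(fail = true) // fails after beforeCommit: nothing is delivered
```

The subscriber list and the changes are captured in beforeCommit, but delivery waits for afterCommit, so the failed second transaction produces no notification.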

How many subscribers do you expect in your system? What will they be
doing?

Cheers,
- Nathan

Sciss

Jul 7, 2010, 5:25:02 AM
to scala...@googlegroups.com
hey nathan,

thanks for looking into this and even working it out! i will take a look soon. at the moment i have a few days to get a working system, so i will leave the pub-sub as it is (it seems to be robust). it basically worked out with the update / changes pattern and the full update for newly registered subscribers.

it is clear to me that stuff can still happen after beforeCommit; that's why i use a two-stage approach ( http://github.com/Sciss/Cupola/blob/master/src/main/scala/de/sciss/synth/proc/TxnModel.scala ):

protected def touch( implicit tx: ProcTxn ) {
  if( !touched.swap( true )) {
    tx.beforeCommit( tx0 => { // dispatch preparation
      val parList = listeners()( tx0 )
      val parUpd : T = if( parList.nonEmpty ) updateRef()( tx0 ) else null.asInstanceOf[ T ]
      val fullList = newListeners()( tx0 )
      val fullUpd = if( fullList.nonEmpty ) fullUpdate( tx0 ) else null.asInstanceOf[ T ]
      if( fullList.nonEmpty ) listeners.transform( _ ++ fullList )( tx0 )
      if( parUpd != null || fullUpd != null ) tx0.afterCommit { tx1 => // WAIT TO BE SURE THE TXN IS COMMITTED !!
        parList.foreach( _.updated( parUpd ))
        fullList.foreach( _.updated( fullUpd ))
      }
    }, Int.MaxValue )
  }
}
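The null.asInstanceOf[ T ] sentinels above could also be expressed with Option. A sketch of the same dispatch decision as a pure function, assuming string-valued objects: existing listeners get the partial update, new listeners get a full one, and the full state is passed by name so it is only computed when there are new listeners. planDispatch and this Update class are hypothetical stand-ins, not part of TxnModel.

```scala
final case class Update(added: Set[String], removed: Set[String])

// Hypothetical helper: who gets which update. The full state is by-name, so it is
// only evaluated when there are new listeners (what the null checks achieve above).
def planDispatch[L](existing: Set[L], fresh: Set[L],
                    partial: Update, fullState: => Set[String]): List[(L, Update)] = {
  val full: Option[Update] =
    if (fresh.nonEmpty) Some(Update(fullState, Set.empty)) else None
  existing.toList.map(_ -> partial) ++ full.toList.flatMap(u => fresh.toList.map(_ -> u))
}

var fullComputed = 0
def currentState: Set[String] = { fullComputed += 1; Set("p1", "p2") }

val onlyOld = planDispatch(Set("gui"), Set.empty[String], Update(Set("p2"), Set.empty), currentState)
val withNew = planDispatch(Set("gui"), Set("gui2"), Update(Set("p2"), Set.empty), currentState)
```

An empty plan corresponds to the case where the code above skips registering the afterCommit hook altogether.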

currently there is just one :-) subscriber, which is the GUI layer. so i have a sound synthesis system that can be text-controlled via a DSL, and there is a GUI front end that reflects everything that's going on. basically an MVC approach, which i have used a lot in the past, so i know exactly how to handle it.

i will try to publish a binary soon to showcase the thing; i think it's going to be a really nice tool.

the only question i still need to solve in a real live-coding context is how to wrap the code entered in the REPL in an STM.atomic { } block. my current approach is to wrap the code in a class and afterwards add an import statement, so all the new bindings are available in the next REPL iteration. but that causes an incremental slowdown after fifty or so executions. ( http://scala-programming-language.1934581.n4.nabble.com/Variable-bindings-in-the-REPL-and-massaging-text-input-tt2259076.html#a2259076 )

so eventually i think i will need to hack open the STM.atomic call, to be able to create the Txn and close it separately... i don't know exactly how yet, but something like:

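object REPLSupport {
  // hypothetical: Txn.open stands for some way to open a txn outside of STM.atomic
  implicit def giveMyATransaction : Txn = Txn.open
}

repl.execute( "import REPLSupport._" )

def executeOnTheRepl( code: String ) {
  // closeTxnIfOneWasOpened is likewise a placeholder: commit the txn if the snippet opened one
  repl.execute( code + "; closeTxnIfOneWasOpened" )
}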

only that i need to figure out how to close the transaction if the execution throws an exception. i guess i can do something with a thread-local variable, since the REPL will execute in the same thread as the caller as far as i understand.
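The thread-local idea can be sketched independently of CCSTM. OpenTxn, ReplSupport.txn and ReplSupport.evaluate are hypothetical: txn opens a handle lazily the first time a snippet asks for one (what the implicit giveMyATransaction would do), and evaluate marks it committed or rolled back and always clears the slot in a finally block, so an exception thrown by the snippet cannot leave a transaction open for the next one. As above, it assumes the REPL evaluates snippets on the caller's thread.

```scala
// Hypothetical stand-in for an open transaction handle (not a CCSTM type).
final class OpenTxn {
  var committed  = false
  var rolledBack = false
}

object ReplSupport {
  // one slot per thread; the REPL is assumed to evaluate on the caller's thread
  private val current = new ThreadLocal[Option[OpenTxn]] {
    override protected def initialValue(): Option[OpenTxn] = None
  }

  // What an implicit "give me a transaction" would do: open one lazily on first use.
  def txn: OpenTxn = current.get().getOrElse {
    val t = new OpenTxn
    current.set(Some(t))
    t
  }

  // Wraps one evaluation: commit if the snippet finished, roll back if it threw,
  // and always clear the slot so the next snippet starts fresh.
  def evaluate[A](snippet: => A): A =
    try {
      val result = snippet
      current.get().foreach(t => t.committed = true)
      result
    } catch {
      case e: Throwable =>
        current.get().foreach(t => t.rolledBack = true)
        throw e
    } finally current.remove()
}

val ok = ReplSupport.evaluate(ReplSupport.txn) // a snippet that used a transaction

var failed: OpenTxn = null
try ReplSupport.evaluate { failed = ReplSupport.txn; sys.error("boom") }
catch { case _: RuntimeException => () }
```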


so that's just a forecast; i will present this in more detail once i have had time to look into it more carefully.


thanks again for CCSTM (and also to daniel spiewak, i guess :-); discovering it was really important for my project.

-sciss-

Sciss

Jul 7, 2010, 5:34:59 AM
to scala...@googlegroups.com
to answer your question in more detail: the GUI maintains its own view of the world, as presented to it by the publisher's update pushes. i am using swing, so it runs happily and independently on the awt event thread, and doesn't need to access the STM for viewing, zooming, panning around, etc.
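A sketch of such a GUI-side view, assuming the Update(added, removed) shape from the earlier messages: the view is changed only by applying pushed updates, and a full update is simply one with everything in added. ViewModel and applyUpdate are hypothetical names. In the Swing application these calls would happen on the AWT event thread (for example via java.awt.EventQueue.invokeLater, as in WorldObserver.defer); the sketch calls them directly so the result can be checked.

```scala
final case class Update[A](added: Set[A], removed: Set[A])

// GUI-side copy of the model: only changed by applying pushed updates, so drawing,
// zooming and panning can read it without touching the STM. Not thread-safe by
// itself; it relies on all calls happening on one thread (the event thread).
final class ViewModel[A] {
  private var objects = Set.empty[A]
  def current: Set[A] = objects
  def applyUpdate(u: Update[A]): Unit = { objects = objects -- u.removed ++ u.added }
}

val view = new ViewModel[String]
view.applyUpdate(Update(Set("p1", "p2"), Set.empty)) // full update on registration
view.applyUpdate(Update(Set("p3"), Set("p1")))       // a later partial update
```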

if i grab a control and move it, let's say a dial to adjust the frequency of a sound process, it will do an STM.atomic { control.value = newValue } and that's it. the nice thing about STM is that if the action is not valid, for example because the sound process terminated automatically in the meantime, it will (ideally) fail gracefully. the only tricky part i have encountered is when there is a problem after messages have already been sent to the sound server, you know, when the transaction is paused by the actor, because then i would need to undo the side effects. in theory this shouldn't happen, but i am sporadically seeing TIMEOUT errors which i will need to investigate.

as a registered listener, the GUI will automatically be informed about the new state (if the transaction completes successfully), so it is a clean MVC separation. eventually the GUI could even run on a different computer (i have some positive experience with this from another project called SwingOSC, which i was using for the predecessor of the sound improvisation instrument i am now developing in scala).

there could be other independent GUI views in the future, so there is a chance that there will be more than one subscriber.

best, -sciss-


On 07.07.2010, at 01:46, Nathan Bronson wrote:

Sciss

Jul 7, 2010, 5:57:22 AM
to scala...@googlegroups.com
i took a look at your new example:

yes, your example looks pretty much exactly like what i arrived at. again, the only thing not handled is the risk that the transaction is rolled back after beforeCommit. so i propose something like in my TxnModel class: capture the updates etc. in beforeCommit, but call the subscribers in afterCommit.

best, -sciss-

On 07.07.2010, at 01:46, Nathan Bronson wrote:
