I'm using the work pulling pattern and wanted to add persistence to the Master's work Queue state via akka persistence. I've come up with an approach and wanted to get feedback.
I'm using a muttable.Queue to store the current work and so I need to persist Enqueue and Dequeue events. The Enqueue event is straightforward, just add the work to the queue. However the Dequeue event feels a bit strange in my approach since I need to return the value of calling dequeue. To get around the fact that the function passed to persist has to return Unit I'm storing the the return value of dequeue in a var. Here is a code snippet:
class class Master[T] extends PersistentActor {
val persistenceId = "workpoolMaster"
private val workers = Map.empty[ActorRef, Option[(ActorRef, T)]]
private val queue = Queue.empty[T]
private var work: Option[T] = None
def enqueue(enqueue: Enqueue[T]) = queue += enqueue.work
def dequeue(ignore: Dequeue) = {
if (!queue.isEmpty) work = Some(queue.dequeue)
else work = None
}
val receiveRecover: Receive = {
case work: Enqueue[T] => enqueue(work)
case Dequeue => dequeue(Dequeue)
}
val receiveCommand: Receive = {
case work: Work[T] => {
persist(Enqueue(work.work))(enqueue)
notifyWorkers()
}
case GimmeWork => {
persist(Dequeue)(dequeue)
work foreach { w =>
workers += (sender -> Some(sender -> w))
sender ! Work(w)
}
}
...Does this look like a reasonable approach?
Cheers,
Greg