Hi Felipe,
I have not yet made a new release with the fixes. I usually like to get confirmation that the fixes have resolved the problem before distributing them in a release. If it is convenient for you, you can get the latest fixes by cloning the repository into a local working directory (`localdir` here) with `$ git clone https://github.com/kquick/Thespian localdir` and then putting that directory (preferably its full path) at the beginning of your Python path with `$ export PYTHONPATH=localdir:$PYTHONPATH`. If this isn't convenient, I'm happy to generate a new release with the fixes, since they pass all of the local unit tests, including the tests I added based on your discoveries.
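To confirm that Python is actually picking up the clone rather than a previously installed release, something like the following could be used. This is a small sketch using only the standard library; `module_loaded_from` is a hypothetical helper name and is not part of Thespian.

```python
import importlib.util
import os


def module_loaded_from(module_name, directory):
    """Return True if importing module_name would load it from under directory.

    Uses importlib.util.find_spec, which searches sys.path (including
    PYTHONPATH entries) without actually importing a top-level module.
    """
    spec = importlib.util.find_spec(module_name)
    if spec is None or spec.origin is None:
        return False  # module not found, or it has no file location
    origin = os.path.realpath(spec.origin)
    root = os.path.realpath(directory)
    try:
        return os.path.commonpath([origin, root]) == root
    except ValueError:
        # e.g. the two paths are on different drives on Windows
        return False
```

After the `export`, calling `module_loaded_from('thespian', '/full/path/to/localdir')` should return `True`; if it returns `False`, an installed release is probably still shadowing the clone.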
If you are waiting on a hardware element (in this case a drive that might need to wait for a CD insertion, local mounting, or some other slow physical process), I would recommend intentional delays using the `self.wakeupAfter()` mechanism. Something like the following:
```python
from thespian.actors import *
import datetime


class StartOp(object):
    "This is the message with the operation details"
    pass


class FinishedOp(object):
    "This is a message sent back when the operation is finished"
    def __init__(self, requesting_message):
        self.reqmsg = requesting_message


class Supervisor(ActorTypeDispatcher):
    '''This is the supervisor that makes sure there is a Worker to handle
    any StartOp messages, starting one as needed.
    '''
    def __init__(self, *args, **kw):
        self.worker = None
        super(Supervisor, self).__init__(*args, **kw)

    def receiveMsg_StartOp(self, startmsg, sender):
        if self.worker is None:
            self.worker = self.createActor(Worker)
        self.send(self.worker, (startmsg, sender))

    def receiveMsg_ChildActorExited(self, msg, sender):
        # The Worker exited; a new one is created on the next StartOp
        self.worker = None


class Worker(ActorTypeDispatcher):
    def __init__(self, *args, **kw):
        self.waiting = []
        super(Worker, self).__init__(*args, **kw)

    def receiveMsg_tuple(self, tuplemsg, sender):
        "Received an operation message from the Supervisor"
        msg, orig_sender = tuplemsg
        # Here is a convenient trick: Python classes are open so we can
        # easily add a member to save additional data for later
        msg.worker_orig_sender = orig_sender
        # Now create an actor to do the actual HW operation, which may fail
        # if the HW is not ready: failure will cause us to get back a PoisonMessage
        hw_oper = self.createActor(Drive_Operator)
        self.send(hw_oper, msg)

    def receiveMsg_FinishedOp(self, finishmsg, sender):
        # Drive operation completed successfully, notify the original requester
        self.send(finishmsg.reqmsg.worker_orig_sender, finishmsg)

    def receiveMsg_PoisonMessage(self, poisonmsg, sender):
        # The operator actor failed, probably because the hardware
        # wasn't ready. Save this message internally and retry it after
        # waiting a little while for the HW to be ready.
        self.waiting.append(poisonmsg.poisonMessage)
        self.wakeupAfter(datetime.timedelta(seconds=3))
        # In this case, Drive_Operator didn't fully run and kill
        # itself, so it should be manually removed here
        self.send(sender, ActorExitRequest())

    def receiveMsg_WakeupMessage(self, wakemsg, sender):
        # Retry everything that failed since the last wakeup, each with a
        # fresh Drive_Operator
        pending = self.waiting[:]
        self.waiting = []
        for each in pending:
            self.send(self.createActor(Drive_Operator), each)


class Drive_Operator(ActorTypeDispatcher):
    def receiveMsg_StartOp(self, startmsg, sender):
        # do_drive_operation_with() is a placeholder for your own drive code;
        # it is expected to raise an exception if the drive is not ready.
        # Thespian will auto-retry the message once, but if the drive is
        # still not ready, 'sender' will get back a PoisonMessage.
        do_drive_operation_with(startmsg)
        self.send(sender, FinishedOp(startmsg))
        # This actor does not hang around: it does the operation and then exits.
        self.send(self.myAddress, ActorExitRequest())
```
This is a pretty skeletal example, and a real version would need a lot more added to it, such as counting the number of retries and giving up after 10, but it should give you some ideas about how you can use `self.wakeupAfter()` to delay your retry operations.
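The retry counting could be done along these lines. This is a minimal sketch, not part of Thespian: `note_retry` and `MAX_RETRIES` are hypothetical names, and it reuses the "Python classes are open" trick from the `Worker` to keep the count on the message itself.

```python
# Hypothetical limit, following the "give up after 10" suggestion.
MAX_RETRIES = 10


def note_retry(msg, max_retries=MAX_RETRIES):
    """Record one more failed attempt on msg.

    Returns True if the message should be retried, or False once it has
    already been retried max_retries times. The count is stored as an
    extra attribute on the message object itself.
    """
    count = getattr(msg, 'retry_count', 0) + 1
    msg.retry_count = count
    return count <= max_retries
```

In `Worker.receiveMsg_PoisonMessage`, one would call `note_retry(poisonmsg.poisonMessage)` and only append to `self.waiting` and call `self.wakeupAfter()` when it returns `True`; otherwise, send some failure message (a hypothetical `OpFailed`, say) to `worker_orig_sender` instead. Either way the failed `Drive_Operator` still needs its `ActorExitRequest`. Because the count lives on the message, it should travel with the message each time it is resent, just as `worker_orig_sender` does.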
Regards,
Kevin